mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-20 02:50:39 +00:00
Compare commits
18 Commits
problame/l
...
problame/l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15b8618d25 | ||
|
|
3e9a731e76 | ||
|
|
257b5d4865 | ||
|
|
05773708d3 | ||
|
|
382473d9a5 | ||
|
|
eb0a698adc | ||
|
|
81b6578c44 | ||
|
|
bc49c73fee | ||
|
|
e98580b092 | ||
|
|
804ef23043 | ||
|
|
87f7d6bce3 | ||
|
|
39e3fbbeb0 | ||
|
|
8d2a4aa5f8 | ||
|
|
d1fcdf75b3 | ||
|
|
7e39a96441 | ||
|
|
babefdd3f9 | ||
|
|
805fee1483 | ||
|
|
85d6d9dc85 |
@@ -145,7 +145,11 @@ runs:
|
||||
|
||||
if [ "${RERUN_FLAKY}" == "true" ]; then
|
||||
mkdir -p $TEST_OUTPUT
|
||||
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/flaky.json"
|
||||
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" \
|
||||
--days 7 \
|
||||
--output "$TEST_OUTPUT/flaky.json" \
|
||||
--pg-version "${DEFAULT_PG_VERSION}" \
|
||||
--build-type "${BUILD_TYPE}"
|
||||
|
||||
EXTRA_PARAMS="--flaky-tests-json $TEST_OUTPUT/flaky.json $EXTRA_PARAMS"
|
||||
fi
|
||||
|
||||
84
Cargo.lock
generated
84
Cargo.lock
generated
@@ -1996,26 +1996,6 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.7.2"
|
||||
@@ -2115,9 +2095,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.147"
|
||||
version = "0.2.144"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
|
||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@@ -2683,7 +2663,6 @@ dependencies = [
|
||||
"tenant_size_model",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-tar",
|
||||
@@ -2857,9 +2836,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.13"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
|
||||
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
|
||||
|
||||
[[package]]
|
||||
name = "pin-utils"
|
||||
@@ -3746,12 +3725,6 @@ dependencies = [
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scoped-tls"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@@ -4323,18 +4296,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.47"
|
||||
version = "1.0.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
|
||||
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.47"
|
||||
version = "1.0.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
|
||||
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -4419,40 +4392,22 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.32.0"
|
||||
version = "1.28.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"autocfg",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2 0.5.3",
|
||||
"socket2 0.4.9",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=problame/hacky-openat#96e5a1f3a3d6921438002807475d01540e1211b2"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"io-uring 0.6.1",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"scopeguard",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-uring",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-io-timeout"
|
||||
version = "1.2.0"
|
||||
@@ -4592,20 +4547,6 @@ dependencies = [
|
||||
"tungstenite 0.20.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-uring"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef"
|
||||
dependencies = [
|
||||
"io-uring 0.5.13",
|
||||
"libc",
|
||||
"scoped-tls",
|
||||
"slab",
|
||||
"socket2 0.4.9",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.8"
|
||||
@@ -5073,6 +5014,7 @@ dependencies = [
|
||||
"nix 0.26.2",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"postgres_connection",
|
||||
"pq_proto",
|
||||
"rand",
|
||||
"regex",
|
||||
|
||||
55
Cargo.toml
55
Cargo.toml
@@ -200,3 +200,58 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
|
||||
# Besides, debug info should not affect the performance.
|
||||
debug = true
|
||||
|
||||
# disable debug symbols for all packages except this one to decrease binaries size
|
||||
[profile.release.package."*"]
|
||||
debug = false
|
||||
|
||||
[profile.release-line-debug]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
[profile.release-line-debug-lto]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
lto = true
|
||||
|
||||
[profile.release-line-debug-size]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
[profile.release-line-debug-zize]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
[profile.release-line-debug-size-lto]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
lto = true
|
||||
[profile.release-line-debug-zize-lto]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
lto = true
|
||||
|
||||
[profile.release-no-debug]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
|
||||
[profile.release-no-debug-size]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
[profile.release-no-debug-zize]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
|
||||
[profile.release-no-debug-size-lto]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
lto = true
|
||||
|
||||
[profile.release-no-debug-zize-lto]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
lto = true
|
||||
|
||||
@@ -211,8 +211,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
|
||||
FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.4.tar.gz -O pgvector.tar.gz && \
|
||||
echo "1cb70a63f8928e396474796c22a20be9f7285a8a013009deb8152445b61b72e6 pgvector.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
|
||||
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
|
||||
@@ -19,9 +19,10 @@ Also `compute_ctl` spawns two separate service threads:
|
||||
- `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
|
||||
last activity requests.
|
||||
|
||||
If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
|
||||
compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
|
||||
downscaling and (eventually) will request immediate upscaling under resource pressure.
|
||||
If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
|
||||
`vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
|
||||
`vm-monitor` communicates with the VM autoscaling system. It coordinates
|
||||
downscaling and requests immediate upscaling under resource pressure.
|
||||
|
||||
Usage example:
|
||||
```sh
|
||||
|
||||
@@ -20,9 +20,10 @@
|
||||
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
|
||||
//! last activity requests.
|
||||
//!
|
||||
//! If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
|
||||
//! compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
|
||||
//! downscaling and (eventually) will request immediate upscaling under resource pressure.
|
||||
//! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
|
||||
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
|
||||
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
|
||||
//! downscaling and requests immediate upscaling under resource pressure.
|
||||
//!
|
||||
//! Usage example:
|
||||
//! ```sh
|
||||
@@ -280,6 +281,7 @@ fn main() -> Result<()> {
|
||||
let vm_monitor_addr = matches.get_one::<String>("vm-monitor-addr");
|
||||
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
|
||||
let cgroup = matches.get_one::<String>("cgroup");
|
||||
let file_cache_on_disk = matches.get_flag("file-cache-on-disk");
|
||||
|
||||
// Only make a runtime if we need to.
|
||||
// Note: it seems like you can make a runtime in an inner scope and
|
||||
@@ -312,6 +314,7 @@ fn main() -> Result<()> {
|
||||
cgroup: cgroup.cloned(),
|
||||
pgconnstr: file_cache_connstr.cloned(),
|
||||
addr: vm_monitor_addr.cloned().unwrap(),
|
||||
file_cache_on_disk,
|
||||
})),
|
||||
token.clone(),
|
||||
))
|
||||
@@ -482,6 +485,11 @@ fn cli() -> clap::Command {
|
||||
)
|
||||
.value_name("FILECACHE_CONNSTR"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("file-cache-on-disk")
|
||||
.long("file-cache-on-disk")
|
||||
.action(clap::ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
957
docs/rfcs/025-generation-numbers.md
Normal file
957
docs/rfcs/025-generation-numbers.md
Normal file
@@ -0,0 +1,957 @@
|
||||
# Pageserver: split-brain safety for remote storage through generation numbers
|
||||
|
||||
## Summary
|
||||
|
||||
A scheme of logical "generation numbers" for tenant attachment to pageservers is proposed, along with
|
||||
changes to the remote storage format to include these generation numbers in S3 keys.
|
||||
|
||||
Using the control plane as the issuer of these generation numbers enables strong anti-split-brain
|
||||
properties in the pageserver cluster without implementing a consensus mechanism directly
|
||||
in the pageservers.
|
||||
|
||||
## Motivation
|
||||
|
||||
Currently, the pageserver's remote storage format does not provide a mechanism for addressing
|
||||
split brain conditions that may happen when replacing a node or when migrating
|
||||
a tenant from one pageserver to another.
|
||||
|
||||
From a remote storage perspective, a split brain condition occurs whenever two nodes both think
|
||||
they have the same tenant attached, and both can write to S3. This can happen in the case of a
|
||||
network partition, pathologically long delays (e.g. suspended VM), or software bugs.
|
||||
|
||||
In the current deployment model, control plane guarantees that a tenant is attached to one
|
||||
pageserver at a time, thereby ruling out split-brain conditions resulting from dual
|
||||
attachment (however, there is always the risk of a control plane bug). This control
|
||||
plane guarantee prevents robust response to failures, as if a pageserver is unresponsive
|
||||
we may not detach from it. The mechanism in this RFC fixes this, by making it safe to
|
||||
attach to a new, different pageserver even if an unresponsive pageserver may be running.
|
||||
|
||||
Futher, lack of safety during split-brain conditions blocks two important features where occasional
|
||||
split-brain conditions are part of the design assumptions:
|
||||
|
||||
- seamless tenant migration ([RFC PR](https://github.com/neondatabase/neon/pull/5029))
|
||||
- automatic pageserver instance failure handling (aka "failover") (RFC TBD)
|
||||
|
||||
### Prior art
|
||||
|
||||
- 020-pageserver-s3-coordination.md
|
||||
- 023-the-state-of-pageserver-tenant-relocation.md
|
||||
- 026-pageserver-s3-mvcc.md
|
||||
|
||||
This RFC has broad similarities to the proposal to implement a MVCC scheme in
|
||||
S3 object names, but this RFC avoids a general purpose transaction scheme in
|
||||
favour of more specialized "generations" that work like a transaction ID that
|
||||
always has the same lifetime as a pageserver process or tenant attachment, whichever
|
||||
is shorter.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Accommodate storage backends with no atomic or fencing capability (i.e. work within
|
||||
S3's limitation that there are no atomics and clients can't be fenced)
|
||||
- Don't depend on any STONITH or node fencing in the compute layer (i.e. we will not
|
||||
assume that we can reliably kill and EC2 instance and have it die)
|
||||
- Scoped per-tenant, not per-pageserver; for _seamless tenant migration_, we need
|
||||
per-tenant granularity, and for _failover_, we likely want to spread the workload
|
||||
of the failed pageserver instance to a number of peers, rather than monolithically
|
||||
moving the entire workload to another machine.
|
||||
We do not rule out the latter case, but should not constrain ourselves to it.
|
||||
|
||||
## Design Tenets
|
||||
|
||||
These are not requirements, but are ideas that guide the following design:
|
||||
|
||||
- Avoid implementing another consensus system: we already have a strongly consistent
|
||||
database in the control plane that can do atomic operations where needed, and we also
|
||||
have a Paxos implementation in the safekeeper.
|
||||
- Avoiding locking in to specific models of how failover will work (e.g. do not assume that
|
||||
all the tenants on a pageserver will fail over as a unit).
|
||||
- Be strictly correct when it comes to data integrity. Occasional failures of availability
|
||||
are tolerable, occasional data loss is not.
|
||||
|
||||
## Non Goals
|
||||
|
||||
The changes in this RFC intentionally isolate the design decision of how to define
|
||||
logical generations numbers and object storage format in a way that is somewhat flexible with
|
||||
respect to how actual orchestration of failover works.
|
||||
|
||||
This RFC intentionally does not cover:
|
||||
|
||||
- Failure detection
|
||||
- Orchestration of failover
|
||||
- Standby modes to keep data ready for fast migration
|
||||
- Intentional multi-writer operation on tenants (multi-writer scenarios are assumed to be transient split-brain situations).
|
||||
- Sharding.
|
||||
|
||||
The interaction between this RFC and those features is discussed in [Appendix B](#appendix-b-interoperability-with-other-features)
|
||||
|
||||
## Impacted Components
|
||||
|
||||
pageserver, control plane, safekeeper (optional)
|
||||
|
||||
## Implementation Part 1: Correctness
|
||||
|
||||
### Summary
|
||||
|
||||
- A per-tenant **generation number** is introduced to uniquely identifying tenant attachments to pageserver processes.
|
||||
|
||||
- This generation number increments each time the control plane modifies a tenant (`Project`)'s assigned pageserver, or when the assigned pageserver restarts.
|
||||
- the control plane is the authority for generation numbers: only it may
|
||||
increment a generation number.
|
||||
|
||||
- **Object keys are suffixed** with the generation number
|
||||
- **Safety for multiply-attached tenants** is provided by the
|
||||
generation number in the object key: the competing pageservers will not
|
||||
try to write to the same keys.
|
||||
- **Safety in split brain for multiple nodes running with
|
||||
the same node ID** is provided by the pageserver calling out to the control plane
|
||||
on startup, to re-attach and thereby increment the generations of any attached tenants
|
||||
- **Safety for deletions** is achieved by deferring the DELETE from S3 to a point in time where the deleting node has validated with control plane that no attachment with a higher generation has a reference to the to-be-DELETEd key.
|
||||
- **The control plane is used to issue generation numbers** to avoid the need for
|
||||
a built-in consensus system in the pageserver, although this could in principle
|
||||
be changed without changing the storage format.
|
||||
|
||||
### Generation numbers
|
||||
|
||||
A generation number is associated with each tenant in the control plane,
|
||||
and each time the attachment status of the tenant changes, this is incremented.
|
||||
Changes in attachment status include:
|
||||
|
||||
- Attaching the tenant to a different pageserver
|
||||
- A pageserver restarting, and "re-attaching" its tenants on startup
|
||||
|
||||
These increments of attachment generation provide invariants we need to avoid
|
||||
split-brain issues in storage:
|
||||
|
||||
- If two pageservers have the same tenant attached, the attachments are guaranteed to have different generation numbers, because the generation would increment
|
||||
while attaching the second one.
|
||||
- If there are multiple pageservers running with the same node ID, all the attachments on all pageservers are guaranteed to have different generation numbers, because the generation would increment
|
||||
when the second node started and re-attached its tenants.
|
||||
|
||||
As long as the infrastructure does not transparently replace an underlying
|
||||
physical machine, we are totally safe. See the later [unsafe case](#unsafe-case-on-badly-behaved-infrastructure) section for details.
|
||||
|
||||
### Object Key Changes
|
||||
|
||||
#### Generation suffix
|
||||
|
||||
All object keys (layer objects and index objects) will contain the attachment
|
||||
generation as a [suffix](#why-a-generation-suffix-rather-than-prefix).
|
||||
This suffix is the primary mechanism for protecting against split-brain situations, and
|
||||
enabling safe multi-attachment of tenants:
|
||||
|
||||
- Two pageservers running with the same node ID (e.g. after a failure, where there is
|
||||
some rogue pageserver still running) will not try to write to the same objects, because at startup they will have re-attached tenants and thereby incremented
|
||||
generation numbers.
|
||||
- Multiple attachments (to different pageservers) of the same tenant will not try to write to the same objects, as each attachment would have a distinct generation.
|
||||
|
||||
The generation is appended in hex format (8 byte string representing
|
||||
u32), to all our existing key names. A u32's range limit would permit
|
||||
27 restarts _per second_ over a 5 year system lifetime: orders of magnitude more than
|
||||
is realistic.
|
||||
|
||||
The exact meaning of the generation suffix can evolve over time if necessary, for
|
||||
example if we chose to implement a failover mechanism internally to the pageservers
|
||||
rather than going via the control plane. The storage format just sees it as a number,
|
||||
with the only semantic property being that the highest numbered index is the latest.
|
||||
|
||||
#### Index changes
|
||||
|
||||
Since object keys now include a generation suffix, the index of these keys must also be updated. IndexPart currently stores keys and LSNs sufficient to reconstruct key names: this would be extended to store the generation as well.
|
||||
|
||||
This will increase the size of the file, but only modestly: layers are already encoded as
|
||||
their string-ized form, so the overhead is about 10 bytes per layer. This will be less if/when
|
||||
the index storage format is migrated to a binary format from JSON.
|
||||
|
||||
#### Visibility
|
||||
|
||||
_This section doesn't describe code changes, but extends on the consequences of the
|
||||
object key changes given above_
|
||||
|
||||
##### Visibility of objects to pageservers
|
||||
|
||||
Pageservers can of course list objects in S3 at any time, but in practice their
|
||||
visible set is based on the contents of their LayerMap, which is initialized
|
||||
from the `index_part.json.???` that they load.
|
||||
|
||||
Starting with the `index_part` from the most recent previous generation
|
||||
(see [loading index_part](#finding-the-remote-indices-for-timelines)), a pageserver
|
||||
initially has visibility of all the objects that were referenced in the loaded index.
|
||||
These objects are guaranteed to remain visible until the current generation is
|
||||
superseded, via pageservers in older generations avoiding deletions (see [deletion](#deletion)).
|
||||
|
||||
The "most recent previous generation" is _not_ necessarily the most recent
|
||||
in terms of walltime, it is the one that is readable at the time a new generation
|
||||
starts. Consider the following sequence of a tenant being re-attached to different
|
||||
pageserver nodes:
|
||||
|
||||
- Create + attach on PS1 in generation 1
|
||||
- PS1 Do some work, write out index_part.json-0001
|
||||
- Attach to PS2 in generation 2
|
||||
- Read index_part.json-0001
|
||||
- PS2 starts doing some work...
|
||||
- Attach to PS3 in generation 3
|
||||
- Read index_part.json-0001
|
||||
- **...PS2 finishes its work: now it writes index_part.json-0002**
|
||||
- PS3 writes out index_part.json-0003
|
||||
|
||||
In the above sequence, the ancestry of indices is:
|
||||
|
||||
```
|
||||
0001 -> 0002
|
||||
|
|
||||
-> 0003
|
||||
```
|
||||
|
||||
This is not an issue for safety: if the 0002 references some object that is
|
||||
not in 0001, then 0003 simply does not see it, and will re-do whatever
|
||||
work was required (e.g. ingesting WAL or doing compaction). Objects referenced
|
||||
by only the 0002 index will never be read by future attachment generations, and
|
||||
will eventually be cleaned up by a scrub (see [scrubbing](#cleaning-up-orphan-objects-scrubbing)).
|
||||
|
||||
##### Visibility of LSNs to clients
|
||||
|
||||
Because index_part.json is now written with a generation suffix, which data
|
||||
is visible depends on which generation the reader is operating in:
|
||||
|
||||
- If one was passively reading from S3 from outside of a pageserver, the
|
||||
visibility of data would depend on which index_part.json-<generation> file
|
||||
one had chosen to read from.
|
||||
- If two pageservers have the same tenant attached, they may have different
|
||||
data visible as they're independently replaying the WAL, and maintaining
|
||||
independent LayerMaps that are written to independent index_part.json files.
|
||||
Data does not have to be remotely committed to be visible.
|
||||
- For a pageserver writing with a stale generation, historic LSNs
|
||||
remain readable until another pageserver (with a higher generation suffix)
|
||||
decides to execute GC deletions. At this point, we may think of the stale
|
||||
attachment's generation as having logically ended: during its existence
|
||||
the generation had a consistent view of the world.
|
||||
- For a newly attached pageserver, its highest visible LSN may appears to
|
||||
go backwards with respect to an earlier attachment, if that earlier
|
||||
attachment had not uploaded all data to S3 before the new attachment.
|
||||
|
||||
### Deletion
|
||||
|
||||
#### Generation number validation
|
||||
|
||||
While writes are de-conflicted by writers always using their own generation number in the key,
|
||||
deletions are slightly more challenging: if a pageserver A is isolated, and the true active node is
|
||||
pageserver B, then it is dangerous for A to do any object deletions, even of objects that it wrote
|
||||
itself, because pageserver's B metadata might reference those objects.
|
||||
|
||||
We solve this by inserting a "generation validation" step between the write of a remote index
|
||||
that un-links a particular object from the index, and the actual deletion of the object, such
|
||||
that deletions strictly obey the following ordering:
|
||||
|
||||
1. Write out index_part.json: this guarantees that any subsequent reader of the metadata will
|
||||
not try and read the object we unlinked.
|
||||
2. Call out to control plane to validate that the generation which we use for our attachment is still the latest.
|
||||
3. If step 2 passes, it is safe to delete the object. Why? The check-in with control plane
|
||||
together with our visibility rules guarantees that any later generation
|
||||
will use either the exact `index_part.json` that we uploaded in step 1, or a successor
|
||||
of it; not an earlier one. In both cases, the `index_part.json` doesn't reference the
|
||||
key we are deleting anymore, so, the key is invisible to any later attachment generation.
|
||||
Hence it's safe to delete it.
|
||||
|
||||
Note that at step 2 we are only confirming that deletions of objects _no longer referenced
|
||||
by the specific `index_part.json` written in step 1_ are safe. If we were attempting other deletions concurrently,
|
||||
these would need their own generation validation step.
|
||||
|
||||
If step 2 fails, we may leak the object. This is safe, but has a cost: see [scrubbing](#cleaning-up-orphan-objects-scrubbing). We may avoid this entirely outside of node
|
||||
failures, if we do proper flushing of deletions on clean shutdown and clean migration.
|
||||
|
||||
To avoid doing a huge number of control plane requests to perform generation validation,
|
||||
validation of many tenants will be done in a single request, and deletions will be queued up
|
||||
prior to validation: see [Persistent deletion queue](#persistent-deletion-queue) for more.
|
||||
|
||||
#### `remote_consistent_lsn` updates
|
||||
|
||||
Remote objects are not the only kind of deletion the pageserver does: it also indirectly deletes
|
||||
WAL data, by feeding back remote_consistent_lsn to safekeepers, as a signal to the safekeepers that
|
||||
they may drop data below this LSN.
|
||||
|
||||
For the same reasons that deletion of objects must be guarded by an attachment generation number
|
||||
validation step, updates to `remote_consistent_lsn` are subject to the same rules, using
|
||||
an ordering as follows:
|
||||
|
||||
1. upload the index_part that covers data up to LSN `L0` to S3
|
||||
2. Call out to control plane to validate that the generation which we use for our attachment is still the latest.
|
||||
3. advance the `remote_consistent_lsn` that we advertise to the safekeepers to `L0`
|
||||
|
||||
If step 2 fails, then the `remote_consistent_lsn` advertised
|
||||
to safekeepers will not advance again until a pageserver
|
||||
with the latest generation is ready to do so.
|
||||
|
||||
**Note:** at step 3 we are not advertising the _latest_ remote_consistent_lsn, we are
|
||||
advertising the value in the index_part that we uploaded in step 1. This provides
|
||||
a strong ordering guarantee.
|
||||
|
||||
Internally to the pageserver, each timeline will have two remote_consistent_lsn values: the one that
|
||||
reflects its latest write to remote storage, and the one that reflects the most
|
||||
recent validation of generation number. It is only the latter value that may
|
||||
be advertised to the outside world (i.e. to the safekeeper).
|
||||
|
||||
The control plane remains unaware of `remote_consistent_lsn`: it only has to validate
|
||||
the freshness of generation numbers, thereby granting the pageserver permission to
|
||||
share the information with the safekeeper.
|
||||
|
||||
For convenience, in subsequent sections and RFCs we will use "deletion" to mean both deletion
|
||||
of objects in S3, and updates to the `remote_consistent_lsn`, as updates to the remote consistent
|
||||
LSN are de-facto deletions done via the safekeeper, and both kinds of deletion are subject to
|
||||
the same generation validation requirement.
|
||||
|
||||
### Pageserver attach/startup changes
|
||||
|
||||
#### Attachment
|
||||
|
||||
Calls to `/v1/tenant/{tenant_id}/attach` are augmented with an additional
|
||||
`generation` field in the body.
|
||||
|
||||
The pageserver does not persist this: a generation is only good for the lifetime
|
||||
of a process.
|
||||
|
||||
#### Finding the remote indices for timelines
|
||||
|
||||
Because index files are now suffixed with generation numbers, the pageserver
|
||||
cannot always GET the remote index in one request, because it can't always
|
||||
know a-priori what the latest remote index is.
|
||||
|
||||
Typically, the most recent generation to write an index would be our own
|
||||
generation minus 1. However, this might not be the case: the previous
|
||||
node might have started and acquired a generation number, and then crashed
|
||||
before writing out a remote index.
|
||||
|
||||
In the general case and as a fallback, the pageserver may list all the `index_part.json`
|
||||
files for a timeline, sort them by generation, and pick the highest that is `<=`
|
||||
its current generation for this attachment. The tenant should never load an index
|
||||
with an attachment generation _newer_ than its own.
|
||||
These two rules combined ensure that objects written by later generations are never visible to earlier generations.
|
||||
|
||||
Note that if a given attachment picks an index part from an earlier generation (say n-2), but crashes & restarts before it writes its own generation's index part, next time it tries to pick an index part there may be an index part from generation n-1.
|
||||
It would pick the n-1 index part in that case, because it's sorted higher than the previous one from generation n-2.
|
||||
So, above rules guarantee no determinism in selecting the index part.
|
||||
are allowed to be attached with stale attachment generations during a multiply-attached
|
||||
phase in a migration, and in this instance if the old location's pageserver restarts,
|
||||
it should not try and load the newer generation's index.
|
||||
|
||||
To summarize, on starting a timeline, the pageserver will:
|
||||
|
||||
1. Issue a GET for index_part.json-<my generation - 1>
|
||||
2. If 1 failed, issue a ListObjectsv2 request for index_part.json\* and
|
||||
pick the newest.
|
||||
|
||||
One could optimize this further by using the control plane to record specifically
|
||||
which generation most recently wrote an index_part.json, if necessary, to increase
|
||||
the probability of finding the index_part.json in one GET. One could also improve
|
||||
the chances by having pageservers proactively write out index_part.json after they
|
||||
get a new generation ID.
|
||||
|
||||
#### Re-attachment on startup
|
||||
|
||||
On startup, the pageserver will call out to an new control plane `/re-attach`
|
||||
API (see [Generation API](#generation-api)). This returns a list of
|
||||
tenants that should be attached to the pageserver, and their generation numbers, which
|
||||
the control plane will increment before returning.
|
||||
|
||||
The pageserver should still scan its local disk on startup, but should _delete_
|
||||
any local content for tenants not indicated in the `/re-attach` response: their
|
||||
absence is an implicit detach operation.
|
||||
|
||||
**Note** if a tenant is omitted from the re-attach response, its local disk content
|
||||
will be deleted. This will change in subsequent work, when the control plane gains
|
||||
the concept of a secondary/standby location: a node with local content may revert
|
||||
to this status and retain some local content.
|
||||
|
||||
#### Cleaning up previous generations' remote indices
|
||||
|
||||
Deletion of old indices is not necessary for correctness, although it is necessary
|
||||
to avoid the ListObjects fallback in the previous section becoming ever more expensive.
|
||||
|
||||
Once the new attachment has written out its index_part.json, it may asynchronously clean up historic index_part.json
|
||||
objects that were found.
|
||||
|
||||
We may choose to implement this deletion either as an explicit step after we
|
||||
write out index_part for the first time in a pageserver's lifetime, or for
|
||||
simplicity just do it periodically as part of the background scrub (see [scrubbing](#cleaning-up-orphan-objects-scrubbing));
|
||||
|
||||
### Control Plane Changes
|
||||
|
||||
#### Store generations for attaching tenants
|
||||
|
||||
- The `Project` table must store the generation number for use when
|
||||
attaching the tenant to a new pageserver.
|
||||
- The `/v1/tenant/:tenant_id/attach` pageserver API will require the generation number,
|
||||
which the control plane can supply by simply incrementing the `Project`'s
|
||||
generation number each time the tenant is attached to a different server: the same database
|
||||
transaction that changes the assigned pageserver should also change the generation number.
|
||||
|
||||
#### Generation API
|
||||
|
||||
This section describes an API that could be provided directly by the control plane,
|
||||
or built as a separate microservice. In earlier parts of the RFC, when we
|
||||
discuss the control plane providing generation numbers, we are referring to this API.
|
||||
|
||||
The API endpoints used by the pageserver to acquire and validate generation
|
||||
numbers are quite simple, and only require access to some persistent and
|
||||
linerizable storage (such as a database).
|
||||
|
||||
Building this into the control plane is proposed as a least-effort option to exploit existing infrastructure and implement generation number issuance in the same transaction that mandates it (i.e., the transaction that updates the `Project` assignment to another pageserver).
|
||||
However, this is not mandatory: this "Generation Number Issuer" could
|
||||
be built as a microservice. In practice, we will write such a miniature service
|
||||
anyway, to enable E2E pageserver/compute testing without control plane.
|
||||
|
||||
The endpoints required by pageservers are:
|
||||
|
||||
##### `/re-attach`
|
||||
|
||||
- Request: `{node_id: <u32>}`
|
||||
- Response:
|
||||
- 200 `{tenants: [{id: <TenantId>, gen: <u32>}]}`
|
||||
- 404: unknown node_id
|
||||
- (Future: 429: flapping detected, perhaps nodes are fighting for the same node ID,
|
||||
or perhaps this node was in a retry loop)
|
||||
- (On unknown tenants, omit tenant from `tenants` array)
|
||||
- Server behavior: query database for which tenants should be attached to this pageserver.
|
||||
- for each tenant that should be attached, increment the attachment generation and
|
||||
include the new generation in the response
|
||||
- Client behavior:
|
||||
- for all tenants in the response, activate with the new generation number
|
||||
- for any local disk content _not_ referenced in the response, act as if we
|
||||
had been asked to detach it (i.e. delete local files)
|
||||
|
||||
**Note** the `node_id` in this request will change in future if we move to ephemeral
|
||||
node IDs, to be replaced with some correlation ID that helps the control plane realize
|
||||
if a process is running with the same storage as a previous pageserver process (e.g.
|
||||
we might use EC instance ID, or we might just write some UUID to the disk the first
|
||||
time we use it)
|
||||
|
||||
##### `/validate`
|
||||
|
||||
- Request: `{'tenants': [{tenant: <tenant id>, attach_gen: <gen>}, ...]}'`
|
||||
- Response:
|
||||
- 200 `{'tenants': [{tenant: <tenant id>, status: <bool>}...]}`
|
||||
- (On unknown tenants, omit tenant from `tenants` array)
|
||||
- Purpose: enable the pageserver to discover for the given attachments whether they are still the latest.
|
||||
- Server behavior: this is a read-only operation: simply compare the generations in the request with
|
||||
the generations known to the server, and set status to `true` if they match.
|
||||
- Client behavior: clients must not do deletions within a tenant's remote data until they have
|
||||
received a response indicating the generation they hold for the attachment is current.
|
||||
|
||||
#### Use of `/load` and `/ignore` APIs
|
||||
|
||||
Because the pageserver will be changed to only attach tenants on startup
|
||||
based on the control plane's response to a `/re-attach` request, the load/ignore
|
||||
APIs no longer make sense in their current form.
|
||||
|
||||
The `/load` API becomes functionally equivalent to attach, and will be removed:
|
||||
any location that used `/load` before should just attach instead.
|
||||
|
||||
The `/ignore` API is equivalent to detaching, but without deleting local files.
|
||||
|
||||
### Timeline/Branch creation & deletion
|
||||
|
||||
All of the previous arguments for safety have described operations within
|
||||
a timeline, where we may describe a sequence that includes updates to
|
||||
index_part.json, and where reads and writes are coming from a postgres
|
||||
endpoint (writes via the safekeeper).
|
||||
|
||||
Creating or destroying timeline is a bit different, because writes
|
||||
are coming from the control plane.
|
||||
|
||||
We must be safe against scenarios such as:
|
||||
|
||||
- A tenant is attached to pageserver B while pageserver A is
|
||||
in the middle of servicing an RPC from the control plane to
|
||||
create or delete a tenant.
|
||||
- A pageserver A has been sent a timeline creation request
|
||||
but becomes unresponsive. The tenant is attached to a
|
||||
different pageserver B, and the timeline creation request
|
||||
is sent there too.
|
||||
|
||||
#### Timeline Creation
|
||||
|
||||
If some very slow node tries to do a timeline creation _after_
|
||||
a more recent generation node has already created the timeline
|
||||
and written some data into it, that must not cause harm. This
|
||||
is provided in timeline creations by the way all the objects
|
||||
within the timeline's remote path include a generation suffix:
|
||||
a slow node in an old generation that attempts to "create" a timeline
|
||||
that already exists will just emit an index_part.json with
|
||||
an old generation suffix.
|
||||
|
||||
Timeline IDs are never reused, so we don't have
|
||||
to worry about the case of create/delete/create cycles. If they
|
||||
were re-used during a disaster recovery "un-delete" of a timeline,
|
||||
that special case can be handled by calling out to all available pageservers
|
||||
to check that they return 404 for the timeline, and to flush their
|
||||
deletion queues in case they had any deletions pending from the
|
||||
timeline.
|
||||
|
||||
The above makes it safe for control plane to change the assignment of
|
||||
tenant to pageserver in control plane while a timeline creation is ongoing.
|
||||
The reason is that the creation request against the new assigned pageserver
|
||||
uses a new generation number. However, care must be taken by control plane
|
||||
to ensure that a "timeline creation successul" response from some pageserver
|
||||
is checked for the pageserver's generation for that timeline's tenant still being the latest.
|
||||
If it is not the latest, the response does not constitute a successful timeline creation.
|
||||
It is acceptable to discard such responses, the scrubber will clean up the S3 state.
|
||||
It is better to issue a timelien deletion request to the stale attachment.
|
||||
|
||||
#### Timeline Deletion
|
||||
|
||||
Tenant/timeline deletion operations are exempt from generation validation
|
||||
on deletes, and therefore don't have to go through the same deletion
|
||||
queue as GC/compaction layer deletions. This is because once a
|
||||
delete is issued by the control plane, it is a promise that the
|
||||
control plane will keep trying until the deletion is done, so even stale
|
||||
pageservers are permitted to go ahead and delete the objects.
|
||||
|
||||
The implications of this for control plane are:
|
||||
|
||||
- During timeline/tenant deletion, the control plane must wait for the deletion to
|
||||
be truly complete (status 404) and also handle the case where the pageserver
|
||||
becomes unavailable, either by waiting for a replacement with the same node_id,
|
||||
or by *re-attaching the tenant elsewhere.
|
||||
|
||||
- The control plane must persist its intent to delete
|
||||
a timeline/tenant before issuing any RPCs, and then once it starts, it must
|
||||
keep retrying until the tenant/timeline is gone. This is already handled
|
||||
by using a persistent `Operation` record that is retried indefinitely.
|
||||
|
||||
Timeline deletion may result in a special kind of object leak, where
|
||||
the latest generation attachment completes a deletion (including erasing
|
||||
all objects in the timeline path), but some slow/partitioned node is
|
||||
writing into the timeline path with a stale generation number. This would
|
||||
not be caught by any per-timeline scrubbing (see [scrubbing](#cleaning-up-orphan-objects-scrubbing)), since scrubbing happens on the
|
||||
attached pageserver, and once the timeline is deleted it isn't attached anywhere.
|
||||
This scenario should be pretty rare, and the control plane can make it even
|
||||
rarer by ensuring that if a tenant is in a multi-attached state (e.g. during
|
||||
migration), we wait for that to complete before processing the deletion. Beyond
|
||||
that, we may implement some other top-level scrub of timelines in
|
||||
an external tool, to identify any tenant/timeline paths that are not found
|
||||
in the control plane database.
|
||||
|
||||
#### Examples
|
||||
|
||||
- Deletion, node restarts partway through:
|
||||
- By the time we returned 202, we have written a remote delete marker
|
||||
- Any subsequent incarnation of the same node_id will see the remote
|
||||
delete marker and continue to process the deletion
|
||||
- If the original pageserver is lost permanently and no replacement
|
||||
with the same node_id is available, then the control plane must recover
|
||||
by re-attaching the tenant to a different node.
|
||||
- Creation, node becomes unresponsive partway through.
|
||||
- Control plane will see HTTP request timeout, keep re-issuing
|
||||
request to whoever is the latest attachment point for the tenant
|
||||
until it succeeds.
|
||||
- Stale nodes may be trying to execute timeline creation: they will
|
||||
write out index_part.json files with
|
||||
stale attachment generation: these will be eventually cleaned up
|
||||
by the same mechanism as other old indices.
|
||||
|
||||
### Unsafe case on badly behaved infrastructure
|
||||
|
||||
This section is only relevant if running on a different environment
|
||||
than EC2 machines with ephemeral disks.
|
||||
|
||||
If we ever run pageservers on infrastructure that might transparently restart
|
||||
a pageserver while leaving an old process running (e.g. a VM gets rescheduled
|
||||
without the old one being fenced), then there is a risk of corruption, when
|
||||
the control plane attaches the tenant, as follows:
|
||||
|
||||
- If the control plane sends an `/attach` request to node A, then node A dies
|
||||
and is replaced, and the control plane's retries the request without
|
||||
incrementing that attachment ID, then it could end up with two physical nodes
|
||||
both using the same generation number.
|
||||
- This is not an issue when using EC2 instances with ephemeral storage, as long
|
||||
as the control plane never re-uses a node ID, but it would need re-examining
|
||||
if running on different infrastructure.
|
||||
- To robustly protect against this class of issue, we would either:
|
||||
- add a "node generation" to distinguish between different processes holding the
|
||||
same node_id.
|
||||
- or, dispense with static node_id entirely and issue an ephemeral ID to each
|
||||
pageserver process when it starts.
|
||||
|
||||
## Implementation Part 2: Optimizations
|
||||
|
||||
### Persistent deletion queue
|
||||
|
||||
Between writing our a new index_part.json that doesn't reference an object,
|
||||
and executing the deletion, an object passes through a window where it is
|
||||
only referenced in memory, and could be leaked if the pageserver is stopped
|
||||
uncleanly. That introduces conflicting incentives: on the one hand, we would
|
||||
like to delay and batch deletions to
|
||||
1. minimize the cost of the mandatory validations calls to control plane, and
|
||||
2. minimize cost for DeleteObjects requests.
|
||||
On the other hand we would also like to minimize leakage by executing
|
||||
deletions promptly.
|
||||
|
||||
To resolve this, we may make the deletion queue persistent
|
||||
and then executing these in the background at a later time.
|
||||
|
||||
_Note: The deletion queue's reason for existence is optimization rather than correctness,
|
||||
so there is a lot of flexibility in exactly how the it should work,
|
||||
as long as it obeys the rule to validate generations before executing deletions,
|
||||
so the following details are not essential to the overall RFC._
|
||||
|
||||
#### Scope
|
||||
|
||||
The deletion queue will be global per pageserver, not per-tenant. There
|
||||
are several reasons for this choice:
|
||||
|
||||
- Use the queue as a central point to coalesce validation requests to the
|
||||
control plane: this avoids individual `Timeline` objects ever touching
|
||||
the control plane API, and avoids them having to know the rules about
|
||||
validating deletions. This separation of concerns will avoid burdening
|
||||
the already many-LoC `Timeline` type with even more responsibility.
|
||||
- Decouple the deletion queue from Tenant attachment lifetime: we may
|
||||
"hibernate" an inactive tenant by tearing down its `Tenant`/`Timeline`
|
||||
objects in the pageserver, without having to wait for deletions to be done.
|
||||
- Amortize the cost of I/O for the persistent queue, instead of having many
|
||||
tiny queues.
|
||||
- Coalesce deletions into a smaller number of larger DeleteObjects calls
|
||||
|
||||
Because of the cost of doing I/O for persistence, and the desire to coalesce
|
||||
generation validation requests across tenants, and coalesce deletions into
|
||||
larger DeleteObjects requests, there will be one deletion queue per pageserver
|
||||
rather than one per tenant. This has the added benefit that when deactivating
|
||||
a tenant, we do not have to drain their deletion queue: deletions can proceed
|
||||
for a tenant whose main `Tenant` object has been torn down.
|
||||
|
||||
#### Flow of deletion
|
||||
|
||||
The flow of a deletion is becomes:
|
||||
|
||||
1. Need for deletion of an object (=> layer file) is identified.
|
||||
2. Unlink the object from all the places that reference it (=> `index_part.json`).
|
||||
3. Enqueue the deletion to a persistent queue.
|
||||
Each entry is `tenant_id, attachment_generation, S3 key`.
|
||||
4. Validate & execute in batches:
|
||||
4.1 For a batch of entries, call into control plane.
|
||||
4.2 For the subset of entries that passed validation, execute a `DeleteObjects` S3 DELETE request for their S3 keys.
|
||||
|
||||
As outlined in the Part 1 on correctness, it is critical that deletions are only
|
||||
executed once the key is not referenced anywhere in S3.
|
||||
This property is obviously upheld by the scheme above.
|
||||
|
||||
#### We Accept Object Leakage In Acceptable Circumcstances
|
||||
|
||||
If we crash in the flow above between (2) and (3), we lose track of unreferenced object.
|
||||
Further, enqueuing a single to the persistent queue may not be durable immediately to amortize cost of flush to disk.
|
||||
This is acceptable for now, it can be caught by [the scrubber](#cleaning-up-orphan-objects-scrubbing).
|
||||
|
||||
There are various measures we can take to improve this in the future.
|
||||
1. Cap amount of time until enqueued entry becomes durable (timeout for flush-to-tisk)
|
||||
2. Proactively flush:
|
||||
- On graceful shutdown, as we anticipate that some or
|
||||
all of our attachments may be re-assigned while we are offline.
|
||||
- On tenant detach.
|
||||
3. For each entry, keep track of whether it has passed (2).
|
||||
Only admit entries to (4) one they have passed (2).
|
||||
This requires re-writing / two queue entries (intent, commit) per deletion.
|
||||
|
||||
The important take-away with any of the above is that it's not
|
||||
disastrous to leak objects in exceptional circumstances.
|
||||
|
||||
#### Operations that may skip the queue
|
||||
|
||||
Deletions of an entire timeline are [exempt](#Timeline-Deletion) from generation number validation. Once the
|
||||
control plane sends the deletion request, there is no requirement to retain the readability
|
||||
of any data within the timeline, and all objects within the timeline path may be deleted
|
||||
at any time from the control plane's deletion request onwards.
|
||||
|
||||
Since deletions of smaller timelines won't have enough objects to compose a full sized
|
||||
DeleteObjects request, it is still useful to send these through the last part of the
|
||||
deletion pipeline to coalesce with other executing deletions: to enable this, the
|
||||
deletion queue should expose two input channels: one for deletions that must be
|
||||
processed in a generation-aware way, and a fast path for timeline deletions, where
|
||||
that fast path may skip validation and the persistent queue.
|
||||
|
||||
### Cleaning up orphan objects (scrubbing)
|
||||
|
||||
An orphan object is any object which is no longer referenced by a running node or by metadata.
|
||||
|
||||
Examples of how orphan objects arise:
|
||||
|
||||
- A node PUTs a layer object, then crashes before it writes the
|
||||
index_part.json that references that layer.
|
||||
- A stale node carries on running for some time, and writes out an unbounded number of
|
||||
objects while it believes itself to be the rightful writer for a tenant.
|
||||
- A pageserver crashes between un-linking an object from the index, and persisting
|
||||
the object to its deletion queue.
|
||||
|
||||
Orphan objects are functionally harmless, but have a small cost due to S3 capacity consumed. We
|
||||
may clean them up at some time in the future, but doing a ListObjectsv2 operation and cross
|
||||
referencing with the latest metadata to identify objects which are not referenced.
|
||||
|
||||
Scrubbing will be done only by an attached pageserver (not some third party process), and deletions requested during scrub will go through the same
|
||||
validation as all other deletions: the attachment generation must be
|
||||
fresh. This avoids the possibility of a stale pageserver incorrectly
|
||||
thinking than an object written by a newer generation is stale, and deleting
|
||||
it.
|
||||
|
||||
It is not strictly necessary that scrubbing be done by an attached
|
||||
pageserver: it could also be done externally. However, an external
|
||||
scrubber would still require the same validation procedure that
|
||||
a pageserver's deletion queue performs, before actually erasing
|
||||
objects.
|
||||
|
||||
## Operational impact
|
||||
|
||||
### Availability
|
||||
|
||||
Coordination of generation numbers via the control plane introduce a dependency for certain
|
||||
operations:
|
||||
|
||||
1. Starting new pageservers (or activating pageservers after a restart)
|
||||
2. Executing enqueued deletions
|
||||
3. Advertising updated `remote_consistent_lsn` to enable WAL trimming
|
||||
|
||||
Item 1. would mean that some in-place restarts that previously would have resumed service even if the control plane were
|
||||
unavailable, will now not resume service to users until the control plane is available. We could
|
||||
avoid this by having a timeout on communication with the control plane, and after some timeout,
|
||||
resume service with the previous generation numbers (assuming this was persisted to disk). However,
|
||||
this is unlikely to be needed as the control plane is already an essential & highly available component. Also, having a node re-use an old generation number would complicate
|
||||
reasoning about the system, as it would break the invariant that a generation number uniquely identifies
|
||||
a tenant's attachment to a given pageserver _process_: it would merely identify the tenant's attachment
|
||||
to the pageserver _machine_ or its _on-disk-state_.
|
||||
|
||||
Item 2. is a non-issue operationally: it's harmless to delay deletions, the only impact of objects pending deletion is
|
||||
the S3 capacity cost.
|
||||
|
||||
Item 3. could be an issue if safekeepers are low on disk space and the control plane is unavailable for a long time. If this became an issue,
|
||||
we could adjust the safekeeper to delete segments from local disk sooner, as soon as they're uploaded to S3, rather than waiting for
|
||||
remote_consistent_lsn to advance.
|
||||
|
||||
For a managed service, the general approach should be to make sure we are monitoring & respond fast enough
|
||||
that control plane outages are bounded in time.
|
||||
|
||||
There is also the fact that control plane runs in a single region.
|
||||
The latency for distant regions is not a big concern for us because all request types added by this RFC are either infrequent or not in the way of the data path.
|
||||
However, we lose region isolation for the operations listed above.
|
||||
The ongoing work to split console and control will give us per-region control plane, and all operations in this RFC can be handled by these per-region control planes.
|
||||
With that in mind, we accept the trade-offs outlined in this paragraph.
|
||||
|
||||
We will also implement an "escape hatch" config generation numbers, where in a major disaster outage,
|
||||
we may manually run pageservers with a hand-selected generation number, so that we can bring them online
|
||||
independently of a control plane.
|
||||
|
||||
### Rollout
|
||||
|
||||
Although there is coupling between components, we may deploy most of the new data plane components
|
||||
independently of the control plane: initially they can just use a static generation number.
|
||||
|
||||
#### Phase 1
|
||||
|
||||
The pageserver is deployed with some special config to:
|
||||
|
||||
- Always act like everything is generation 1 and do not wait for a control plane issued generation on attach
|
||||
- Skip the places in deletion and remote_consistent_lsn updates where we would call into control plane
|
||||
|
||||
#### Phase 2
|
||||
|
||||
The control plane changes are deployed: control plane will now track and increment generation numbers.
|
||||
|
||||
#### Phase 3
|
||||
|
||||
The pageserver is deployed with its control-plane-dependent changes enabled: it will now require
|
||||
the control plane to service re-attach requests on startup, and handle generation
|
||||
validation requests.
|
||||
|
||||
### On-disk backward compatibility
|
||||
|
||||
Backward compatibility with existing data is straightforward:
|
||||
|
||||
- When reading the index, we may assume that any layer whose metadata doesn't include
|
||||
generations will have a path without generation suffix.
|
||||
- When locating the index file on attachment, we may use the "fallback" listing path
|
||||
and if there is only an index without generation suffix, that is the one we load.
|
||||
|
||||
It is not necessary to re-write existing layers: even new index files will be able
|
||||
to represent generation-less layers.
|
||||
|
||||
### On-disk forward compatibility
|
||||
|
||||
We will do a two phase rollout, probably over multiple releases because we will naturally
|
||||
have some of the read-side code ready before the overall functionality is ready:
|
||||
|
||||
1. Deploy pageservers which understand the new index format and generation suffixes
|
||||
in keys, but do not write objects with generation numbers in the keys.
|
||||
2. Deploy pageservers that write objects with generation numbers in the keys.
|
||||
|
||||
Old pageservers will be oblivious to generation numbers. That means that they can't
|
||||
read objects with generation numbers in the name. This is why we must
|
||||
first step must deploy the ability to read, before the second step
|
||||
starts writing them.
|
||||
|
||||
# Frequently Asked Questions
|
||||
|
||||
## Why a generation _suffix_ rather than _prefix_?
|
||||
|
||||
The choice is motivated by object listing, since one can list by prefix but not
|
||||
suffix.
|
||||
|
||||
In [finding remote indices](#finding-the-remote-indices-for-timelines), we rely
|
||||
on being able to do a prefix listing for `<tenant>/<timeline>/index_part.json*`.
|
||||
That relies on the prefix listing.
|
||||
|
||||
The converse case of using a generation prefix and listing by generation is
|
||||
not needed: one could imagine listing by generation while scrubbing (so that
|
||||
a particular generation's layers could be scrubbed), but this is not part
|
||||
of normal operations, and the [scrubber](#cleaning-up-orphan-objects-scrubbing) probably won't work that way anyway.
|
||||
|
||||
## Wouldn't it be simpler to have a separate deletion queue per timeline?
|
||||
|
||||
Functionally speaking, we could. That's how RemoteTimelineClient currently works,
|
||||
but this approach does not map well to a long-lived persistent queue with
|
||||
generation validation.
|
||||
|
||||
Anything we do per-timeline generates tiny random I/O, on a pageserver with
|
||||
tens of thousands of timelines operating: to be ready for high scale, we should:
|
||||
|
||||
- A) Amortize costs where we can (e.g. a shared deletion queue)
|
||||
- B) Expect to put tenants into a quiescent state while they're not
|
||||
busy: i.e. we shouldn't keep a tenant alive to service its deletion queue.
|
||||
|
||||
This was discussed in the [scope](#scope) part of the deletion queue section.
|
||||
|
||||
# Appendix A: Examples of use in high availability/failover
|
||||
|
||||
The generation numbers proposed in this RFC are adaptable to a variety of different
|
||||
failover scenarios and models. The sections below sketch how they would work in practice.
|
||||
|
||||
### In-place restart of a pageserver
|
||||
|
||||
"In-place" here means that the restart is done before any other element in the system
|
||||
has taken action in response to the node being down.
|
||||
|
||||
- After restart, the node issues a re-attach request to the control plane, and
|
||||
receives new generation numbers for all its attached tenants.
|
||||
- Tenants may be activated with the generation number in the re-attach response.
|
||||
- If any of its attachments were in fact stale (i.e. had be reassigned to another
|
||||
node while this node was offline), then
|
||||
- the re-attach response will inform the tenant about this by not including
|
||||
the tenant of this by _not_ incrementing the generation for that attachment.
|
||||
- This will implicitly block deletions in the tenant, but as an optimization
|
||||
the pageserver should also proactively stop doing S3 uploads when it notices this stale-generation state.
|
||||
- The control plane is expected to eventually detach this tenant from the
|
||||
pageserver.
|
||||
|
||||
If the control plane does not include a tenant in the re-attach response,
|
||||
but there is still local state for the tenant in the filesystem, the pageserver
|
||||
deletes the local state in response and does not load/active the tenant.
|
||||
See the [earlier section on pageserver startup](#pageserver-attachstartup-changes) for details.
|
||||
Control plane can use this mechanism to clean up a pageserver that has been
|
||||
down for so long that all its tenants were migrated away before it came back
|
||||
up again and asked for re-attach.
|
||||
|
||||
### Failure of a pageserver
|
||||
|
||||
In this context, read "failure" as the most ambiguous possible case, where
|
||||
a pageserver is unavailable to clients and control plane, but may still be executing and talking
|
||||
to S3.
|
||||
|
||||
#### Case A: re-attachment to other nodes
|
||||
|
||||
1. Let's say node 0 becomes unresponsive in a cluster of three nodes 0, 1, 2.
|
||||
2. Some external mechanism notices that the node is unavailable and initiates
|
||||
movement of all tenants attached to that node to a different node according
|
||||
to some distribution rule.
|
||||
In this example, it would mean incrementing the generation
|
||||
of all tenants that were attached to node 0, as each tenant's assigned pageserver changes.
|
||||
3. A tenant which is now attached to node 1 will _also_ still be attached to node
|
||||
0, from the perspective of node 0. Node 0 will still be using its old generation,
|
||||
node 1 will be using a newer generation.
|
||||
4. S3 writes will continue from nodes 0 and 1: there will be an index_part.json-00000001
|
||||
\_and\* an index_part.json-00000002. Objects written under the old suffix
|
||||
after the new attachment was created do not matter from the rest of the system's
|
||||
perspective: the endpoints are reading from the new attachment location. Objects
|
||||
written by node 0 are just garbage that can be cleaned up at leisure. Node 0 will
|
||||
not do any deletions because it can't synchronize with control plane, or if it could,
|
||||
its deletion queue processing would get errors for the validation requests.
|
||||
|
||||
#### Case B: direct node replacement with same node_id and drive
|
||||
|
||||
This is the scenario we would experience if running pageservers in some dynamic
|
||||
VM/container environment that would auto-replace a given node_id when it became
|
||||
unresponsive, with the node's storage supplied by some network block device
|
||||
that is attached to the replacement VM/container.
|
||||
|
||||
1. Let's say node 0 fails, and there may be some other peers but they aren't relevant.
|
||||
2. Some external mechanism notices that the node is unavailable, and creates
|
||||
a "new node 0" (Node 0b) which is a physically separate server. The original node 0
|
||||
(Node 0a) may still be running, because we do not assume the environment fences nodes.
|
||||
3. On startup, node 0b re-attaches and gets higher generation numbers for
|
||||
all tenants.
|
||||
4. S3 writes continue from nodes 0a and 0b, but the writes do not collide due to different
|
||||
generation in the suffix, and the writes from node 0a are not visible to the rest
|
||||
of the system because endpoints are reading only from node 0b.
|
||||
|
||||
# Appendix B: interoperability with other features
|
||||
|
||||
## Sharded Keyspace
|
||||
|
||||
The design in this RFC maps neatly to a sharded keyspace design where subsets of the key space
|
||||
for a tenant are assigned to different pageservers:
|
||||
|
||||
- the "unit of work" for attachments becomes something like a TenantShard rather than a Tenant
|
||||
- TenantShards get generation numbers just as Tenants do.
|
||||
- Write workload (ingest, compaction) for a tenant is spread out across pageservers via
|
||||
TenantShards, but each TenantShard still has exactly one valid writer at a time.
|
||||
|
||||
## Read replicas
|
||||
|
||||
_This section is about a passive reader of S3 pageserver state, not a postgres
|
||||
read replica_
|
||||
|
||||
For historical reads to LSNs below the remote persistent LSN, any node may act as a reader at any
|
||||
time: remote data is logically immutable data, and the use of deferred deletion in this RFC helps
|
||||
mitigate the fact that remote data is not _physically_ immutable (i.e. the actual data for a given
|
||||
page moves around as compaction happens).
|
||||
|
||||
A read replica needs to be aware of generations in remote data in order to read the latest
|
||||
metadata (find the index_part.json with the latest suffix). It may either query this
|
||||
from the control plane, or find it with ListObjectsv2 request
|
||||
|
||||
## Seamless migration
|
||||
|
||||
To make tenant migration totally seamless, we will probably want to intentionally double-attach
|
||||
a tenant briefly, serving reads from the old node while waiting for the new node to be ready.
|
||||
|
||||
This RFC enables that double-attachment: two nodes may be attached at the same time, with the migration destination
|
||||
having a higher generation number. The old node will be able to ingest and serve reads, but not
|
||||
do any deletes. The new node's attachment must also avoid deleting layers that the old node may
|
||||
still use. A new piece of state
|
||||
will be needed for this in the control plane's definition of an attachment.
|
||||
|
||||
## Warm secondary locations
|
||||
|
||||
To enable faster tenant movement after a pageserver is lost, we will probably want to spend some
|
||||
disk capacity on keeping standby locations populated with local disk data.
|
||||
|
||||
There's no conflict between this RFC and that: implementing warm secondary locations on a per-tenant basis
|
||||
would be a separate change to the control plane to store standby location(s) for a tenant. Because
|
||||
the standbys do not write to S3, they do not need to be assigned generation numbers. When a tenant is
|
||||
re-attached to a standby location, that would increment the tenant attachment generation and this
|
||||
would work the same as any other attachment change, but with a warm cache.
|
||||
|
||||
## Ephemeral node IDs
|
||||
|
||||
This RFC intentionally avoids changing anything fundamental about how pageservers are identified
|
||||
and registered with the control plane, to avoid coupling the implementation of pageserver split
|
||||
brain protection with more fundamental changes in the management of the pageservers.
|
||||
|
||||
Moving to ephemeral node IDs would provide an extra layer of
|
||||
resilience in the system, as it would prevent the control plane
|
||||
accidentally attaching to two physical nodes with the same
|
||||
generation, if somehow there were two physical nodes with
|
||||
the same node IDs (currently we rely on EC2 guarantees to
|
||||
eliminate this scenario). With ephemeral node IDs, there would be
|
||||
no possibility of that happening, no matter the behavior of
|
||||
underlying infrastructure.
|
||||
|
||||
Nothing fundamental in the pageserver's handling of generations needs to change to handle ephemeral node IDs, since we hardly use the
|
||||
`node_id` anywhere. The `/re-attach` API would be extended
|
||||
to enable the pageserver to obtain its ephemeral ID, and provide
|
||||
some correlation identifier (e.g. EC instance ID), to help the
|
||||
control plane re-attach tenants to the same physical server that
|
||||
previously had them attached.
|
||||
@@ -31,6 +31,8 @@ fn lsn_invalid() -> Lsn {
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SkTimelineInfo {
|
||||
/// Term.
|
||||
pub term: Option<u64>,
|
||||
/// Term of the last entry.
|
||||
pub last_log_term: Option<u64>,
|
||||
/// LSN of the last record.
|
||||
@@ -58,4 +60,6 @@ pub struct SkTimelineInfo {
|
||||
/// A connection string to use for WAL receiving.
|
||||
#[serde(default)]
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub http_connstr: Option<String>,
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ url.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
pq_proto.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
metrics.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -58,6 +58,8 @@ pub mod serde_regex;
|
||||
|
||||
pub mod pageserver_feedback;
|
||||
|
||||
pub mod postgres_client;
|
||||
|
||||
pub mod tracing_span_assert;
|
||||
|
||||
pub mod rate_limit;
|
||||
|
||||
37
libs/utils/src/postgres_client.rs
Normal file
37
libs/utils/src/postgres_client.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
//! Postgres client connection code common to other crates (safekeeper and
|
||||
//! pageserver) which depends on tenant/timeline ids and thus not fitting into
|
||||
//! postgres_connection crate.
|
||||
|
||||
use anyhow::Context;
|
||||
use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
|
||||
use crate::id::TenantTimelineId;
|
||||
|
||||
/// Create client config for fetching WAL from safekeeper on particular timeline.
|
||||
/// listen_pg_addr_str is in form host:\[port\].
|
||||
pub fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
Ok(connstr)
|
||||
}
|
||||
@@ -16,3 +16,19 @@ in the `neon-postgres` cgroup and set its `memory.{max,high}`.
|
||||
* See also: [`neondatabase/vm-monitor`](https://github.com/neondatabase/vm-monitor/),
|
||||
where initial development of the monitor happened. The repository is no longer
|
||||
maintained but the commit history may be useful for debugging.
|
||||
|
||||
## Structure
|
||||
|
||||
The `vm-monitor` is loosely comprised of a few systems. These are:
|
||||
* the server: this is just a simple `axum` server that accepts requests and
|
||||
upgrades them to websocket connections. The server only allows one connection at
|
||||
a time. This means that upon receiving a new connection, the server will terminate
|
||||
and old one if it exists.
|
||||
* the filecache: a struct that allows communication with the Postgres file cache.
|
||||
On startup, we connect to the filecache and hold on to the connection for the
|
||||
entire monitor lifetime.
|
||||
* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by
|
||||
listening for `memory.high` events and setting its `memory.{high,max}` values.
|
||||
* the runner: the runner marries the filecache and cgroup watcher together,
|
||||
communicating with the agent throught the `Dispatcher`, and then calling filecache
|
||||
and cgroup watcher functions as needed to upscale and downscale
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Managing the websocket connection and other signals in the monitor.
|
||||
//!
|
||||
//! Contains types that manage the interaction (not data interchange, see `protocol`)
|
||||
//! between informant and monitor, allowing us to to process and send messages in a
|
||||
//! between agent and monitor, allowing us to to process and send messages in a
|
||||
//! straightforward way. The dispatcher also manages that signals that come from
|
||||
//! the cgroup (requesting upscale), and the signals that go to the cgroup
|
||||
//! (notifying it of upscale).
|
||||
@@ -24,16 +24,16 @@ use crate::protocol::{
|
||||
/// The central handler for all communications in the monitor.
|
||||
///
|
||||
/// The dispatcher has two purposes:
|
||||
/// 1. Manage the connection to the informant, sending and receiving messages.
|
||||
/// 1. Manage the connection to the agent, sending and receiving messages.
|
||||
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
|
||||
/// and sending a message to the informant when the cgroup manager requests
|
||||
/// and sending a message to the agent when the cgroup manager requests
|
||||
/// upscale.
|
||||
#[derive(Debug)]
|
||||
pub struct Dispatcher {
|
||||
/// We read informant messages of of `source`
|
||||
/// We read agent messages of of `source`
|
||||
pub(crate) source: SplitStream<WebSocket>,
|
||||
|
||||
/// We send messages to the informant through `sink`
|
||||
/// We send messages to the agent through `sink`
|
||||
sink: SplitSink<WebSocket, Message>,
|
||||
|
||||
/// Used to notify the cgroup when we are upscaled.
|
||||
@@ -43,7 +43,7 @@ pub struct Dispatcher {
|
||||
/// we send an `UpscaleRequst` to the agent.
|
||||
pub(crate) request_upscale_events: mpsc::Receiver<()>,
|
||||
|
||||
/// The protocol version we have agreed to use with the informant. This is negotiated
|
||||
/// The protocol version we have agreed to use with the agent. This is negotiated
|
||||
/// during the creation of the dispatcher, and should be the highest shared protocol
|
||||
/// version.
|
||||
///
|
||||
@@ -56,9 +56,9 @@ pub struct Dispatcher {
|
||||
impl Dispatcher {
|
||||
/// Creates a new dispatcher using the passed-in connection.
|
||||
///
|
||||
/// Performs a negotiation with the informant to determine the highest protocol
|
||||
/// Performs a negotiation with the agent to determine the highest protocol
|
||||
/// version that both support. This consists of two steps:
|
||||
/// 1. Wait for the informant to sent the range of protocols it supports.
|
||||
/// 1. Wait for the agent to sent the range of protocols it supports.
|
||||
/// 2. Send a protocol version that works for us as well, or an error if there
|
||||
/// is no compatible version.
|
||||
pub async fn new(
|
||||
@@ -69,7 +69,7 @@ impl Dispatcher {
|
||||
let (mut sink, mut source) = stream.split();
|
||||
|
||||
// Figure out the highest protocol version we both support
|
||||
info!("waiting for informant to send protocol version range");
|
||||
info!("waiting for agent to send protocol version range");
|
||||
let Some(message) = source.next().await else {
|
||||
bail!("websocket connection closed while performing protocol handshake")
|
||||
};
|
||||
@@ -79,7 +79,7 @@ impl Dispatcher {
|
||||
let Message::Text(message_text) = message else {
|
||||
// All messages should be in text form, since we don't do any
|
||||
// pinging/ponging. See nhooyr/websocket's implementation and the
|
||||
// informant/agent for more info
|
||||
// agent for more info
|
||||
bail!("received non-text message during proocol handshake: {message:?}")
|
||||
};
|
||||
|
||||
@@ -88,32 +88,30 @@ impl Dispatcher {
|
||||
max: PROTOCOL_MAX_VERSION,
|
||||
};
|
||||
|
||||
let informant_range: ProtocolRange = serde_json::from_str(&message_text)
|
||||
let agent_range: ProtocolRange = serde_json::from_str(&message_text)
|
||||
.context("failed to deserialize protocol version range")?;
|
||||
|
||||
info!(range = ?informant_range, "received protocol version range");
|
||||
info!(range = ?agent_range, "received protocol version range");
|
||||
|
||||
let highest_shared_version = match monitor_range.highest_shared_version(&informant_range) {
|
||||
let highest_shared_version = match monitor_range.highest_shared_version(&agent_range) {
|
||||
Ok(version) => {
|
||||
sink.send(Message::Text(
|
||||
serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(),
|
||||
))
|
||||
.await
|
||||
.context("failed to notify informant of negotiated protocol version")?;
|
||||
.context("failed to notify agent of negotiated protocol version")?;
|
||||
version
|
||||
}
|
||||
Err(e) => {
|
||||
sink.send(Message::Text(
|
||||
serde_json::to_string(&ProtocolResponse::Error(format!(
|
||||
"Received protocol version range {} which does not overlap with {}",
|
||||
informant_range, monitor_range
|
||||
agent_range, monitor_range
|
||||
)))
|
||||
.unwrap(),
|
||||
))
|
||||
.await
|
||||
.context(
|
||||
"failed to notify informant of no overlap between protocol version ranges",
|
||||
)?;
|
||||
.context("failed to notify agent of no overlap between protocol version ranges")?;
|
||||
Err(e).context("error determining suitable protocol version range")?
|
||||
}
|
||||
};
|
||||
@@ -137,7 +135,7 @@ impl Dispatcher {
|
||||
.context("failed to send resources and oneshot sender across channel")
|
||||
}
|
||||
|
||||
/// Send a message to the informant.
|
||||
/// Send a message to the agent.
|
||||
///
|
||||
/// Although this function is small, it has one major benefit: it is the only
|
||||
/// way to send data accross the connection, and you can only pass in a proper
|
||||
|
||||
@@ -59,8 +59,8 @@ pub struct FileCacheConfig {
|
||||
spread_factor: f64,
|
||||
}
|
||||
|
||||
impl Default for FileCacheConfig {
|
||||
fn default() -> Self {
|
||||
impl FileCacheConfig {
|
||||
pub fn default_in_memory() -> Self {
|
||||
Self {
|
||||
in_memory: true,
|
||||
// 75 %
|
||||
@@ -71,9 +71,19 @@ impl Default for FileCacheConfig {
|
||||
spread_factor: 0.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileCacheConfig {
|
||||
pub fn default_on_disk() -> Self {
|
||||
Self {
|
||||
in_memory: false,
|
||||
resource_multiplier: 0.75,
|
||||
// 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
|
||||
// memory, the kernel will just evict from its page cache, rather than e.g. killing
|
||||
// everything.
|
||||
min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
|
||||
spread_factor: 0.1,
|
||||
}
|
||||
}
|
||||
|
||||
/// Make sure fields of the config are consistent.
|
||||
pub fn validate(&self) -> anyhow::Result<()> {
|
||||
// Single field validity
|
||||
|
||||
@@ -39,6 +39,16 @@ pub struct Args {
|
||||
#[arg(short, long)]
|
||||
pub pgconnstr: Option<String>,
|
||||
|
||||
/// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
|
||||
/// kernel's page cache), and therefore should not count against available memory.
|
||||
//
|
||||
// NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
|
||||
// than a roundabout way, via whether it's on disk), but in order to be backwards compatible
|
||||
// during the switch away from an in-memory file cache, we had to default to the previous
|
||||
// behavior.
|
||||
#[arg(long)]
|
||||
pub file_cache_on_disk: bool,
|
||||
|
||||
/// The address we should listen on for connection requests. For the
|
||||
/// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
|
||||
#[arg(short, long)]
|
||||
@@ -146,7 +156,7 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res
|
||||
|
||||
/// Handles incoming websocket connections.
|
||||
///
|
||||
/// If we are already to connected to an informant, we kill that old connection
|
||||
/// If we are already to connected to an agent, we kill that old connection
|
||||
/// and accept the new one.
|
||||
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
|
||||
pub async fn ws_handler(
|
||||
@@ -196,7 +206,7 @@ async fn start_monitor(
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!("connected to informant");
|
||||
info!("connected to agent");
|
||||
|
||||
match monitor.run().await {
|
||||
Ok(()) => info!("monitor was killed due to new connection"),
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
//! Types representing protocols and actual informant-monitor messages.
|
||||
//! Types representing protocols and actual agent-monitor messages.
|
||||
//!
|
||||
//! The pervasive use of serde modifiers throughout this module is to ease
|
||||
//! serialization on the go side. Because go does not have enums (which model
|
||||
//! messages well), it is harder to model messages, and we accomodate that with
|
||||
//! serde.
|
||||
//!
|
||||
//! *Note*: the informant sends and receives messages in different ways.
|
||||
//! *Note*: the agent sends and receives messages in different ways.
|
||||
//!
|
||||
//! The informant serializes messages in the form and then sends them. The use
|
||||
//! The agent serializes messages in the form and then sends them. The use
|
||||
//! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
|
||||
//! to determine how to deserialize `Content`.
|
||||
//! ```ignore
|
||||
@@ -25,9 +25,9 @@
|
||||
//! Id uint64
|
||||
//! }
|
||||
//! ```
|
||||
//! After reading the type field, the informant will decode the entire message
|
||||
//! After reading the type field, the agent will decode the entire message
|
||||
//! again, this time into the correct type using the embedded fields.
|
||||
//! Because the informant cannot just extract the json contained in a certain field
|
||||
//! Because the agent cannot just extract the json contained in a certain field
|
||||
//! (it initially deserializes to `map[string]interface{}`), we keep the fields
|
||||
//! at the top level, so the entire piece of json can be deserialized into a struct,
|
||||
//! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
|
||||
@@ -37,7 +37,7 @@ use std::cmp;
|
||||
|
||||
use serde::{de::Error, Deserialize, Serialize};
|
||||
|
||||
/// A Message we send to the informant.
|
||||
/// A Message we send to the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct OutboundMsg {
|
||||
#[serde(flatten)]
|
||||
@@ -51,31 +51,31 @@ impl OutboundMsg {
|
||||
}
|
||||
}
|
||||
|
||||
/// The different underlying message types we can send to the informant.
|
||||
/// The different underlying message types we can send to the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum OutboundMsgKind {
|
||||
/// Indicates that the informant sent an invalid message, i.e, we couldn't
|
||||
/// Indicates that the agent sent an invalid message, i.e, we couldn't
|
||||
/// properly deserialize it.
|
||||
InvalidMessage { error: String },
|
||||
/// Indicates that we experienced an internal error while processing a message.
|
||||
/// For example, if a cgroup operation fails while trying to handle an upscale,
|
||||
/// we return `InternalError`.
|
||||
InternalError { error: String },
|
||||
/// Returned to the informant once we have finished handling an upscale. If the
|
||||
/// Returned to the agent once we have finished handling an upscale. If the
|
||||
/// handling was unsuccessful, an `InternalError` will get returned instead.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
UpscaleConfirmation {},
|
||||
/// Indicates to the monitor that we are urgently requesting resources.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
UpscaleRequest {},
|
||||
/// Returned to the informant once we have finished attempting to downscale. If
|
||||
/// Returned to the agent once we have finished attempting to downscale. If
|
||||
/// an error occured trying to do so, an `InternalError` will get returned instead.
|
||||
/// However, if we are simply unsuccessful (for example, do to needing the resources),
|
||||
/// that gets included in the `DownscaleResult`.
|
||||
DownscaleResult {
|
||||
// FIXME for the future (once the informant is deprecated)
|
||||
// As of the time of writing, the informant/agent version of this struct is
|
||||
// As of the time of writing, the agent/informant version of this struct is
|
||||
// called api.DownscaleResult. This struct has uppercase fields which are
|
||||
// serialized as such. Thus, we serialize using uppercase names so we don't
|
||||
// have to make a breaking change to the agent<->informant protocol. Once
|
||||
@@ -88,12 +88,12 @@ pub enum OutboundMsgKind {
|
||||
status: String,
|
||||
},
|
||||
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
|
||||
/// informant.
|
||||
/// agent.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
HealthCheck {},
|
||||
}
|
||||
|
||||
/// A message received form the informant.
|
||||
/// A message received form the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct InboundMsg {
|
||||
#[serde(flatten)]
|
||||
@@ -101,7 +101,7 @@ pub struct InboundMsg {
|
||||
pub(crate) id: usize,
|
||||
}
|
||||
|
||||
/// The different underlying message types we can receive from the informant.
|
||||
/// The different underlying message types we can receive from the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(tag = "type", content = "content")]
|
||||
pub enum InboundMsgKind {
|
||||
@@ -120,14 +120,14 @@ pub enum InboundMsgKind {
|
||||
/// when done.
|
||||
DownscaleRequest { target: Resources },
|
||||
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
|
||||
/// informant.
|
||||
/// agent.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
HealthCheck {},
|
||||
}
|
||||
|
||||
/// Represents the resources granted to a VM.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
// Renamed because the agent/informant has multiple resources types:
|
||||
// Renamed because the agent has multiple resources types:
|
||||
// `Resources` (milliCPU/memory slots)
|
||||
// `Allocation` (vCPU/bytes) <- what we correspond to
|
||||
#[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
|
||||
@@ -151,7 +151,7 @@ pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
|
||||
pub struct ProtocolVersion(u8);
|
||||
|
||||
impl ProtocolVersion {
|
||||
/// Represents v1.0 of the informant<-> monitor protocol - the initial version
|
||||
/// Represents v1.0 of the agent<-> monitor protocol - the initial version
|
||||
///
|
||||
/// Currently the latest version.
|
||||
const V1_0: ProtocolVersion = ProtocolVersion(1);
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! Exposes the `Runner`, which handles messages received from informant and
|
||||
//! Exposes the `Runner`, which handles messages received from agent and
|
||||
//! sends upscale requests.
|
||||
//!
|
||||
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
|
||||
@@ -21,8 +21,8 @@ use crate::filecache::{FileCacheConfig, FileCacheState};
|
||||
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
|
||||
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
|
||||
|
||||
/// Central struct that interacts with informant, dispatcher, and cgroup to handle
|
||||
/// signals from the informant.
|
||||
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
|
||||
/// signals from the agent.
|
||||
#[derive(Debug)]
|
||||
pub struct Runner {
|
||||
config: Config,
|
||||
@@ -110,10 +110,10 @@ impl Runner {
|
||||
// memory limits.
|
||||
if let Some(connstr) = &args.pgconnstr {
|
||||
info!("initializing file cache");
|
||||
let config: FileCacheConfig = Default::default();
|
||||
if !config.in_memory {
|
||||
panic!("file cache not in-memory implemented")
|
||||
}
|
||||
let config = match args.file_cache_on_disk {
|
||||
true => FileCacheConfig::default_on_disk(),
|
||||
false => FileCacheConfig::default_in_memory(),
|
||||
};
|
||||
|
||||
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
|
||||
.await
|
||||
@@ -140,7 +140,10 @@ impl Runner {
|
||||
if actual_size != new_size {
|
||||
info!("file cache size actually got set to {actual_size}")
|
||||
}
|
||||
file_cache_reserved_bytes = actual_size;
|
||||
// Mark the resources given to the file cache as reserved, but only if it's in memory.
|
||||
if !args.file_cache_on_disk {
|
||||
file_cache_reserved_bytes = actual_size;
|
||||
}
|
||||
|
||||
state.filecache = Some(file_cache);
|
||||
}
|
||||
@@ -227,18 +230,17 @@ impl Runner {
|
||||
let mut status = vec![];
|
||||
let mut file_cache_mem_usage = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
if !file_cache.config.in_memory {
|
||||
panic!("file cache not in-memory unimplemented")
|
||||
}
|
||||
|
||||
let actual_usage = file_cache
|
||||
.set_file_cache_size(expected_file_cache_mem_usage)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
file_cache_mem_usage = actual_usage;
|
||||
if file_cache.config.in_memory {
|
||||
file_cache_mem_usage = actual_usage;
|
||||
}
|
||||
let message = format!(
|
||||
"set file cache size to {} MiB",
|
||||
bytes_to_mebibytes(actual_usage)
|
||||
"set file cache size to {} MiB (in memory = {})",
|
||||
bytes_to_mebibytes(actual_usage),
|
||||
file_cache.config.in_memory,
|
||||
);
|
||||
info!("downscale: {message}");
|
||||
status.push(message);
|
||||
@@ -289,10 +291,6 @@ impl Runner {
|
||||
// Get the file cache's expected contribution to the memory usage
|
||||
let mut file_cache_mem_usage = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
if !file_cache.config.in_memory {
|
||||
panic!("file cache not in-memory unimplemented");
|
||||
}
|
||||
|
||||
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
|
||||
info!(
|
||||
target = bytes_to_mebibytes(expected_usage),
|
||||
@@ -304,6 +302,9 @@ impl Runner {
|
||||
.set_file_cache_size(expected_usage)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
if file_cache.config.in_memory {
|
||||
file_cache_mem_usage = actual_usage;
|
||||
}
|
||||
|
||||
if actual_usage != expected_usage {
|
||||
warn!(
|
||||
@@ -312,7 +313,6 @@ impl Runner {
|
||||
bytes_to_mebibytes(actual_usage)
|
||||
)
|
||||
}
|
||||
file_cache_mem_usage = actual_usage;
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &self.cgroup {
|
||||
@@ -371,7 +371,7 @@ impl Runner {
|
||||
Ok(None)
|
||||
}
|
||||
InboundMsgKind::InternalError { error } => {
|
||||
warn!(error, id, "informant experienced an internal error");
|
||||
warn!(error, id, "agent experienced an internal error");
|
||||
Ok(None)
|
||||
}
|
||||
InboundMsgKind::HealthCheck {} => {
|
||||
@@ -405,7 +405,7 @@ impl Runner {
|
||||
.await
|
||||
.context("failed to send message")?;
|
||||
}
|
||||
// there is a message from the informant
|
||||
// there is a message from the agent
|
||||
msg = self.dispatcher.source.next() => {
|
||||
if let Some(msg) = msg {
|
||||
// Don't use 'message' as a key as the string also uses
|
||||
@@ -422,7 +422,7 @@ impl Runner {
|
||||
// Don't use 'message' as a key as the
|
||||
// string also uses that for its key
|
||||
msg = ?other,
|
||||
"informant should only send text messages but received different type"
|
||||
"agent should only send text messages but received different type"
|
||||
);
|
||||
continue
|
||||
},
|
||||
|
||||
@@ -80,8 +80,6 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
#tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/hacky-openat" }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -8,9 +8,10 @@ use std::collections::BinaryHeap;
|
||||
use std::ops::Range;
|
||||
use std::{fs, path::Path, str};
|
||||
|
||||
use pageserver::page_cache::PAGE_SZ;
|
||||
use pageserver::repository::{Key, KEY_SIZE};
|
||||
use pageserver::tenant::block_io::FileBlockReader;
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection, PAGE_SZ};
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
|
||||
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
|
||||
use pageserver::tenant::storage_layer::range_overlaps;
|
||||
use pageserver::virtual_file::VirtualFile;
|
||||
@@ -134,6 +135,10 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let storage_path = &cmd.path;
|
||||
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
let mut total_image_layers = 0usize;
|
||||
let mut total_excess_layers = 0usize;
|
||||
|
||||
@@ -5,6 +5,7 @@ use clap::Subcommand;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
repository::{Key, KEY_SIZE},
|
||||
tenant::{
|
||||
@@ -44,6 +45,8 @@ pub(crate) enum LayerCmd {
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
let path = path.as_ref();
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
@@ -12,8 +12,10 @@ use clap::{Parser, Subcommand};
|
||||
use layers::LayerCmd;
|
||||
use pageserver::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
task_mgr::TaskKind,
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -113,6 +115,9 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
}
|
||||
|
||||
@@ -20,10 +20,11 @@ use metrics::set_build_info_metric;
|
||||
use pageserver::{
|
||||
config::{defaults::*, PageServerConf},
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
http, page_service, task_mgr,
|
||||
http, page_cache, page_service, task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
|
||||
tenant::mgr,
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
@@ -123,6 +124,10 @@ fn main() -> anyhow::Result<()> {
|
||||
// Initialize up failpoints support
|
||||
let scenario = pageserver::failpoint_support::init();
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
scenario.teardown();
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
use std::cell::RefCell;
|
||||
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
|
||||
pub struct Buffer(Option<Box<[u8; PAGE_SZ]>>);
|
||||
|
||||
// Thread-local list of re-usable buffers.
|
||||
thread_local! {
|
||||
static POOL: RefCell<Vec<Box<[u8; PAGE_SZ]>>> = RefCell::new(Vec::new());
|
||||
}
|
||||
|
||||
pub(crate) fn get() -> Buffer {
|
||||
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
|
||||
match maybe {
|
||||
Some(buf) => Buffer(Some(buf)),
|
||||
None => Buffer(Some(Box::new([0; PAGE_SZ]))),
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Buffer {
|
||||
fn drop(&mut self) {
|
||||
let buf = self.0.take().unwrap();
|
||||
POOL.with(|rc| rc.borrow_mut().push(buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for Buffer {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref().unwrap().as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for Buffer {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_mut().unwrap().as_mut()
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub mod keyspace;
|
||||
pub mod metrics;
|
||||
pub mod page_cache;
|
||||
pub mod page_service;
|
||||
pub mod pgdatadir_mapping;
|
||||
pub mod repository;
|
||||
@@ -27,8 +28,6 @@ use std::path::Path;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use tracing::info;
|
||||
|
||||
pub mod buffer_pool;
|
||||
|
||||
/// Current storage format version
|
||||
///
|
||||
/// This is embedded in the header of all the layer files.
|
||||
|
||||
854
pageserver/src/page_cache.rs
Normal file
854
pageserver/src/page_cache.rs
Normal file
@@ -0,0 +1,854 @@
|
||||
//!
|
||||
//! Global page cache
|
||||
//!
|
||||
//! The page cache uses up most of the memory in the page server. It is shared
|
||||
//! by all tenants, and it is used to store different kinds of pages. Sharing
|
||||
//! the cache allows memory to be dynamically allocated where it's needed the
|
||||
//! most.
|
||||
//!
|
||||
//! The page cache consists of fixed-size buffers, 8 kB each to match the
|
||||
//! PostgreSQL buffer size, and a Slot struct for each buffer to contain
|
||||
//! information about what's stored in the buffer.
|
||||
//!
|
||||
//! # Types Of Pages
|
||||
//!
|
||||
//! [`PageCache`] only supports immutable pages.
|
||||
//! Hence there is no need to worry about coherency.
|
||||
//!
|
||||
//! Two types of pages are supported:
|
||||
//!
|
||||
//! * **Materialized pages**, filled & used by page reconstruction
|
||||
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
|
||||
//!
|
||||
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
|
||||
//! It uses the page cache only for the blocks that are already fully written and immutable.
|
||||
//!
|
||||
//! # Filling The Page Cache
|
||||
//!
|
||||
//! Page cache maps from a cache key to a buffer slot.
|
||||
//! The cache key uniquely identifies the piece of data that is being cached.
|
||||
//!
|
||||
//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
|
||||
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
|
||||
//!
|
||||
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
|
||||
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
|
||||
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
|
||||
//! * Get a [`FileId`] using [`next_file_id`].
|
||||
//! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
|
||||
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
|
||||
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
|
||||
//! a read guard for the page. Just use it.
|
||||
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
|
||||
//! a write guard for the page. Fill the page with the contents of the on-disk file.
|
||||
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
|
||||
//! Then try again to [`PageCache::read_immutable_buf`].
|
||||
//! Unless there's high cache pressure, the page should now be cached.
|
||||
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
|
||||
//!
|
||||
//! # Locking
|
||||
//!
|
||||
//! There are two levels of locking involved: There's one lock for the "mapping"
|
||||
//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
|
||||
//! slot, and a separate lock on each slot. To read or write the contents of a
|
||||
//! slot, you must hold the lock on the slot in read or write mode,
|
||||
//! respectively. To change the mapping of a slot, i.e. to evict a page or to
|
||||
//! assign a buffer for a page, you must hold the mapping lock and the lock on
|
||||
//! the slot at the same time.
|
||||
//!
|
||||
//! Whenever you need to hold both locks simultaneously, the slot lock must be
|
||||
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
|
||||
//! in the cache, you would first look up the mapping, while holding the mapping
|
||||
//! lock, and then lock the slot. You must release the mapping lock in between,
|
||||
//! to obey the lock ordering and avoid deadlock.
|
||||
//!
|
||||
//! A slot can momentarily have invalid contents, even if it's already been
|
||||
//! inserted to the mapping, but you must hold the write-lock on the slot until
|
||||
//! the contents are valid. If you need to release the lock without initializing
|
||||
//! the contents, you must remove the mapping first. We make that easy for the
|
||||
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
|
||||
//! page, the caller must explicitly call guard.mark_valid() after it has
|
||||
//! initialized it. If the guard is dropped without calling mark_valid(), the
|
||||
//! mapping is automatically removed and the slot is marked free.
|
||||
//!
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
|
||||
|
||||
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
|
||||
const TEST_PAGE_CACHE_SIZE: usize = 50;
|
||||
|
||||
///
|
||||
/// Initialize the page cache. This must be called once at page server startup.
|
||||
///
|
||||
pub fn init(size: usize) {
|
||||
if PAGE_CACHE.set(PageCache::new(size)).is_err() {
|
||||
panic!("page cache already initialized");
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Get a handle to the page cache.
|
||||
///
|
||||
pub fn get() -> &'static PageCache {
|
||||
//
|
||||
// In unit tests, page server startup doesn't happen and no one calls
|
||||
// page_cache::init(). Initialize it here with a tiny cache, so that the
|
||||
// page cache is usable in unit tests.
|
||||
//
|
||||
if cfg!(test) {
|
||||
PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
|
||||
} else {
|
||||
PAGE_CACHE.get().expect("page cache not initialized")
|
||||
}
|
||||
}
|
||||
|
||||
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
|
||||
const MAX_USAGE_COUNT: u8 = 5;
|
||||
|
||||
/// See module-level comment.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct FileId(u64);
|
||||
|
||||
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
/// See module-level comment.
|
||||
pub fn next_file_id() -> FileId {
|
||||
FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
|
||||
///
|
||||
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
|
||||
///
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum CacheKey {
|
||||
MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey,
|
||||
lsn: Lsn,
|
||||
},
|
||||
ImmutableFilePage {
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||
struct MaterializedPageHashKey {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Version {
|
||||
lsn: Lsn,
|
||||
slot_idx: usize,
|
||||
}
|
||||
|
||||
struct Slot {
|
||||
inner: tokio::sync::RwLock<SlotInner>,
|
||||
usage_count: AtomicU8,
|
||||
}
|
||||
|
||||
struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
buf: &'static mut [u8; PAGE_SZ],
|
||||
}
|
||||
|
||||
impl Slot {
|
||||
/// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
|
||||
fn inc_usage_count(&self) {
|
||||
let _ = self
|
||||
.usage_count
|
||||
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
|
||||
if val == MAX_USAGE_COUNT {
|
||||
None
|
||||
} else {
|
||||
Some(val + 1)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Decrement usage count on the buffer, unless it's already zero. Returns
|
||||
/// the old usage count.
|
||||
fn dec_usage_count(&self) -> u8 {
|
||||
let count_res =
|
||||
self.usage_count
|
||||
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
|
||||
if val == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(val - 1)
|
||||
}
|
||||
});
|
||||
|
||||
match count_res {
|
||||
Ok(usage_count) => usage_count,
|
||||
Err(usage_count) => usage_count,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the usage count to a specific value.
|
||||
fn set_usage_count(&self, count: u8) {
|
||||
self.usage_count.store(count, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
/// This contains the mapping from the cache key to buffer slot that currently
|
||||
/// contains the page, if any.
|
||||
///
|
||||
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
|
||||
/// this HashMap can be replaced with a more concurrent version, there are
|
||||
/// plenty of such crates around.
|
||||
///
|
||||
/// If you add support for caching different kinds of objects, each object kind
|
||||
/// can have a separate mapping map, next to this field.
|
||||
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
|
||||
|
||||
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
slots: Box<[Slot]>,
|
||||
|
||||
/// Index of the next candidate to evict, for the Clock replacement algorithm.
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
|
||||
|
||||
impl std::ops::Deref for PageReadGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.0.buf
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||
/// is expected to fill in the page contents and call mark_valid(). Similarly
|
||||
/// lock_for_write() can return an invalid buffer that the caller is expected to
|
||||
/// to initialize.
|
||||
///
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
|
||||
// Are the page contents currently valid?
|
||||
// Used to mark pages as invalid that are assigned but not yet filled with data.
|
||||
valid: bool,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl PageWriteGuard<'_> {
|
||||
/// Mark that the buffer contents are now valid.
|
||||
pub fn mark_valid(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
assert!(
|
||||
!self.valid,
|
||||
"mark_valid called on a buffer that was already valid"
|
||||
);
|
||||
self.valid = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageWriteGuard<'_> {
|
||||
///
|
||||
/// If the buffer was allocated for a page that was not already in the
|
||||
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
||||
/// initializing it, remove the mapping from the page cache.
|
||||
///
|
||||
fn drop(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
if !self.valid {
|
||||
let self_key = self.inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
self.inner.key = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// lock_for_read() return value
|
||||
pub enum ReadBufResult<'a> {
|
||||
Found(PageReadGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
/// lock_for_write() return value
|
||||
pub enum WriteBufResult<'a> {
|
||||
Found(PageWriteGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
//
|
||||
// Section 1.1: Public interface functions for looking up and memorizing materialized page
|
||||
// versions in the page cache
|
||||
//
|
||||
|
||||
/// Look up a materialized page version.
|
||||
///
|
||||
/// The 'lsn' is an upper bound, this will return the latest version of
|
||||
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
|
||||
/// returned page.
|
||||
pub async fn lookup_materialized_page(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_accesses_materialized_page
|
||||
.inc();
|
||||
|
||||
let mut cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
key: *key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
} = cache_key
|
||||
{
|
||||
if available_lsn == lsn {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_hits_materialized_page_exact
|
||||
.inc();
|
||||
} else {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_hits_materialized_page_older_lsn
|
||||
.inc();
|
||||
}
|
||||
Some((available_lsn, guard))
|
||||
} else {
|
||||
panic!("unexpected key type in slot");
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key).await? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(*write_guard == img);
|
||||
}
|
||||
WriteBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(img);
|
||||
write_guard.mark_valid();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key).await
|
||||
}
|
||||
|
||||
//
|
||||
// Section 2: Internal interface functions for lookup/update.
|
||||
//
|
||||
// To add support for a new kind of "thing" to cache, you will need
|
||||
// to add public interface routines above, and code to deal with the
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
/// Look up a page in the cache.
|
||||
///
|
||||
/// If the search criteria is not exact, *cache_key is updated with the key
|
||||
/// for exact key of the returned page. (For materialized pages, that means
|
||||
/// that the LSN in 'cache_key' is updated with the LSN of the returned page
|
||||
/// version.)
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.read().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard(inner));
|
||||
} else {
|
||||
// search_mapping might have modified the search key; restore it.
|
||||
*cache_key = cache_key_orig;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return a locked buffer for given block.
|
||||
///
|
||||
/// Like try_lock_for_read(), if the search criteria is not exact and the
|
||||
/// page is already found in the cache, *cache_key is updated.
|
||||
///
|
||||
/// If the page is not found in the cache, this allocates a new buffer for
|
||||
/// it. The caller may then initialize the buffer with the contents, and
|
||||
/// call mark_valid().
|
||||
///
|
||||
/// Example usage:
|
||||
///
|
||||
/// ```ignore
|
||||
/// let cache = page_cache::get();
|
||||
///
|
||||
/// match cache.lock_for_read(&key) {
|
||||
/// ReadBufResult::Found(read_guard) => {
|
||||
/// // The page was found in cache. Use it
|
||||
/// },
|
||||
/// ReadBufResult::NotFound(write_guard) => {
|
||||
/// // The page was not found in cache. Read it from disk into the
|
||||
/// // buffer.
|
||||
/// //read_my_page_from_disk(write_guard);
|
||||
///
|
||||
/// // The buffer contents are now valid. Tell the page cache.
|
||||
/// write_guard.mark_valid();
|
||||
/// },
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
}
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
|
||||
&crate::metrics::PAGE_CACHE.read_hits_immutable,
|
||||
),
|
||||
};
|
||||
read_access.inc();
|
||||
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache and lock it in write mode. If it's not
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return a write-locked buffer for given block.
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
}
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Section 3: Mapping functions
|
||||
//
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Returns the slot index, if any. If the search criteria is not exact,
|
||||
/// *cache_key is updated with the actual key of the found page.
|
||||
///
|
||||
/// NOTE: We don't hold any lock on the mapping on return, so the slot might
|
||||
/// get recycled for an unrelated page immediately after this function
|
||||
/// returns. The caller is responsible for re-checking that the slot still
|
||||
/// contains the page with the same key before using it.
|
||||
///
|
||||
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => version_idx,
|
||||
Err(0) => return None,
|
||||
Err(version_idx) => version_idx - 1,
|
||||
};
|
||||
let version = &versions[version_idx];
|
||||
*lsn = version.lsn;
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Like 'search_mapping, but performs an "exact" search. Used for
|
||||
/// allocating a new buffer.
|
||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Some(versions[version_idx].slot_idx)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove mapping for given key.
|
||||
///
|
||||
fn remove_mapping(&self, old_key: &CacheKey) {
|
||||
match old_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: old_hash_key,
|
||||
lsn: old_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
|
||||
let versions = old_entry.get_mut();
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
|
||||
versions.remove(version_idx);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.sub_page_sz(1);
|
||||
if versions.is_empty() {
|
||||
old_entry.remove_entry();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
panic!("could not find old key in mapping")
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
self.size_metrics.current_bytes_immutable.sub_page_sz(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Insert mapping for given key.
|
||||
///
|
||||
/// If a mapping already existed for the given key, returns the slot index
|
||||
/// of the existing mapping and leaves it untouched.
|
||||
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
|
||||
match new_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: new_key,
|
||||
lsn: new_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let versions = map.entry(new_key.clone()).or_default();
|
||||
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => Some(versions[version_idx].slot_idx),
|
||||
Err(version_idx) => {
|
||||
versions.insert(
|
||||
version_idx,
|
||||
Version {
|
||||
lsn: *new_lsn,
|
||||
slot_idx,
|
||||
},
|
||||
);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(slot_idx);
|
||||
self.size_metrics.current_bytes_immutable.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Section 4: Misc internal helpers
|
||||
//
|
||||
|
||||
/// Find a slot to evict.
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
iters += 1;
|
||||
let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
|
||||
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
if slot.dec_usage_count() == 0 {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(_err) => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
// and still haven't found a victim buffer, something's wrong.
|
||||
// Maybe all the buffers were in locked. That could happen in
|
||||
// theory, if you have more threads holding buffers locked than
|
||||
// there are buffers in the pool. In practice, with a reasonably
|
||||
// large buffer pool it really shouldn't happen.
|
||||
if iters > iter_limit {
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Some(old_key) = &inner.key {
|
||||
// remove mapping for old buffer
|
||||
self.remove_mapping(old_key);
|
||||
inner.key = None;
|
||||
}
|
||||
return Ok((slot_idx, inner));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize a new page cache
|
||||
///
|
||||
/// This should be called only once at page server startup.
|
||||
fn new(num_pages: usize) -> Self {
|
||||
assert!(num_pages > 0, "page cache size must be > 0");
|
||||
|
||||
// We use Box::leak here and into_boxed_slice to avoid leaking uninitialized
|
||||
// memory that Vec's might contain.
|
||||
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
size_metrics.current_bytes_materialized_page.set_page_sz(0);
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
|
||||
Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait PageSzBytesMetric {
|
||||
fn set_page_sz(&self, count: usize);
|
||||
fn add_page_sz(&self, count: usize);
|
||||
fn sub_page_sz(&self, count: usize);
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn count_times_page_sz(count: usize) -> u64 {
|
||||
u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
|
||||
}
|
||||
|
||||
impl PageSzBytesMetric for metrics::UIntGauge {
|
||||
fn set_page_sz(&self, count: usize) {
|
||||
self.set(count_times_page_sz(count));
|
||||
}
|
||||
fn add_page_sz(&self, count: usize) {
|
||||
self.add(count_times_page_sz(count));
|
||||
}
|
||||
fn sub_page_sz(&self, count: usize) {
|
||||
self.sub(count_times_page_sz(count));
|
||||
}
|
||||
}
|
||||
@@ -11,12 +11,11 @@
|
||||
//! len < 128: 0XXXXXXX
|
||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
use super::disk_btree::PAGE_SZ;
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::fs::File;
|
||||
@@ -36,14 +36,14 @@ where
|
||||
|
||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||
pub enum BlockLease<'a> {
|
||||
PageReadGuard(crate::buffer_pool::Buffer),
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
}
|
||||
|
||||
impl From<crate::buffer_pool::Buffer> for BlockLease<'static> {
|
||||
fn from(value: crate::buffer_pool::Buffer) -> BlockLease<'static> {
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
|
||||
BlockLease::PageReadGuard(value)
|
||||
}
|
||||
}
|
||||
@@ -74,7 +74,7 @@ impl<'a> Deref for BlockLease<'a> {
|
||||
/// Unlike traits, we also support the read function to be async though.
|
||||
pub(crate) enum BlockReaderRef<'a> {
|
||||
FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
|
||||
// FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
|
||||
FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
|
||||
EphemeralFile(&'a EphemeralFile),
|
||||
Adapter(Adapter<&'a DeltaLayerInner>),
|
||||
#[cfg(test)]
|
||||
@@ -87,7 +87,7 @@ impl<'a> BlockReaderRef<'a> {
|
||||
use BlockReaderRef::*;
|
||||
match self {
|
||||
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
|
||||
// FileBlockReaderFile(r) => r.read_blk(blknum).await,
|
||||
FileBlockReaderFile(r) => r.read_blk(blknum).await,
|
||||
EphemeralFile(r) => r.read_blk(blknum).await,
|
||||
Adapter(r) => r.read_blk(blknum).await,
|
||||
#[cfg(test)]
|
||||
@@ -145,52 +145,63 @@ impl<'a> BlockCursor<'a> {
|
||||
/// for modifying the file, nor for invalidating the cache if it is modified.
|
||||
pub struct FileBlockReader<F> {
|
||||
pub file: F,
|
||||
|
||||
/// Unique ID of this file, used as key in the page cache.
|
||||
file_id: page_cache::FileId,
|
||||
}
|
||||
|
||||
impl<F> FileBlockReader<F> {
|
||||
impl<F> FileBlockReader<F>
|
||||
where
|
||||
F: FileExt,
|
||||
{
|
||||
pub fn new(file: F) -> Self {
|
||||
FileBlockReader { file }
|
||||
let file_id = page_cache::next_file_id();
|
||||
|
||||
FileBlockReader { file_id, file }
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
/// Returns a "lease" object that can be used to
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => break Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impls {
|
||||
(FileBlockReader<$ty:ty>) => {
|
||||
impl FileBlockReader<$ty> {
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(
|
||||
&self,
|
||||
buf: crate::buffer_pool::Buffer,
|
||||
blkno: u32,
|
||||
) -> Result<crate::buffer_pool::Buffer, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at_async(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
/// Returns a "lease" object that can be used to
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
let buf = crate::buffer_pool::get();
|
||||
// Read the page from disk into the buffer
|
||||
let mut write_guard = self.fill_buffer(buf, blknum).await?;
|
||||
Ok(BlockLease::PageReadGuard(write_guard))
|
||||
}
|
||||
}
|
||||
};
|
||||
impl BlockReader for FileBlockReader<File> {
|
||||
fn block_cursor(&self) -> BlockCursor<'_> {
|
||||
BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
|
||||
}
|
||||
}
|
||||
|
||||
// impls!(FileBlockReader<File>);
|
||||
impls!(FileBlockReader<VirtualFile>);
|
||||
|
||||
// impl BlockReader for FileBlockReader<File> {
|
||||
// fn block_cursor(&self) -> BlockCursor<'_> {
|
||||
// BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
|
||||
// }
|
||||
// }
|
||||
|
||||
impl BlockReader for FileBlockReader<VirtualFile> {
|
||||
fn block_cursor(&self) -> BlockCursor<'_> {
|
||||
BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))
|
||||
|
||||
@@ -2,19 +2,22 @@
|
||||
//! used to keep in-memory layers spilled on disk.
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use std::cmp::min;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::DerefMut;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use tracing::*;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub struct EphemeralFile {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
|
||||
_tenant_id: TenantId,
|
||||
_timeline_id: TimelineId,
|
||||
file: VirtualFile,
|
||||
@@ -45,6 +48,7 @@ impl EphemeralFile {
|
||||
)?;
|
||||
|
||||
Ok(EphemeralFile {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
_tenant_id: tenant_id,
|
||||
_timeline_id: timeline_id,
|
||||
file,
|
||||
@@ -60,14 +64,38 @@ impl EphemeralFile {
|
||||
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let mut write_guard: crate::buffer_pool::Buffer = crate::buffer_pool::get();
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
let mut buf = self
|
||||
.file
|
||||
.read_exact_at_async(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
Ok(BlockLease::PageReadGuard(buf))
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.file.path.display(),
|
||||
e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||
@@ -105,6 +133,32 @@ impl EphemeralFile {
|
||||
self.blknum as u64 * PAGE_SZ as u64,
|
||||
) {
|
||||
Ok(_) => {
|
||||
// Pre-warm the page cache with what we just wrote.
|
||||
// This isn't necessary for coherency/correctness, but it's how we've always done it.
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(
|
||||
self.ephemeral_file.page_cache_file_id,
|
||||
self.blknum,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(page_cache::ReadBufResult::Found(_guard)) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
|
||||
}
|
||||
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
Err(e) => {
|
||||
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
|
||||
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
|
||||
}
|
||||
}
|
||||
// Zero the buffer for re-use.
|
||||
// Zeroing is critical for correcntess because the write_blob code below
|
||||
// and similarly read_blk expect zeroed pages.
|
||||
|
||||
@@ -26,7 +26,7 @@
|
||||
//! recovered from this file. This is tracked in
|
||||
//! <https://github.com/neondatabase/neon/issues/4418>
|
||||
|
||||
use std::io::{self, Write};
|
||||
use std::io::{self, Read, Write};
|
||||
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use anyhow::Result;
|
||||
@@ -151,12 +151,11 @@ impl Manifest {
|
||||
/// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
|
||||
/// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
|
||||
/// backup the current one.
|
||||
pub async fn load(
|
||||
file: VirtualFile,
|
||||
pub fn load(
|
||||
mut file: VirtualFile,
|
||||
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
|
||||
let mut buf = vec![];
|
||||
file.read_exact_at(&mut buf, 0)
|
||||
.map_err(ManifestLoadError::Io)?;
|
||||
file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;
|
||||
|
||||
// Read manifest header
|
||||
let mut buf = Bytes::from(buf);
|
||||
@@ -242,8 +241,8 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_manifest() {
|
||||
#[test]
|
||||
fn test_read_manifest() {
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
|
||||
std::fs::create_dir_all(&testdir).unwrap();
|
||||
let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
|
||||
@@ -275,7 +274,7 @@ mod tests {
|
||||
.truncate(false),
|
||||
)
|
||||
.unwrap();
|
||||
let (mut manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
|
||||
let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
|
||||
assert!(!corrupted.0);
|
||||
assert_eq!(operations.len(), 2);
|
||||
assert_eq!(
|
||||
@@ -307,7 +306,7 @@ mod tests {
|
||||
.truncate(false),
|
||||
)
|
||||
.unwrap();
|
||||
let (_manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
|
||||
let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
|
||||
assert!(!corrupted.0);
|
||||
assert_eq!(operations.len(), 3);
|
||||
assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));
|
||||
|
||||
@@ -29,7 +29,7 @@
|
||||
//!
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
|
||||
@@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{self, File};
|
||||
use std::io::SeekFrom;
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -845,8 +845,7 @@ impl DeltaLayerInner {
|
||||
path: &std::path::Path,
|
||||
summary: Option<Summary>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open_async(path)
|
||||
.await
|
||||
let file = VirtualFile::open(path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
//! actual page images are stored in the "values" part.
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
@@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{self, File};
|
||||
use std::io::SeekFrom;
|
||||
use std::io::Write;
|
||||
use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -438,8 +438,7 @@ impl ImageLayerInner {
|
||||
lsn: Lsn,
|
||||
summary: Option<Summary>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open_async(path)
|
||||
.await
|
||||
let file = VirtualFile::open(path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
|
||||
@@ -38,7 +38,6 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -74,6 +73,7 @@ use utils::{
|
||||
simple_rcu::{Rcu, RcuReadGuard},
|
||||
};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::repository::GcResult;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr;
|
||||
@@ -2265,7 +2265,15 @@ impl Timeline {
|
||||
)));
|
||||
}
|
||||
}
|
||||
ancestor.wait_lsn(timeline.ancestor_lsn, ctx).await?;
|
||||
ancestor
|
||||
.wait_lsn(timeline.ancestor_lsn, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"wait for lsn {} on ancestor timeline_id={}",
|
||||
timeline.ancestor_lsn, ancestor.timeline_id
|
||||
)
|
||||
})?;
|
||||
|
||||
timeline_owned = ancestor;
|
||||
timeline = &*timeline_owned;
|
||||
@@ -2445,7 +2453,15 @@ impl Timeline {
|
||||
}
|
||||
|
||||
async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
|
||||
None
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) = cache
|
||||
.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
|
||||
.await?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
@@ -3360,7 +3376,7 @@ impl Timeline {
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / PAGE_SZ as u64) as i128;
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
@@ -3596,7 +3612,7 @@ impl Timeline {
|
||||
// Add two pages for potential overhead. This should in theory be already
|
||||
// accounted for in the target calculation, but for very small targets,
|
||||
// we still might easily hit the limit otherwise.
|
||||
let warn_limit = target_file_size * 2 + PAGE_SZ as u64 * 2;
|
||||
let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
|
||||
for layer in new_layers.iter() {
|
||||
if layer.layer_desc().file_size > warn_limit {
|
||||
warn!(
|
||||
@@ -4184,6 +4200,23 @@ impl Timeline {
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
let cache = page_cache::get();
|
||||
if let Err(e) = cache
|
||||
.memorize_materialized_page(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
)
|
||||
.await
|
||||
.context("Materialized page memoization failed")
|
||||
{
|
||||
return Err(PageReconstructError::from(e));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(img)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,10 +31,11 @@ use storage_broker::Streaming;
|
||||
use tokio::select;
|
||||
use tracing::*;
|
||||
|
||||
use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use utils::backoff::{
|
||||
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::postgres_client::wal_stream_connection_config;
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -879,33 +880,6 @@ impl ReconnectReason {
|
||||
}
|
||||
}
|
||||
|
||||
fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
Ok(connstr)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -921,6 +895,7 @@ mod tests {
|
||||
timeline: SafekeeperTimelineInfo {
|
||||
safekeeper_id: 0,
|
||||
tenant_timeline_id: None,
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: 0,
|
||||
commit_lsn,
|
||||
@@ -929,6 +904,7 @@ mod tests {
|
||||
peer_horizon_lsn: 0,
|
||||
local_start_lsn: 0,
|
||||
safekeeper_connstr: safekeeper_connstr.to_owned(),
|
||||
http_connstr: safekeeper_connstr.to_owned(),
|
||||
availability_zone: None,
|
||||
},
|
||||
latest_update,
|
||||
|
||||
@@ -11,15 +11,13 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
|
||||
|
||||
use std::os::fd::OwnedFd;
|
||||
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
@@ -41,7 +39,7 @@ pub struct VirtualFile {
|
||||
/// Lazy handle to the global file descriptor cache. The slot that this points to
|
||||
/// might contain our File, or it may be empty, or it may contain a File that
|
||||
/// belongs to a different VirtualFile.
|
||||
handle: Arc<Mutex<Option<File>>>, // only transiently None
|
||||
handle: RwLock<SlotHandle>,
|
||||
|
||||
/// Current file position
|
||||
pos: u64,
|
||||
@@ -53,6 +51,7 @@ pub struct VirtualFile {
|
||||
/// opened, in the VirtualFile::create() function, and strip the flag before
|
||||
/// storing it here.
|
||||
pub path: PathBuf,
|
||||
open_options: OpenOptions,
|
||||
|
||||
// These are strings becase we only use them for metrics, and those expect strings.
|
||||
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
|
||||
@@ -61,6 +60,118 @@ pub struct VirtualFile {
|
||||
timeline_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
struct SlotHandle {
|
||||
/// Index into OPEN_FILES.slots
|
||||
index: usize,
|
||||
|
||||
/// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
|
||||
/// been recycled and no longer contains the FD for this virtual file.
|
||||
tag: u64,
|
||||
}
|
||||
|
||||
/// OPEN_FILES is the global array that holds the physical file descriptors that
|
||||
/// are currently open. Each slot in the array is protected by a separate lock,
|
||||
/// so that different files can be accessed independently. The lock must be held
|
||||
/// in write mode to replace the slot with a different file, but a read mode
|
||||
/// is enough to operate on the file, whether you're reading or writing to it.
|
||||
///
|
||||
/// OPEN_FILES starts in uninitialized state, and it's initialized by
|
||||
/// the virtual_file::init() function. It must be called exactly once at page
|
||||
/// server startup.
|
||||
static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
|
||||
|
||||
struct OpenFiles {
|
||||
slots: &'static [Slot],
|
||||
|
||||
/// clock arm for the clock algorithm
|
||||
next: AtomicUsize,
|
||||
}
|
||||
|
||||
struct Slot {
|
||||
inner: RwLock<SlotInner>,
|
||||
|
||||
/// has this file been used since last clock sweep?
|
||||
recently_used: AtomicBool,
|
||||
}
|
||||
|
||||
struct SlotInner {
|
||||
/// Counter that's incremented every time a different file is stored here.
|
||||
/// To avoid the ABA problem.
|
||||
tag: u64,
|
||||
|
||||
/// the underlying file
|
||||
file: Option<File>,
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
/// Find a slot to use, evicting an existing file descriptor if needed.
|
||||
///
|
||||
/// On return, we hold a lock on the slot, and its 'tag' has been updated
|
||||
/// recently_used has been set. It's all ready for reuse.
|
||||
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
//
|
||||
// Run the clock algorithm to find a slot to replace.
|
||||
//
|
||||
let num_slots = self.slots.len();
|
||||
let mut retries = 0;
|
||||
let mut slot;
|
||||
let mut slot_guard;
|
||||
let index;
|
||||
loop {
|
||||
let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
|
||||
slot = &self.slots[next];
|
||||
|
||||
// If the recently_used flag on this slot is set, continue the clock
|
||||
// sweep. Otherwise try to use this slot. If we cannot acquire the
|
||||
// lock, also continue the clock sweep.
|
||||
//
|
||||
// We only continue in this manner for a while, though. If we loop
|
||||
// through the array twice without finding a victim, just pick the
|
||||
// next slot and wait until we can reuse it. This way, we avoid
|
||||
// spinning in the extreme case that all the slots are busy with an
|
||||
// I/O operation.
|
||||
if retries < num_slots * 2 {
|
||||
if !slot.recently_used.swap(false, Ordering::Release) {
|
||||
if let Ok(guard) = slot.inner.try_write() {
|
||||
slot_guard = guard;
|
||||
index = next;
|
||||
break;
|
||||
}
|
||||
}
|
||||
retries += 1;
|
||||
} else {
|
||||
slot_guard = slot.inner.write().unwrap();
|
||||
index = next;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// We now have the victim slot locked. If it was in use previously, close the
|
||||
// old file.
|
||||
//
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
|
||||
// distinguish the two.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close-by-replace"])
|
||||
.observe_closure_duration(|| drop(old_file));
|
||||
}
|
||||
|
||||
// Prepare the slot for reuse and return it
|
||||
slot_guard.tag += 1;
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
(
|
||||
SlotHandle {
|
||||
index,
|
||||
tag: slot_guard.tag,
|
||||
},
|
||||
slot_guard,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
|
||||
@@ -96,6 +207,7 @@ impl VirtualFile {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
@@ -111,76 +223,15 @@ impl VirtualFile {
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFile {
|
||||
handle: Arc::new(Mutex::new(Some(file))),
|
||||
handle: RwLock::new(handle),
|
||||
pos: 0,
|
||||
path: path.to_path_buf(),
|
||||
open_options: reopen_options,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
Ok(vfile)
|
||||
}
|
||||
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open_async(path: &Path) -> Result<VirtualFile, std::io::Error> {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.read(true);
|
||||
Self::open_with_options_async(path, options).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
///
|
||||
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
|
||||
/// they will be applied also when the file is subsequently re-opened, not only
|
||||
/// on the first time. Make sure that's sane!
|
||||
pub async fn open_with_options_async(
|
||||
path: &Path,
|
||||
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
let path_str = path.to_string_lossy();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
let tenant_id;
|
||||
let timeline_id;
|
||||
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
|
||||
tenant_id = parts[parts.len() - 4].to_string();
|
||||
timeline_id = parts[parts.len() - 2].to_string();
|
||||
} else {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let start = std::time::Instant::now();
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let file: OwnedFd = system
|
||||
.open(path, &open_options)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
})?;
|
||||
let file = File::from(file);
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.observe(start.elapsed().as_secs_f64());
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
// It would perhaps be nicer to check just for the read and write flags
|
||||
// explicitly, but OpenOptions doesn't contain any functions to read flags,
|
||||
// only to set them.
|
||||
let mut reopen_options = open_options;
|
||||
reopen_options.create(false);
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFile {
|
||||
handle: Arc::new(Mutex::new(Some(file))),
|
||||
pos: 0,
|
||||
path: path.to_path_buf(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
slot_guard.file.replace(file);
|
||||
|
||||
Ok(vfile)
|
||||
}
|
||||
@@ -193,9 +244,7 @@ impl VirtualFile {
|
||||
pub fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
self.with_file("metadata", |file| file.metadata())?
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Helper function that looks up the underlying File for this VirtualFile,
|
||||
/// opening it and evicting some other File if necessary. It calls 'func'
|
||||
/// with the physical File.
|
||||
@@ -203,9 +252,68 @@ impl VirtualFile {
|
||||
where
|
||||
F: FnMut(&File) -> R,
|
||||
{
|
||||
return Ok(STORAGE_IO_TIME
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
// Read the cached slot handle, and see if the slot that it points to still
|
||||
// contains our File.
|
||||
//
|
||||
// We only need to hold the handle lock while we read the current handle. If
|
||||
// another thread closes the file and recycles the slot for a different file,
|
||||
// we will notice that the handle we read is no longer valid and retry.
|
||||
let mut handle = *self.handle.read().unwrap();
|
||||
loop {
|
||||
// Check if the slot contains our File
|
||||
{
|
||||
let slot = &open_files.slots[handle.index];
|
||||
let slot_guard = slot.inner.read().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
if let Some(file) = &slot_guard.file {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(file)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The slot didn't contain our File. We will have to open it ourselves,
|
||||
// but before that, grab a write lock on handle in the VirtualFile, so
|
||||
// that no other thread will try to concurrently open the same file.
|
||||
let handle_guard = self.handle.write().unwrap();
|
||||
|
||||
// If another thread changed the handle while we were not holding the lock,
|
||||
// then the handle might now be valid again. Loop back to retry.
|
||||
if *handle_guard != handle {
|
||||
handle = *handle_guard;
|
||||
continue;
|
||||
}
|
||||
break handle_guard;
|
||||
}
|
||||
};
|
||||
|
||||
// We need to open the file ourselves. The handle in the VirtualFile is
|
||||
// now locked in write-mode. Find a free slot to put it in.
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot();
|
||||
|
||||
// Open the physical file
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
let result = STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(&*self.handle.lock().unwrap().as_ref().unwrap())));
|
||||
.observe_closure_duration(|| func(&file));
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
slot_guard.file.replace(file);
|
||||
|
||||
*handle_guard = handle;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
@@ -215,6 +323,35 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
let handle = self.handle.get_mut().unwrap();
|
||||
|
||||
// We could check with a read-lock first, to avoid waiting on an
|
||||
// unrelated I/O.
|
||||
let slot = &get_open_files().slots[handle.index];
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close"])
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for VirtualFile {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
|
||||
let pos = self.pos;
|
||||
let n = self.read_at(buf, pos)?;
|
||||
self.pos += n as u64;
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for VirtualFile {
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||
let pos = self.pos;
|
||||
@@ -230,8 +367,8 @@ impl Write for VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
impl Seek for VirtualFile {
|
||||
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
match pos {
|
||||
SeekFrom::Start(offset) => {
|
||||
self.pos = offset;
|
||||
@@ -255,113 +392,11 @@ impl VirtualFile {
|
||||
}
|
||||
Ok(self.pos)
|
||||
}
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset) {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at_async(
|
||||
&self,
|
||||
mut write_guard: crate::buffer_pool::Buffer,
|
||||
offset: u64,
|
||||
) -> Result<crate::buffer_pool::Buffer, Error> {
|
||||
let file = self.handle.lock().unwrap().take().unwrap();
|
||||
let put_back = AtomicBool::new(false);
|
||||
let put_back_ref = &put_back;
|
||||
scopeguard::defer! {
|
||||
if !put_back_ref.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
panic!("mut put self.handle back")
|
||||
}
|
||||
};
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
struct PageWriteGuardBuf {
|
||||
buf: crate::buffer_pool::Buffer,
|
||||
init_up_to: usize,
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.buf.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.buf.len()
|
||||
}
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.buf.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.buf.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
let buf = PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let ((file, buf), res) = system.read(file.into(), offset, buf).await;
|
||||
let PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to,
|
||||
} = buf;
|
||||
if let Ok(num_read) = res {
|
||||
assert!(init_up_to <= num_read);
|
||||
}
|
||||
let replaced = self.handle.lock().unwrap().replace(File::from(file));
|
||||
assert!(replaced.is_none());
|
||||
put_back.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
res.map(|_| write_guard)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.write_at(buf, offset) {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
));
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl FileExt for VirtualFile {
|
||||
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self.with_file("read", |file| {
|
||||
tracing::info!("sync read\n{}", std::backtrace::Backtrace::force_capture());
|
||||
file.read_at(buf, offset)
|
||||
})?;
|
||||
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
@@ -370,7 +405,7 @@ impl VirtualFile {
|
||||
result
|
||||
}
|
||||
|
||||
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
@@ -380,3 +415,256 @@ impl VirtualFile {
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
fn new(num_slots: usize) -> OpenFiles {
|
||||
let mut slots = Box::new(Vec::with_capacity(num_slots));
|
||||
for _ in 0..num_slots {
|
||||
let slot = Slot {
|
||||
recently_used: AtomicBool::new(false),
|
||||
inner: RwLock::new(SlotInner { tag: 0, file: None }),
|
||||
};
|
||||
slots.push(slot);
|
||||
}
|
||||
|
||||
OpenFiles {
|
||||
next: AtomicUsize::new(0),
|
||||
slots: Box::leak(slots),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Initialize the virtual file module. This must be called once at page
|
||||
/// server startup.
|
||||
///
|
||||
pub fn init(num_slots: usize) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
}
|
||||
|
||||
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
|
||||
|
||||
// Get a handle to the global slots array.
|
||||
fn get_open_files() -> &'static OpenFiles {
|
||||
//
|
||||
// In unit tests, page server startup doesn't happen and no one calls
|
||||
// virtual_file::init(). Initialize it here, with a small array.
|
||||
//
|
||||
// This applies to the virtual file tests below, but all other unit
|
||||
// tests too, so the virtual file facility is always usable in
|
||||
// unit tests.
|
||||
//
|
||||
if cfg!(test) {
|
||||
OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
|
||||
} else {
|
||||
OPEN_FILES.get().expect("virtual_file::init not called yet")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use rand::Rng;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
// Helper function to slurp contents of a file, starting at the current position,
|
||||
// into a string
|
||||
fn read_string<FD>(vfile: &mut FD) -> Result<String, Error>
|
||||
where
|
||||
FD: Read,
|
||||
{
|
||||
let mut buf = String::new();
|
||||
vfile.read_to_string(&mut buf)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error>
|
||||
where
|
||||
FD: FileExt,
|
||||
{
|
||||
let mut buf = Vec::new();
|
||||
buf.resize(len, 0);
|
||||
vfile.read_exact_at(&mut buf, pos)?;
|
||||
Ok(String::from_utf8(buf).unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_virtual_files() -> Result<(), Error> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
// but this allows us to verify that the operations return the same
|
||||
// results with VirtualFiles as with native Files. (Except that with
|
||||
// native files, you will run out of file descriptors if the ulimit
|
||||
// is low enough.)
|
||||
test_files("virtual_files", |path, open_options| {
|
||||
VirtualFile::open_with_options(path, open_options)
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_physical_files() -> Result<(), Error> {
|
||||
test_files("physical_files", |path, open_options| {
|
||||
open_options.open(path)
|
||||
})
|
||||
}
|
||||
|
||||
fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error>
|
||||
where
|
||||
FD: Read + Write + Seek + FileExt,
|
||||
OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
|
||||
{
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
|
||||
std::fs::create_dir_all(&testdir)?;
|
||||
|
||||
let path_a = testdir.join("file_a");
|
||||
let mut file_a = openfunc(
|
||||
&path_a,
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
)?;
|
||||
file_a.write_all(b"foobar")?;
|
||||
|
||||
// cannot read from a file opened in write-only mode
|
||||
assert!(read_string(&mut file_a).is_err());
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
assert!(file_a.write(b"bar").is_err());
|
||||
|
||||
// Try simple read
|
||||
assert_eq!("foobar", read_string(&mut file_a)?);
|
||||
|
||||
// It's positioned at the EOF now.
|
||||
assert_eq!("", read_string(&mut file_a)?);
|
||||
|
||||
// Test seeks.
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
|
||||
assert_eq!("ar", read_string(&mut file_a)?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
|
||||
assert_eq!("bar", read_string(&mut file_a)?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
// Test erroneous seeks to before byte 0
|
||||
assert!(file_a.seek(SeekFrom::End(-7)).is_err());
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
|
||||
|
||||
// the erroneous seek should have left the position unchanged
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
// Create another test file, and try FileExt functions on it.
|
||||
let path_b = testdir.join("file_b");
|
||||
let mut file_b = openfunc(
|
||||
&path_b,
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true),
|
||||
)?;
|
||||
file_b.write_all_at(b"BAR", 3)?;
|
||||
file_b.write_all_at(b"FOO", 0)?;
|
||||
|
||||
assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA");
|
||||
|
||||
// Open a lot of files, enough to cause some evictions. (Or to be precise,
|
||||
// open the same file many times. The effect is the same.)
|
||||
//
|
||||
// leave file_a positioned at offset 1 before we start
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
|
||||
assert_eq!("FOOBAR", read_string(&mut vfile)?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
|
||||
// make sure we opened enough files to definitely cause evictions.
|
||||
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
|
||||
|
||||
// The underlying file descriptor for 'file_a' should be closed now. Try to read
|
||||
// from it again. We left the file positioned at offset 1 above.
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
// Check that all the other FDs still work too. Use them in random order for
|
||||
// good measure.
|
||||
vfiles.as_mut_slice().shuffle(&mut thread_rng());
|
||||
for vfile in vfiles.iter_mut() {
|
||||
assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test using VirtualFiles from many threads concurrently. This tests both using
|
||||
/// a lot of VirtualFiles concurrently, causing evictions, and also using the same
|
||||
/// VirtualFile from multiple threads concurrently.
|
||||
#[test]
|
||||
fn test_vfile_concurrency() -> Result<(), Error> {
|
||||
const SIZE: usize = 8 * 1024;
|
||||
const VIRTUAL_FILES: usize = 100;
|
||||
const THREADS: usize = 100;
|
||||
const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
|
||||
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
|
||||
std::fs::create_dir_all(&testdir)?;
|
||||
|
||||
// Create a test file.
|
||||
let test_file_path = testdir.join("concurrency_test_file");
|
||||
{
|
||||
let file = File::create(&test_file_path)?;
|
||||
file.write_all_at(&SAMPLE, 0)?;
|
||||
}
|
||||
|
||||
// Open the file many times.
|
||||
let mut files = Vec::new();
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
|
||||
files.push(f);
|
||||
}
|
||||
let files = Arc::new(files);
|
||||
|
||||
// Launch many threads, and use the virtual files concurrently in random order.
|
||||
let mut threads = Vec::new();
|
||||
for threadno in 0..THREADS {
|
||||
let builder =
|
||||
thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno));
|
||||
|
||||
let files = files.clone();
|
||||
let thread = builder
|
||||
.spawn(move || {
|
||||
let mut buf = [0u8; SIZE];
|
||||
let mut rng = rand::thread_rng();
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
f.read_exact_at(&mut buf, 0).unwrap();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
for thread in threads {
|
||||
thread.join().unwrap();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
|
||||
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
||||
|
||||
// Load all timelines from disk to memory.
|
||||
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
|
||||
|
||||
// Keep handles to main tasks to die if any of them disappears.
|
||||
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
|
||||
FuturesUnordered::new();
|
||||
|
||||
// Start wal backup launcher before loading timelines as we'll notify it
|
||||
// through the channel about timelines which need offloading, not draining
|
||||
// the channel would cause deadlock.
|
||||
let current_thread_rt = conf
|
||||
.current_thread_runtime
|
||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||
let conf_ = conf.clone();
|
||||
let wal_backup_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
|
||||
.spawn(wal_backup::wal_backup_launcher_task_main(
|
||||
conf_,
|
||||
wal_backup_launcher_rx,
|
||||
))
|
||||
.map(|res| ("WAL backup launcher".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_backup_handle));
|
||||
|
||||
// Load all timelines from disk to memory.
|
||||
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
|
||||
|
||||
let conf_ = conf.clone();
|
||||
// Run everything in current thread rt, if asked.
|
||||
if conf.current_thread_runtime {
|
||||
info!("running in current thread runtime");
|
||||
}
|
||||
let current_thread_rt = conf
|
||||
.current_thread_runtime
|
||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||
|
||||
let wal_service_handle = current_thread_rt
|
||||
.as_ref()
|
||||
@@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
.map(|res| ("WAL remover".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_remover_handle));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let wal_backup_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
|
||||
.spawn(wal_backup::wal_backup_launcher_task_main(
|
||||
conf_,
|
||||
wal_backup_launcher_rx,
|
||||
))
|
||||
.map(|res| ("WAL backup launcher".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_backup_handle));
|
||||
|
||||
set_build_info_metric(GIT_VERSION);
|
||||
|
||||
// TODO: update tokio-stream, convert to real async Stream with
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! Code to deal with safekeeper control file upgrades
|
||||
use crate::safekeeper::{
|
||||
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
|
||||
TermSwitchEntry,
|
||||
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn,
|
||||
};
|
||||
use anyhow::{bail, Result};
|
||||
use pq_proto::SystemId;
|
||||
@@ -145,7 +144,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
|
||||
let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
|
||||
let ac = AcceptorState {
|
||||
term: oldstate.acceptor_state.term,
|
||||
term_history: TermHistory(vec![TermSwitchEntry {
|
||||
term_history: TermHistory(vec![TermLsn {
|
||||
term: oldstate.acceptor_state.epoch,
|
||||
lsn: Lsn(0),
|
||||
}]),
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::receive_wal::WalReceiverState;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::{debug_dump, pull_timeline};
|
||||
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
@@ -101,6 +102,7 @@ pub struct TimelineStatus {
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub peers: Vec<PeerInfo>,
|
||||
pub walsenders: Vec<WalSenderState>,
|
||||
pub walreceivers: Vec<WalReceiverState>,
|
||||
}
|
||||
@@ -140,6 +142,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
term_history,
|
||||
};
|
||||
|
||||
let conf = get_conf(&request);
|
||||
// Note: we report in memory values which can be lost.
|
||||
let status = TimelineStatus {
|
||||
tenant_id: ttid.tenant_id,
|
||||
@@ -153,6 +156,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
backup_lsn: inmem.backup_lsn,
|
||||
peer_horizon_lsn: inmem.peer_horizon_lsn,
|
||||
remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
|
||||
peers: tli.get_peers(conf).await,
|
||||
walsenders: tli.get_walsenders().get_all(),
|
||||
walreceivers: tli.get_walreceivers().get_all(),
|
||||
};
|
||||
@@ -282,12 +286,14 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
tenant_id: ttid.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: ttid.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
term: sk_info.term.unwrap_or(0),
|
||||
last_log_term: sk_info.last_log_term.unwrap_or(0),
|
||||
flush_lsn: sk_info.flush_lsn.0,
|
||||
commit_lsn: sk_info.commit_lsn.0,
|
||||
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
|
||||
use crate::safekeeper::{
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
|
||||
};
|
||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
|
||||
use crate::timeline::Timeline;
|
||||
use crate::GlobalTimelines;
|
||||
use postgres_backend::PostgresBackend;
|
||||
@@ -119,7 +119,7 @@ async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> any
|
||||
let history = tli.get_state().await.1.acceptor_state.term_history;
|
||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||
let mut history_entries = history.0;
|
||||
history_entries.push(TermSwitchEntry { term, lsn });
|
||||
history_entries.push(TermLsn { term, lsn });
|
||||
let history = TermHistory(history_entries);
|
||||
|
||||
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
|
||||
@@ -19,6 +19,7 @@ pub mod json_ctrl;
|
||||
pub mod metrics;
|
||||
pub mod pull_timeline;
|
||||
pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
pub mod remove_wal;
|
||||
pub mod safekeeper;
|
||||
pub mod send_wal;
|
||||
|
||||
@@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
|
||||
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
|
||||
tokio::fs::rename(tli_dir_path, &timeline_path).await?;
|
||||
|
||||
let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
|
||||
let tli = GlobalTimelines::load_timeline(ttid)
|
||||
.await
|
||||
.context("Failed to load timeline after copy")?;
|
||||
|
||||
info!(
|
||||
"Loaded timeline {}, flush_lsn={}",
|
||||
|
||||
40
safekeeper/src/recovery.rs
Normal file
40
safekeeper/src/recovery.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
//! This module implements pulling WAL from peer safekeepers if compute can't
|
||||
//! provide it, i.e. safekeeper lags too much.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::{select, time::sleep, time::Duration};
|
||||
use tracing::{info, instrument};
|
||||
|
||||
use crate::{timeline::Timeline, SafeKeeperConf};
|
||||
|
||||
/// Entrypoint for per timeline task which always runs, checking whether
|
||||
/// recovery for this safekeeper is needed and starting it if so.
|
||||
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
|
||||
info!("started");
|
||||
let mut cancellation_rx = match tli.get_cancellation_rx() {
|
||||
Ok(rx) => rx,
|
||||
Err(_) => {
|
||||
info!("timeline canceled during task start");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
select! {
|
||||
_ = recovery_main_loop(tli) => { unreachable!() }
|
||||
_ = cancellation_rx.changed() => {
|
||||
info!("stopped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const CHECK_INTERVAL_MS: u64 = 2000;
|
||||
|
||||
/// Check regularly whether we need to start recovery.
|
||||
async fn recovery_main_loop(_tli: Arc<Timeline>) {
|
||||
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
|
||||
loop {
|
||||
sleep(check_duration).await;
|
||||
}
|
||||
}
|
||||
@@ -34,22 +34,33 @@ pub const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
/// Consensus logical timestamp.
|
||||
pub type Term = u64;
|
||||
const INVALID_TERM: Term = 0;
|
||||
pub const INVALID_TERM: Term = 0;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct TermSwitchEntry {
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct TermLsn {
|
||||
pub term: Term,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
// Creation from tuple provides less typing (e.g. for unit tests).
|
||||
impl From<(Term, Lsn)> for TermLsn {
|
||||
fn from(pair: (Term, Lsn)) -> TermLsn {
|
||||
TermLsn {
|
||||
term: pair.0,
|
||||
lsn: pair.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct TermHistory(pub Vec<TermSwitchEntry>);
|
||||
pub struct TermHistory(pub Vec<TermLsn>);
|
||||
|
||||
impl TermHistory {
|
||||
pub fn empty() -> TermHistory {
|
||||
TermHistory(Vec::new())
|
||||
}
|
||||
|
||||
// Parse TermHistory as n_entries followed by TermSwitchEntry pairs
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs
|
||||
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
|
||||
if bytes.remaining() < 4 {
|
||||
bail!("TermHistory misses len");
|
||||
@@ -60,7 +71,7 @@ impl TermHistory {
|
||||
if bytes.remaining() < 16 {
|
||||
bail!("TermHistory is incomplete");
|
||||
}
|
||||
res.push(TermSwitchEntry {
|
||||
res.push(TermLsn {
|
||||
term: bytes.get_u64_le(),
|
||||
lsn: bytes.get_u64_le().into(),
|
||||
})
|
||||
@@ -557,12 +568,17 @@ where
|
||||
.up_to(self.flush_lsn())
|
||||
}
|
||||
|
||||
/// Get current term.
|
||||
pub fn get_term(&self) -> Term {
|
||||
self.state.acceptor_state.term
|
||||
}
|
||||
|
||||
pub fn get_epoch(&self) -> Term {
|
||||
self.state.acceptor_state.get_epoch(self.flush_lsn())
|
||||
}
|
||||
|
||||
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
pub fn flush_lsn(&self) -> Lsn {
|
||||
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
|
||||
}
|
||||
|
||||
@@ -1138,7 +1154,7 @@ mod tests {
|
||||
let pem = ProposerElected {
|
||||
term: 1,
|
||||
start_streaming_at: Lsn(1),
|
||||
term_history: TermHistory(vec![TermSwitchEntry {
|
||||
term_history: TermHistory(vec![TermLsn {
|
||||
term: 1,
|
||||
lsn: Lsn(3),
|
||||
}]),
|
||||
|
||||
@@ -2,12 +2,12 @@
|
||||
//! with the "START_REPLICATION" message, and registry of walsenders.
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{Term, TermLsn};
|
||||
use crate::timeline::Timeline;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::Context as AnyhowContext;
|
||||
use anyhow::{bail, Context as AnyhowContext};
|
||||
use bytes::Bytes;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::PostgresBackend;
|
||||
@@ -390,26 +390,25 @@ impl SafekeeperPostgresHandler {
|
||||
self.appname.clone(),
|
||||
));
|
||||
|
||||
let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
|
||||
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
// hang otherwise). That's because walproposer runs the consensus and
|
||||
// synchronizes safekeepers on the most advanced one.
|
||||
// Walsender can operate in one of two modes which we select by
|
||||
// application_name: give only committed WAL (used by pageserver) or all
|
||||
// existing WAL (up to flush_lsn, used by walproposer or peer recovery).
|
||||
// The second case is always driven by a consensus leader which term
|
||||
// must generally be also supplied. However we're sloppy to do this in
|
||||
// walproposer recovery which will be removed soon. So TODO is to make
|
||||
// it not Option'al then.
|
||||
//
|
||||
// There is a small risk of this WAL getting concurrently garbaged if
|
||||
// another compute rises which collects majority and starts fixing log
|
||||
// on this safekeeper itself. That's ok as (old) proposer will never be
|
||||
// able to commit such WAL.
|
||||
let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() {
|
||||
let wal_end = tli.get_flush_lsn().await;
|
||||
Some(wal_end)
|
||||
// Fetching WAL without term in recovery creates a small risk of this
|
||||
// WAL getting concurrently garbaged if another compute rises which
|
||||
// collects majority and starts fixing log on this safekeeper itself.
|
||||
// That's ok as (old) proposer will never be able to commit such WAL.
|
||||
let end_watch = if self.is_walproposer_recovery() {
|
||||
EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
|
||||
} else {
|
||||
None
|
||||
EndWatch::Commit(tli.get_commit_lsn_watch_rx())
|
||||
};
|
||||
|
||||
// take the latest commit_lsn if don't have stop_pos
|
||||
let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
|
||||
// we don't check term here; it will be checked on first waiting/WAL reading anyway.
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
if end_pos < start_pos {
|
||||
warn!(
|
||||
@@ -419,8 +418,10 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?} till {:?}, available WAL ends at {}",
|
||||
start_pos, stop_pos, end_pos
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}",
|
||||
start_pos,
|
||||
end_pos,
|
||||
matches!(end_watch, EndWatch::Flush(_))
|
||||
);
|
||||
|
||||
// switch to copy
|
||||
@@ -445,9 +446,8 @@ impl SafekeeperPostgresHandler {
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
stop_pos,
|
||||
term,
|
||||
commit_lsn_watch_rx,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: [0; MAX_SEND_SIZE],
|
||||
@@ -466,6 +466,32 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
|
||||
/// given term (recovery by walproposer or peer safekeeper).
|
||||
enum EndWatch {
|
||||
Commit(Receiver<Lsn>),
|
||||
Flush(Receiver<TermLsn>),
|
||||
}
|
||||
|
||||
impl EndWatch {
|
||||
/// Get current end of WAL.
|
||||
fn get(&self) -> Lsn {
|
||||
match self {
|
||||
EndWatch::Commit(r) => *r.borrow(),
|
||||
EndWatch::Flush(r) => r.borrow().lsn,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the update.
|
||||
async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
EndWatch::Commit(r) => r.changed().await?,
|
||||
EndWatch::Flush(r) => r.changed().await?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A half driving sending WAL.
|
||||
struct WalSender<'a, IO> {
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
@@ -480,14 +506,12 @@ struct WalSender<'a, IO> {
|
||||
// We send this LSN to the receiver as wal_end, so that it knows how much
|
||||
// WAL this safekeeper has. This LSN should be as fresh as possible.
|
||||
end_pos: Lsn,
|
||||
// If present, terminate after reaching this position; used by walproposer
|
||||
// in recovery.
|
||||
stop_pos: Option<Lsn>,
|
||||
/// When streaming uncommitted part, the term the client acts as the leader
|
||||
/// in. Streaming is stopped if local term changes to a different (higher)
|
||||
/// value.
|
||||
term: Option<Term>,
|
||||
commit_lsn_watch_rx: Receiver<Lsn>,
|
||||
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
|
||||
end_watch: EndWatch,
|
||||
ws_guard: Arc<WalSenderGuard>,
|
||||
wal_reader: WalReader,
|
||||
// buffer for readling WAL into to send it
|
||||
@@ -497,29 +521,20 @@ struct WalSender<'a, IO> {
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
/// Send WAL until
|
||||
/// - an error occurs
|
||||
/// - if we are streaming to walproposer, we've streamed until stop_pos
|
||||
/// (recovery finished)
|
||||
/// - receiver is caughtup and there is no computes
|
||||
/// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
// If we are streaming to walproposer, check it is time to stop.
|
||||
if let Some(stop_pos) = self.stop_pos {
|
||||
if self.start_pos >= stop_pos {
|
||||
// recovery finished
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to walproposer at {}, recovery finished",
|
||||
self.start_pos
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
// communicate it to the receiver.
|
||||
self.wait_wal().await?;
|
||||
}
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
// communicate it to the receiver.
|
||||
self.wait_wal().await?;
|
||||
assert!(
|
||||
self.end_pos > self.start_pos,
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by MAX_SEND_SIZE
|
||||
let mut send_size = self
|
||||
@@ -567,7 +582,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
/// exit in the meanwhile
|
||||
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
self.end_pos = *self.commit_lsn_watch_rx.borrow();
|
||||
self.end_pos = self.end_watch.get();
|
||||
if self.end_pos > self.start_pos {
|
||||
// We have something to send.
|
||||
trace!("got end_pos {:?}, streaming", self.end_pos);
|
||||
@@ -575,27 +590,31 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
}
|
||||
|
||||
// Wait for WAL to appear, now self.end_pos == self.start_pos.
|
||||
if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? {
|
||||
if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
|
||||
self.end_pos = lsn;
|
||||
trace!("got end_pos {:?}, streaming", self.end_pos);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Timed out waiting for WAL, check for termination and send KA
|
||||
if let Some(remote_consistent_lsn) = self
|
||||
.ws_guard
|
||||
.walsenders
|
||||
.get_ws_remote_consistent_lsn(self.ws_guard.id)
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
// Timed out waiting for WAL, check for termination and send KA.
|
||||
// Check for termination only if we are streaming up to commit_lsn
|
||||
// (to pageserver).
|
||||
if let EndWatch::Commit(_) = self.end_watch {
|
||||
if let Some(remote_consistent_lsn) = self
|
||||
.ws_guard
|
||||
.walsenders
|
||||
.get_ws_remote_consistent_lsn(self.ws_guard.id)
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, self.start_pos,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -663,22 +682,32 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Wait until we have commit_lsn > lsn or timeout expires. Returns
|
||||
/// - Ok(Some(commit_lsn)) if needed lsn is successfully observed;
|
||||
/// Wait until we have available WAL > start_pos or timeout expires. Returns
|
||||
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
|
||||
/// - Ok(None) if timeout expired;
|
||||
/// - Err in case of error (if watch channel is in trouble, shouldn't happen).
|
||||
async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result<Option<Lsn>> {
|
||||
/// - Err in case of error -- only if 1) term changed while fetching in recovery
|
||||
/// mode 2) watch channel closed, which must never happen.
|
||||
async fn wait_for_lsn(
|
||||
rx: &mut EndWatch,
|
||||
client_term: Option<Term>,
|
||||
start_pos: Lsn,
|
||||
) -> anyhow::Result<Option<Lsn>> {
|
||||
let res = timeout(POLL_STATE_TIMEOUT, async move {
|
||||
let mut commit_lsn;
|
||||
loop {
|
||||
rx.changed().await?;
|
||||
commit_lsn = *rx.borrow();
|
||||
if commit_lsn > lsn {
|
||||
break;
|
||||
let end_pos = rx.get();
|
||||
if end_pos > start_pos {
|
||||
return Ok(end_pos);
|
||||
}
|
||||
if let EndWatch::Flush(rx) = rx {
|
||||
let curr_term = rx.borrow().term;
|
||||
if let Some(client_term) = client_term {
|
||||
if curr_term != client_term {
|
||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||
}
|
||||
}
|
||||
}
|
||||
rx.changed().await?;
|
||||
}
|
||||
|
||||
Ok(commit_lsn)
|
||||
})
|
||||
.await;
|
||||
|
||||
|
||||
@@ -3,8 +3,11 @@
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use tokio::fs;
|
||||
|
||||
use serde_with::DisplayFromStr;
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -24,9 +27,10 @@ use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::recovery::recovery_main;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||
SafekeeperMemState, ServerInfo, Term,
|
||||
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
|
||||
};
|
||||
use crate::send_wal::WalSenders;
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
@@ -37,18 +41,25 @@ use crate::SafeKeeperConf;
|
||||
use crate::{debug_dump, wal_storage};
|
||||
|
||||
/// Things safekeeper should know about timeline state on peers.
|
||||
#[derive(Debug, Clone)]
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PeerInfo {
|
||||
pub sk_id: NodeId,
|
||||
/// Term of the last entry.
|
||||
_last_log_term: Term,
|
||||
/// LSN of the last record.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
_flush_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub commit_lsn: Lsn,
|
||||
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
|
||||
/// sk since backup_lsn.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub local_start_lsn: Lsn,
|
||||
/// When info was received.
|
||||
/// When info was received. Serde annotations are not very useful but make
|
||||
/// the code compile -- we don't rely on this field externally.
|
||||
#[serde(skip)]
|
||||
#[serde(default = "Instant::now")]
|
||||
ts: Instant,
|
||||
}
|
||||
|
||||
@@ -237,8 +248,9 @@ impl SharedState {
|
||||
tenant_id: ttid.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: ttid.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
term: self.sk.state.acceptor_state.term,
|
||||
last_log_term: self.sk.get_epoch(),
|
||||
flush_lsn: self.sk.wal_store.flush_lsn().0,
|
||||
flush_lsn: self.sk.flush_lsn().0,
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.inmem.commit_lsn.0,
|
||||
remote_consistent_lsn: remote_consistent_lsn.0,
|
||||
@@ -247,6 +259,7 @@ impl SharedState {
|
||||
.advertise_pg_addr
|
||||
.to_owned()
|
||||
.unwrap_or(conf.listen_pg_addr.clone()),
|
||||
http_connstr: conf.listen_http_addr.to_owned(),
|
||||
backup_lsn: self.sk.inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state.local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
@@ -296,6 +309,13 @@ pub struct Timeline {
|
||||
commit_lsn_watch_tx: watch::Sender<Lsn>,
|
||||
commit_lsn_watch_rx: watch::Receiver<Lsn>,
|
||||
|
||||
/// Broadcasts (current term, flush_lsn) updates, walsender is interested in
|
||||
/// them when sending in recovery mode (to walproposer or peers). Note: this
|
||||
/// is just a notification, WAL reading should always done with lock held as
|
||||
/// term can change otherwise.
|
||||
term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
|
||||
term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
|
||||
|
||||
/// Safekeeper and other state, that should remain consistent and
|
||||
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
|
||||
/// while holding it, ensuring that consensus checks are in order.
|
||||
@@ -317,16 +337,20 @@ pub struct Timeline {
|
||||
impl Timeline {
|
||||
/// Load existing timeline from disk.
|
||||
pub fn load_timeline(
|
||||
conf: SafeKeeperConf,
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
) -> Result<Timeline> {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
let shared_state = SharedState::restore(&conf, &ttid)?;
|
||||
let shared_state = SharedState::restore(conf, &ttid)?;
|
||||
let rcl = shared_state.sk.state.remote_consistent_lsn;
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||
watch::channel(shared_state.sk.state.commit_lsn);
|
||||
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
|
||||
shared_state.sk.get_term(),
|
||||
shared_state.sk.flush_lsn(),
|
||||
)));
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
|
||||
Ok(Timeline {
|
||||
@@ -334,6 +358,8 @@ impl Timeline {
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(shared_state),
|
||||
walsenders: WalSenders::new(rcl),
|
||||
walreceivers: WalReceivers::new(),
|
||||
@@ -345,7 +371,7 @@ impl Timeline {
|
||||
|
||||
/// Create a new timeline, which is not yet persisted to disk.
|
||||
pub fn create_empty(
|
||||
conf: SafeKeeperConf,
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
server_info: ServerInfo,
|
||||
@@ -353,6 +379,8 @@ impl Timeline {
|
||||
local_start_lsn: Lsn,
|
||||
) -> Result<Timeline> {
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
|
||||
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
|
||||
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
@@ -361,7 +389,9 @@ impl Timeline {
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?),
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
|
||||
walsenders: WalSenders::new(Lsn(0)),
|
||||
walreceivers: WalReceivers::new(),
|
||||
cancellation_rx,
|
||||
@@ -370,12 +400,16 @@ impl Timeline {
|
||||
})
|
||||
}
|
||||
|
||||
/// Initialize fresh timeline on disk and start background tasks. If bootstrap
|
||||
/// Initialize fresh timeline on disk and start background tasks. If init
|
||||
/// fails, timeline is cancelled and cannot be used anymore.
|
||||
///
|
||||
/// Bootstrap is transactional, so if it fails, created files will be deleted,
|
||||
/// Init is transactional, so if it fails, created files will be deleted,
|
||||
/// and state on disk should remain unchanged.
|
||||
pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
|
||||
pub async fn init_new(
|
||||
self: &Arc<Timeline>,
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
conf: &SafeKeeperConf,
|
||||
) -> Result<()> {
|
||||
match fs::metadata(&self.timeline_dir).await {
|
||||
Ok(_) => {
|
||||
// Timeline directory exists on disk, we should leave state unchanged
|
||||
@@ -391,7 +425,7 @@ impl Timeline {
|
||||
// Create timeline directory.
|
||||
fs::create_dir_all(&self.timeline_dir).await?;
|
||||
|
||||
// Write timeline to disk and TODO: start background tasks.
|
||||
// Write timeline to disk and start background tasks.
|
||||
if let Err(e) = shared_state.sk.persist().await {
|
||||
// Bootstrap failed, cancel timeline and remove timeline directory.
|
||||
self.cancel(shared_state);
|
||||
@@ -405,12 +439,16 @@ impl Timeline {
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
// TODO: add more initialization steps here
|
||||
self.update_status(shared_state);
|
||||
self.bootstrap(conf);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bootstrap new or existing timeline starting background stasks.
|
||||
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
|
||||
// Start recovery task which always runs on the timeline.
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory. Background
|
||||
/// timeline activities will stop eventually.
|
||||
pub async fn delete_from_disk(
|
||||
@@ -444,6 +482,16 @@ impl Timeline {
|
||||
*self.cancellation_rx.borrow()
|
||||
}
|
||||
|
||||
/// Returns watch channel which gets value when timeline is cancelled. It is
|
||||
/// guaranteed to have not cancelled value observed (errors otherwise).
|
||||
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
|
||||
let rx = self.cancellation_rx.clone();
|
||||
if *rx.borrow() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
|
||||
self.mutex.lock().await
|
||||
@@ -520,6 +568,11 @@ impl Timeline {
|
||||
self.commit_lsn_watch_rx.clone()
|
||||
}
|
||||
|
||||
/// Returns term_flush_lsn watch channel.
|
||||
pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
|
||||
self.term_flush_lsn_watch_rx.clone()
|
||||
}
|
||||
|
||||
/// Pass arrived message to the safekeeper.
|
||||
pub async fn process_msg(
|
||||
&self,
|
||||
@@ -531,6 +584,7 @@ impl Timeline {
|
||||
|
||||
let mut rmsg: Option<AcceptorProposerMessage>;
|
||||
let commit_lsn: Lsn;
|
||||
let term_flush_lsn: TermLsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
rmsg = shared_state.sk.process_msg(msg).await?;
|
||||
@@ -544,8 +598,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
commit_lsn = shared_state.sk.inmem.commit_lsn;
|
||||
term_flush_lsn =
|
||||
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
|
||||
}
|
||||
self.commit_lsn_watch_tx.send(commit_lsn)?;
|
||||
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
|
||||
Ok(rmsg)
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
@@ -71,19 +71,23 @@ pub struct GlobalTimelines;
|
||||
|
||||
impl GlobalTimelines {
|
||||
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
|
||||
pub fn init(
|
||||
pub async fn init(
|
||||
conf: SafeKeeperConf,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
) -> Result<()> {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(state.wal_backup_launcher_tx.is_none());
|
||||
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
||||
state.conf = Some(conf);
|
||||
// clippy isn't smart enough to understand that drop(state) releases the
|
||||
// lock, so use explicit block
|
||||
let tenants_dir = {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(state.wal_backup_launcher_tx.is_none());
|
||||
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
||||
state.conf = Some(conf);
|
||||
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
// named as a valid tenant_id.
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
// named as a valid tenant_id.
|
||||
state.get_conf().workdir.clone()
|
||||
};
|
||||
let mut tenant_count = 0;
|
||||
let tenants_dir = state.get_conf().workdir.clone();
|
||||
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
|
||||
.with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
|
||||
{
|
||||
@@ -93,7 +97,7 @@ impl GlobalTimelines {
|
||||
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
tenant_count += 1;
|
||||
GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?;
|
||||
GlobalTimelines::load_tenant_timelines(tenant_id).await?;
|
||||
}
|
||||
}
|
||||
Err(e) => error!(
|
||||
@@ -108,7 +112,7 @@ impl GlobalTimelines {
|
||||
info!(
|
||||
"found {} tenants directories, successfully loaded {} timelines",
|
||||
tenant_count,
|
||||
state.timelines.len()
|
||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -116,17 +120,21 @@ impl GlobalTimelines {
|
||||
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
||||
/// errors if any.
|
||||
///
|
||||
/// Note: This function (and all reading/loading below) is sync because
|
||||
/// timelines are loaded while holding GlobalTimelinesState lock. Which is
|
||||
/// fine as this is called only from single threaded main runtime on boot,
|
||||
/// but clippy complains anyway, and suppressing that isn't trivial as async
|
||||
/// is the keyword, ha. That only other user is pull_timeline.rs for which
|
||||
/// being blocked is not that bad, and we can do spawn_blocking.
|
||||
fn load_tenant_timelines(
|
||||
state: &mut MutexGuard<'_, GlobalTimelinesState>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<()> {
|
||||
let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
|
||||
/// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
|
||||
/// sync and there is no important reason to make it async (it is always
|
||||
/// held for a short while) we just lock and unlock it for each timeline --
|
||||
/// this function is called during init when nothing else is running, so
|
||||
/// this is fine.
|
||||
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, wal_backup_launcher_tx) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
(
|
||||
state.get_conf().clone(),
|
||||
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||
)
|
||||
};
|
||||
|
||||
let timelines_dir = conf.tenant_dir(&tenant_id);
|
||||
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
|
||||
.with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
|
||||
{
|
||||
@@ -136,13 +144,16 @@ impl GlobalTimelines {
|
||||
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
match Timeline::load_timeline(
|
||||
state.get_conf().clone(),
|
||||
ttid,
|
||||
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||
) {
|
||||
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
|
||||
Ok(timeline) => {
|
||||
state.timelines.insert(ttid, Arc::new(timeline));
|
||||
let tli = Arc::new(timeline);
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
tli.bootstrap(&conf);
|
||||
tli.update_status_notify().await.unwrap();
|
||||
}
|
||||
// If we can't load a timeline, it's most likely because of a corrupted
|
||||
// directory. We will log an error and won't allow to delete/recreate
|
||||
@@ -168,18 +179,22 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
/// Load timeline from disk to the memory.
|
||||
pub fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
|
||||
match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) {
|
||||
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
|
||||
// TODO: prevent concurrent timeline creation/loading
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
|
||||
tli.bootstrap(&conf);
|
||||
|
||||
Ok(tli)
|
||||
}
|
||||
// If we can't load a timeline, it's bad. Caller will figure it out.
|
||||
@@ -217,7 +232,7 @@ impl GlobalTimelines {
|
||||
info!("creating new timeline {}", ttid);
|
||||
|
||||
let timeline = Arc::new(Timeline::create_empty(
|
||||
conf,
|
||||
&conf,
|
||||
ttid,
|
||||
wal_backup_launcher_tx,
|
||||
server_info,
|
||||
@@ -240,23 +255,24 @@ impl GlobalTimelines {
|
||||
// Write the new timeline to the disk and start background workers.
|
||||
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
|
||||
// and the state on disk should remain unchanged.
|
||||
if let Err(e) = timeline.bootstrap(&mut shared_state).await {
|
||||
// Note: the most likely reason for bootstrap failure is that the timeline
|
||||
if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
|
||||
// Note: the most likely reason for init failure is that the timeline
|
||||
// directory already exists on disk. This happens when timeline is corrupted
|
||||
// and wasn't loaded from disk on startup because of that. We want to preserve
|
||||
// the timeline directory in this case, for further inspection.
|
||||
|
||||
// TODO: this is an unusual error, perhaps we should send it to sentry
|
||||
// TODO: compute will try to create timeline every second, we should add backoff
|
||||
error!("failed to bootstrap timeline {}: {}", ttid, e);
|
||||
error!("failed to init new timeline {}: {}", ttid, e);
|
||||
|
||||
// Timeline failed to bootstrap, it cannot be used. Remove it from the map.
|
||||
// Timeline failed to init, it cannot be used. Remove it from the map.
|
||||
TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
|
||||
return Err(e);
|
||||
}
|
||||
// We are done with bootstrap, release the lock, return the timeline.
|
||||
// {} block forces release before .await
|
||||
}
|
||||
timeline.update_status_notify().await?;
|
||||
timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -12,25 +12,26 @@ import psycopg2.extras
|
||||
# We call the test "flaky" if it failed at least once on the main branch in the last N=10 days.
|
||||
FLAKY_TESTS_QUERY = """
|
||||
SELECT
|
||||
DISTINCT parent_suite, suite, test
|
||||
DISTINCT parent_suite, suite, REGEXP_REPLACE(test, '(release|debug)-pg(\\d+)-?', '') as deparametrized_test
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
revision,
|
||||
jsonb_array_elements(data -> 'children') -> 'name' as parent_suite,
|
||||
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'name' as suite,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'name' as test,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'status' as status,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'retriesStatusChange' as retries_status_change,
|
||||
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' -> 'start')::bigint / 1000)::date as timestamp
|
||||
reference,
|
||||
jsonb_array_elements(data -> 'children') ->> 'name' as parent_suite,
|
||||
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') ->> 'name' as suite,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'name' as test,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'status' as status,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'retriesStatusChange' as retries_status_change,
|
||||
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' ->> 'start')::bigint / 1000)::date as timestamp
|
||||
FROM
|
||||
regress_test_results
|
||||
WHERE
|
||||
reference = 'refs/heads/main'
|
||||
) data
|
||||
WHERE
|
||||
timestamp > CURRENT_DATE - INTERVAL '%s' day
|
||||
AND (status::text IN ('"failed"', '"broken"') OR retries_status_change::boolean)
|
||||
AND (
|
||||
(status IN ('failed', 'broken') AND reference = 'refs/heads/main')
|
||||
OR retries_status_change::boolean
|
||||
)
|
||||
;
|
||||
"""
|
||||
|
||||
@@ -40,6 +41,9 @@ def main(args: argparse.Namespace):
|
||||
interval_days = args.days
|
||||
output = args.output
|
||||
|
||||
build_type = args.build_type
|
||||
pg_version = args.pg_version
|
||||
|
||||
res: DefaultDict[str, DefaultDict[str, Dict[str, bool]]]
|
||||
res = defaultdict(lambda: defaultdict(dict))
|
||||
|
||||
@@ -55,8 +59,21 @@ def main(args: argparse.Namespace):
|
||||
rows = []
|
||||
|
||||
for row in rows:
|
||||
logging.info(f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{row['test']}")
|
||||
res[row["parent_suite"]][row["suite"]][row["test"]] = True
|
||||
# We don't want to automatically rerun tests in a performance suite
|
||||
if row["parent_suite"] != "test_runner.regress":
|
||||
continue
|
||||
|
||||
deparametrized_test = row["deparametrized_test"]
|
||||
dash_if_needed = "" if deparametrized_test.endswith("[]") else "-"
|
||||
parametrized_test = deparametrized_test.replace(
|
||||
"[",
|
||||
f"[{build_type}-pg{pg_version}{dash_if_needed}",
|
||||
)
|
||||
res[row["parent_suite"]][row["suite"]][parametrized_test] = True
|
||||
|
||||
logging.info(
|
||||
f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{parametrized_test}"
|
||||
)
|
||||
|
||||
logging.info(f"saving results to {output.name}")
|
||||
json.dump(res, output, indent=2)
|
||||
@@ -77,6 +94,18 @@ if __name__ == "__main__":
|
||||
type=int,
|
||||
help="how many days to look back for flaky tests (default: 10)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--build-type",
|
||||
required=True,
|
||||
type=str,
|
||||
help="for which build type to create list of flaky tests (debug or release)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pg-version",
|
||||
required=True,
|
||||
type=int,
|
||||
help="for which Postgres version to create list of flaky tests (14, 15, etc.)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"connstr",
|
||||
help="connection string to the test results database",
|
||||
|
||||
@@ -125,6 +125,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
tenant_id: vec![0xFF; 16],
|
||||
timeline_id: tli_from_u64(counter % n_keys),
|
||||
}),
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: counter,
|
||||
commit_lsn: 2,
|
||||
@@ -132,6 +133,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
};
|
||||
|
||||
@@ -22,6 +22,8 @@ message SubscribeSafekeeperInfoRequest {
|
||||
message SafekeeperTimelineInfo {
|
||||
uint64 safekeeper_id = 1;
|
||||
TenantTimelineId tenant_timeline_id = 2;
|
||||
// Safekeeper term
|
||||
uint64 term = 12;
|
||||
// Term of the last entry.
|
||||
uint64 last_log_term = 3;
|
||||
// LSN of the last record.
|
||||
@@ -36,6 +38,8 @@ message SafekeeperTimelineInfo {
|
||||
uint64 local_start_lsn = 9;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string
|
||||
string http_connstr = 13;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
@@ -519,6 +519,7 @@ mod tests {
|
||||
tenant_id: vec![0x00; 16],
|
||||
timeline_id,
|
||||
}),
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: 1,
|
||||
commit_lsn: 2,
|
||||
@@ -526,6 +527,7 @@ mod tests {
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
}
|
||||
|
||||
@@ -233,10 +233,19 @@ if TYPE_CHECKING:
|
||||
|
||||
def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
|
||||
response = list_prefix(neon_env_builder, prefix)
|
||||
objects = response.get("Contents")
|
||||
assert (
|
||||
response["KeyCount"] == 0
|
||||
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
keys = response["KeyCount"]
|
||||
objects = response.get("Contents", [])
|
||||
|
||||
if keys != 0 and len(objects) == 0:
|
||||
# this has been seen in one case with mock_s3:
|
||||
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
|
||||
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
|
||||
common_prefixes = response.get("CommonPrefixes", [])
|
||||
log.warn(
|
||||
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
|
||||
)
|
||||
|
||||
assert keys == 0, f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
|
||||
|
||||
def assert_prefix_not_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
|
||||
|
||||
Reference in New Issue
Block a user