mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
Compare commits
286 Commits
problame/r
...
bh-ignore-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22a787ec9c | ||
|
|
9c662e65a2 | ||
|
|
01180666b0 | ||
|
|
5667372c61 | ||
|
|
61f99d703d | ||
|
|
24014d8383 | ||
|
|
e3ded64d1b | ||
|
|
9b714c8572 | ||
|
|
29fb675432 | ||
|
|
ca07fa5f8b | ||
|
|
5d039c6e9b | ||
|
|
36e1100949 | ||
|
|
59c5b374de | ||
|
|
0f3b87d023 | ||
|
|
c19625a29c | ||
|
|
f2e5212fed | ||
|
|
568bc1fde3 | ||
|
|
45e929c069 | ||
|
|
6b980f38da | ||
|
|
f0d8bd7855 | ||
|
|
6c94269c32 | ||
|
|
edc691647d | ||
|
|
855d7b4781 | ||
|
|
c49c9707ce | ||
|
|
2227540a0d | ||
|
|
f1347f2417 | ||
|
|
30b295b017 | ||
|
|
1cef395266 | ||
|
|
78d160f76d | ||
|
|
b9238059d6 | ||
|
|
d0cb4b88c8 | ||
|
|
1ec3e39d4e | ||
|
|
a1a74eef2c | ||
|
|
90e689adda | ||
|
|
f0b2d4b053 | ||
|
|
299d9474c9 | ||
|
|
7234208b36 | ||
|
|
93450f11f5 | ||
|
|
2f0f9edf33 | ||
|
|
d424f2b7c8 | ||
|
|
21315e80bc | ||
|
|
483b66d383 | ||
|
|
aa72a22661 | ||
|
|
5c0264b591 | ||
|
|
9f13277729 | ||
|
|
54aa319805 | ||
|
|
4a227484bf | ||
|
|
2f83f85291 | ||
|
|
d6cfcb0d93 | ||
|
|
392843ad2a | ||
|
|
bd4dae8f4a | ||
|
|
b05fe53cfd | ||
|
|
c13a2f0df1 | ||
|
|
39be366fc5 | ||
|
|
6eda0a3158 | ||
|
|
306c7a1813 | ||
|
|
80be423a58 | ||
|
|
5dcfef82f2 | ||
|
|
e67b8f69c0 | ||
|
|
e546872ab4 | ||
|
|
322ea1cf7c | ||
|
|
3633742de9 | ||
|
|
079d3a37ba | ||
|
|
a46e77b476 | ||
|
|
a92702b01e | ||
|
|
8ff3253f20 | ||
|
|
04b82c92a7 | ||
|
|
e5bf423e68 | ||
|
|
60af392e45 | ||
|
|
661fc41e71 | ||
|
|
702c488f32 | ||
|
|
45c5122754 | ||
|
|
558394f710 | ||
|
|
73b0898608 | ||
|
|
e65be4c2dc | ||
|
|
40087b8164 | ||
|
|
c762b59483 | ||
|
|
5d71601ca9 | ||
|
|
a113c3e433 | ||
|
|
e81fc598f4 | ||
|
|
48b845fa76 | ||
|
|
27096858dc | ||
|
|
4430d0ae7d | ||
|
|
6e183aa0de | ||
|
|
fd6d0b7635 | ||
|
|
3710c32aae | ||
|
|
be83bee49d | ||
|
|
cf28e5922a | ||
|
|
7d384d6953 | ||
|
|
4b3b37b912 | ||
|
|
1d8d200f4d | ||
|
|
0d80d6ce18 | ||
|
|
f653ee039f | ||
|
|
e614a95853 | ||
|
|
850db4cc13 | ||
|
|
8a316b1277 | ||
|
|
4d13bae449 | ||
|
|
49377abd98 | ||
|
|
a6b2f4e54e | ||
|
|
face60d50b | ||
|
|
9768aa27f2 | ||
|
|
96b2e575e1 | ||
|
|
7222777784 | ||
|
|
5469fdede0 | ||
|
|
72aa6b9fdd | ||
|
|
ae0634b7be | ||
|
|
70711f32fa | ||
|
|
52a88af0aa | ||
|
|
b7a43bf817 | ||
|
|
dce91b33a4 | ||
|
|
23ee4f3050 | ||
|
|
46857e8282 | ||
|
|
368ab0ce54 | ||
|
|
a5987eebfd | ||
|
|
6686ede30f | ||
|
|
373c7057cc | ||
|
|
7d6ec16166 | ||
|
|
0e6fdc8a58 | ||
|
|
521438a5c6 | ||
|
|
07d7874bc8 | ||
|
|
1804111a02 | ||
|
|
cd0178efed | ||
|
|
333574be57 | ||
|
|
79a799a143 | ||
|
|
9da06af6c9 | ||
|
|
ce1753d036 | ||
|
|
67db8432b4 | ||
|
|
4e2e44e524 | ||
|
|
ed786104f3 | ||
|
|
84b74f2bd1 | ||
|
|
fec2ad6283 | ||
|
|
98eebd4682 | ||
|
|
2f74287c9b | ||
|
|
aee1bf95e3 | ||
|
|
b9de9d75ff | ||
|
|
7943b709e6 | ||
|
|
d7d066d493 | ||
|
|
e78ac22107 | ||
|
|
76a8f2bb44 | ||
|
|
8d59a8581f | ||
|
|
b1ddd01289 | ||
|
|
6eae4fc9aa | ||
|
|
765455bca2 | ||
|
|
4204960942 | ||
|
|
67345d66ea | ||
|
|
2266ee5971 | ||
|
|
b58445d855 | ||
|
|
36050e7f3d | ||
|
|
33360ed96d | ||
|
|
39a28d1108 | ||
|
|
efa6aa134f | ||
|
|
2c724e56e2 | ||
|
|
feff887c6f | ||
|
|
353d915fcf | ||
|
|
2e38098cbc | ||
|
|
a6fe5ea1ac | ||
|
|
05b0aed0c1 | ||
|
|
cd1705357d | ||
|
|
6bc7561290 | ||
|
|
fbd3ac14b5 | ||
|
|
e437787c8f | ||
|
|
3460dbf90b | ||
|
|
6b89d99677 | ||
|
|
6cc8ea86e4 | ||
|
|
e62a492d6f | ||
|
|
a475cdf642 | ||
|
|
7002c79a47 | ||
|
|
ee6cf357b4 | ||
|
|
e5c2086b5f | ||
|
|
5f1208296a | ||
|
|
88e8e473cd | ||
|
|
b0a77844f6 | ||
|
|
1baf464307 | ||
|
|
e9b8e81cea | ||
|
|
85d6194aa4 | ||
|
|
333a7a68ef | ||
|
|
6aa4e41bee | ||
|
|
840183e51f | ||
|
|
cbccc94b03 | ||
|
|
fce227df22 | ||
|
|
bd787e800f | ||
|
|
4a7704b4a3 | ||
|
|
ff1119da66 | ||
|
|
4c3ba1627b | ||
|
|
1407174fb2 | ||
|
|
ec9dcb1889 | ||
|
|
d11d781afc | ||
|
|
4e44565b71 | ||
|
|
4ed51ad33b | ||
|
|
1c1ebe5537 | ||
|
|
c19cb7f386 | ||
|
|
4b97d31b16 | ||
|
|
923ade3dd7 | ||
|
|
b04e711975 | ||
|
|
afd0a6b39a | ||
|
|
99752286d8 | ||
|
|
15df93363c | ||
|
|
bc0ab741af | ||
|
|
51d9dfeaa3 | ||
|
|
f63cb18155 | ||
|
|
0de603d88e | ||
|
|
240913912a | ||
|
|
91a4ea0de2 | ||
|
|
8608704f49 | ||
|
|
efef68ce99 | ||
|
|
8daefd24da | ||
|
|
46cc8b7982 | ||
|
|
38cd90dd0c | ||
|
|
a51b269f15 | ||
|
|
43bf6d0a0f | ||
|
|
15273a9b66 | ||
|
|
78aca668d0 | ||
|
|
acbf4148ea | ||
|
|
6508540561 | ||
|
|
a41b5244a8 | ||
|
|
2b3189be95 | ||
|
|
248563c595 | ||
|
|
14cd6ca933 | ||
|
|
eb36403e71 | ||
|
|
3c6f779698 | ||
|
|
f67f0c1c11 | ||
|
|
edb02d3299 | ||
|
|
664a69e65b | ||
|
|
478322ebf9 | ||
|
|
802f174072 | ||
|
|
47f9890bae | ||
|
|
262265daad | ||
|
|
300da5b872 | ||
|
|
7b22b5c433 | ||
|
|
ffca97bc1e | ||
|
|
cb356f3259 | ||
|
|
c85374295f | ||
|
|
4992160677 | ||
|
|
bd535b3371 | ||
|
|
d90c5a03af | ||
|
|
2d02cc9079 | ||
|
|
49ad94b99f | ||
|
|
948a217398 | ||
|
|
125381eae7 | ||
|
|
cd01bbc715 | ||
|
|
d8b5e3b88d | ||
|
|
06d25f2186 | ||
|
|
f759b561f3 | ||
|
|
ece0555600 | ||
|
|
73ea0a0b01 | ||
|
|
d8f6d6fd6f | ||
|
|
d24de169a7 | ||
|
|
0816168296 | ||
|
|
277b44d57a | ||
|
|
68c2c3880e | ||
|
|
49da498f65 | ||
|
|
2c76ba3dd7 | ||
|
|
dbe3dc69ad | ||
|
|
8e5bb3ed49 | ||
|
|
ab0be7b8da | ||
|
|
b4c55f5d24 | ||
|
|
ede70d833c | ||
|
|
70c3d18bb0 | ||
|
|
7a491f52c4 | ||
|
|
323c4ecb4f | ||
|
|
3d2466607e | ||
|
|
ed478b39f4 | ||
|
|
91585a558d | ||
|
|
93467eae1f | ||
|
|
f3aac81d19 | ||
|
|
979ad60c19 | ||
|
|
9316cb1b1f | ||
|
|
e7939a527a | ||
|
|
36d26665e1 | ||
|
|
873347f977 | ||
|
|
e814ac16f9 | ||
|
|
ad3055d386 | ||
|
|
94e03eb452 | ||
|
|
380f26ef79 | ||
|
|
3c5b7f59d7 | ||
|
|
fee89f80b5 | ||
|
|
41cce8eaf1 | ||
|
|
f88fe0218d | ||
|
|
cc856eca85 | ||
|
|
cf350c6002 | ||
|
|
0ce6b6a0a3 | ||
|
|
73f247d537 | ||
|
|
960be82183 | ||
|
|
806e5a6c19 | ||
|
|
8d5df07cce | ||
|
|
df7a9d1407 |
15
Cargo.lock
generated
15
Cargo.lock
generated
@@ -1813,6 +1813,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e875f1719c16de097dee81ed675e2d9bb63096823ed3f0ca827b7dea3028bbbb"
|
||||
dependencies = [
|
||||
"enumset_derive",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2757,6 +2758,17 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "leaky-bucket"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853"
|
||||
dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.150"
|
||||
@@ -3448,6 +3460,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -3475,6 +3488,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"hyper",
|
||||
"itertools",
|
||||
"leaky-bucket",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
@@ -6347,6 +6361,7 @@ dependencies = [
|
||||
"hex-literal",
|
||||
"hyper",
|
||||
"jsonwebtoken",
|
||||
"leaky-bucket",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"once_cell",
|
||||
|
||||
@@ -97,6 +97,7 @@ ipnet = "2.9.0"
|
||||
itertools = "0.10"
|
||||
jsonwebtoken = "9"
|
||||
lasso = "0.7"
|
||||
leaky-bucket = "1.0.1"
|
||||
libc = "0.2"
|
||||
md5 = "0.7.0"
|
||||
memoffset = "0.8"
|
||||
|
||||
@@ -47,7 +47,7 @@ COPY --chown=nonroot . .
|
||||
# Show build caching stats to check if it was used in the end.
|
||||
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
|
||||
RUN set -e \
|
||||
&& mold -run cargo build \
|
||||
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
|
||||
--bin pg_sni_router \
|
||||
--bin pageserver \
|
||||
--bin pagectl \
|
||||
|
||||
@@ -769,6 +769,24 @@ RUN wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_5.tar.
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_ivm"
|
||||
# compile pg_ivm extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-ivm-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/sraoss/pg_ivm/archive/refs/tags/v1.7.tar.gz -O pg_ivm.tar.gz && \
|
||||
echo "ebfde04f99203c7be4b0e873f91104090e2e83e5429c32ac242d00f334224d5e pg_ivm.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_ivm-src && cd pg_ivm-src && tar xvzf ../pg_ivm.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_ivm.control
|
||||
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
@@ -810,6 +828,7 @@ COPY --from=pg-semver-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
16
Makefile
16
Makefile
@@ -159,8 +159,8 @@ neon-pg-ext-%: postgres-%
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
|
||||
|
||||
.PHONY: neon-pg-ext-clean-%
|
||||
neon-pg-ext-clean-%:
|
||||
.PHONY: neon-pg-clean-ext-%
|
||||
neon-pg-clean-ext-%:
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
|
||||
@@ -216,11 +216,11 @@ neon-pg-ext: \
|
||||
neon-pg-ext-v15 \
|
||||
neon-pg-ext-v16
|
||||
|
||||
.PHONY: neon-pg-ext-clean
|
||||
neon-pg-ext-clean: \
|
||||
neon-pg-ext-clean-v14 \
|
||||
neon-pg-ext-clean-v15 \
|
||||
neon-pg-ext-clean-v16
|
||||
.PHONY: neon-pg-clean-ext
|
||||
neon-pg-clean-ext: \
|
||||
neon-pg-clean-ext-v14 \
|
||||
neon-pg-clean-ext-v15 \
|
||||
neon-pg-clean-ext-v16
|
||||
|
||||
# shorthand to build all Postgres versions
|
||||
.PHONY: postgres
|
||||
@@ -249,7 +249,7 @@ postgres-check: \
|
||||
|
||||
# This doesn't remove the effects of 'configure'.
|
||||
.PHONY: clean
|
||||
clean: postgres-clean neon-pg-ext-clean
|
||||
clean: postgres-clean neon-pg-clean-ext
|
||||
$(CARGO_CMD_PREFIX) cargo clean
|
||||
|
||||
# This removes everything
|
||||
|
||||
10
README.md
10
README.md
@@ -249,6 +249,16 @@ testing locally, it is convenient to run just one set of permutations, like this
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest
|
||||
```
|
||||
|
||||
## Flamegraphs
|
||||
|
||||
You may find yourself in need of flamegraphs for software in this repository.
|
||||
You can use [`flamegraph-rs`](https://github.com/flamegraph-rs/flamegraph) or the original [`flamegraph.pl`](https://github.com/brendangregg/FlameGraph). Your choice!
|
||||
|
||||
>[!IMPORTANT]
|
||||
> If you're using `lld` or `mold`, you need the `--no-rosegment` linker argument.
|
||||
> It's a [general thing with Rust / lld / mold](https://crbug.com/919499#c16), not specific to this repository.
|
||||
> See [this PR for further instructions](https://github.com/neondatabase/neon/pull/6764).
|
||||
|
||||
## Documentation
|
||||
|
||||
[docs](/docs) Contains a top-level overview of all available markdown documentation.
|
||||
|
||||
@@ -324,7 +324,8 @@ impl ComputeNode {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
|
||||
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
|
||||
let mut config = postgres::Config::from_str(shard0_connstr)?;
|
||||
|
||||
// Use the storage auth token from the config file, if given.
|
||||
// Note: this overrides any password set in the connection string.
|
||||
|
||||
@@ -51,6 +51,9 @@ pub fn write_postgres_conf(
|
||||
if let Some(s) = &spec.pageserver_connstring {
|
||||
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
|
||||
}
|
||||
if let Some(stripe_size) = spec.shard_stripe_size {
|
||||
writeln!(file, "neon.stripe_size={stripe_size}")?;
|
||||
}
|
||||
if !spec.safekeeper_connstrings.is_empty() {
|
||||
writeln!(
|
||||
file,
|
||||
|
||||
@@ -4,6 +4,11 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
# Enables test-only APIs and behaviors
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
aws-config.workspace = true
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::{collections::HashMap, time::Duration};
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use hyper::{Method, StatusCode};
|
||||
use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, TenantShardId};
|
||||
use pageserver_api::shard::{ShardIndex, ShardNumber, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -77,7 +77,7 @@ impl ComputeHookTenant {
|
||||
self.shards
|
||||
.sort_by_key(|(shard, _node_id)| shard.shard_number);
|
||||
|
||||
if self.shards.len() == shard_count.0 as usize || shard_count == ShardCount(0) {
|
||||
if self.shards.len() == shard_count.count() as usize || shard_count.is_unsharded() {
|
||||
// We have pageservers for all the shards: emit a configuration update
|
||||
return Some(ComputeHookNotifyRequest {
|
||||
tenant_id,
|
||||
@@ -94,7 +94,7 @@ impl ComputeHookTenant {
|
||||
tracing::info!(
|
||||
"ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})",
|
||||
self.shards.len(),
|
||||
shard_count.0
|
||||
shard_count.count()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -155,7 +155,7 @@ impl ComputeHook {
|
||||
|
||||
for (endpoint_name, endpoint) in &cplane.endpoints {
|
||||
if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running {
|
||||
tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,);
|
||||
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
|
||||
endpoint.reconfigure(compute_pageservers.clone()).await?;
|
||||
}
|
||||
}
|
||||
@@ -177,7 +177,7 @@ impl ComputeHook {
|
||||
req
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"Sending notify request to {} ({:?})",
|
||||
url,
|
||||
reconfigure_request
|
||||
@@ -266,7 +266,7 @@ impl ComputeHook {
|
||||
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
|
||||
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
|
||||
/// the proper pageserver nodes for a tenant.
|
||||
#[tracing::instrument(skip_all, fields(tenant_shard_id, node_id))]
|
||||
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
|
||||
pub(super) async fn notify(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -298,7 +298,7 @@ impl ComputeHook {
|
||||
let Some(reconfigure_request) = reconfigure_request else {
|
||||
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
|
||||
// until it does.
|
||||
tracing::debug!("Tenant isn't yet ready to emit a notification",);
|
||||
tracing::info!("Tenant isn't yet ready to emit a notification");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
|
||||
@@ -37,6 +37,12 @@ impl std::fmt::Display for Sequence {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Sequence {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl MonotonicCounter<Sequence> for Sequence {
|
||||
fn cnt_advance(&mut self, v: Sequence) {
|
||||
assert!(*self <= v);
|
||||
|
||||
@@ -15,6 +15,7 @@ use diesel::Connection;
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use std::sync::Arc;
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::logging::{self, LogFormat};
|
||||
|
||||
@@ -237,15 +238,23 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
let auth = secrets
|
||||
.public_key
|
||||
.map(|jwt_auth| Arc::new(SwappableJwtAuth::new(jwt_auth)));
|
||||
let router = make_router(service, auth)
|
||||
let router = make_router(service.clone(), auth)
|
||||
.build()
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let router_service = utils::http::RouterService::new(router).unwrap();
|
||||
let server = hyper::Server::from_tcp(http_listener)?.serve(router_service);
|
||||
|
||||
// Start HTTP server
|
||||
let server_shutdown = CancellationToken::new();
|
||||
let server = hyper::Server::from_tcp(http_listener)?
|
||||
.serve(router_service)
|
||||
.with_graceful_shutdown({
|
||||
let server_shutdown = server_shutdown.clone();
|
||||
async move {
|
||||
server_shutdown.cancelled().await;
|
||||
}
|
||||
});
|
||||
tracing::info!("Serving on {0}", args.listen);
|
||||
|
||||
tokio::task::spawn(server);
|
||||
let server_task = tokio::task::spawn(server);
|
||||
|
||||
// Wait until we receive a signal
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
@@ -266,5 +275,16 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// Stop HTTP server first, so that we don't have to service requests
|
||||
// while shutting down Service
|
||||
server_shutdown.cancel();
|
||||
if let Err(e) = server_task.await {
|
||||
tracing::error!("Error joining HTTP server task: {e}")
|
||||
}
|
||||
tracing::info!("Joined HTTP server task");
|
||||
|
||||
service.shutdown().await;
|
||||
tracing::info!("Service shutdown complete");
|
||||
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
@@ -222,7 +222,7 @@ impl Persistence {
|
||||
let tenant_shard_id = TenantShardId {
|
||||
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
|
||||
shard_number: ShardNumber(tsp.shard_number as u8),
|
||||
shard_count: ShardCount(tsp.shard_count as u8),
|
||||
shard_count: ShardCount::new(tsp.shard_count as u8),
|
||||
};
|
||||
|
||||
tenants_map.insert(tenant_shard_id, tsp);
|
||||
@@ -318,7 +318,7 @@ impl Persistence {
|
||||
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
|
||||
.map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
|
||||
shard_number: ShardNumber(tsp.shard_number as u8),
|
||||
shard_count: ShardCount(tsp.shard_count as u8),
|
||||
shard_count: ShardCount::new(tsp.shard_count as u8),
|
||||
};
|
||||
result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
|
||||
}
|
||||
@@ -340,7 +340,7 @@ impl Persistence {
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
|
||||
.set((
|
||||
generation.eq(generation + 1),
|
||||
generation_pageserver.eq(node_id.0 as i64),
|
||||
@@ -362,7 +362,7 @@ impl Persistence {
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
|
||||
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
|
||||
.set((
|
||||
generation_pageserver.eq(i64::MAX),
|
||||
placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
|
||||
@@ -392,21 +392,19 @@ impl Persistence {
|
||||
conn.transaction(|conn| -> DatabaseResult<()> {
|
||||
// Mark parent shards as splitting
|
||||
|
||||
let expect_parent_records = std::cmp::max(1, old_shard_count.0);
|
||||
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.0 as i32))
|
||||
.filter(shard_count.eq(old_shard_count.literal() as i32))
|
||||
.set((splitting.eq(1),))
|
||||
.execute(conn)?;
|
||||
if u8::try_from(updated)
|
||||
.map_err(|_| DatabaseError::Logical(
|
||||
format!("Overflow existing shard count {} while splitting", updated))
|
||||
)? != expect_parent_records {
|
||||
)? != old_shard_count.count() {
|
||||
// Perhaps a deletion or another split raced with this attempt to split, mutating
|
||||
// the parent shards that we intend to split. In this case the split request should fail.
|
||||
return Err(DatabaseError::Logical(
|
||||
format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {expect_parent_records})")
|
||||
format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
|
||||
));
|
||||
}
|
||||
|
||||
@@ -418,7 +416,7 @@ impl Persistence {
|
||||
let mut parent = crate::schema::tenant_shards::table
|
||||
.filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(parent_shard_id.shard_count.0 as i32))
|
||||
.filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
|
||||
.load::<TenantShardPersistence>(conn)?;
|
||||
let parent = if parent.len() != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
@@ -459,7 +457,7 @@ impl Persistence {
|
||||
// Drop parent shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.0 as i32))
|
||||
.filter(shard_count.eq(old_shard_count.literal() as i32))
|
||||
.execute(conn)?;
|
||||
|
||||
// Clear sharding flag
|
||||
|
||||
@@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
use crate::node::Node;
|
||||
@@ -53,6 +54,10 @@ pub(super) struct Reconciler {
|
||||
/// the tenant is changed.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
|
||||
/// Reconcilers are registered with a Gate so that during a graceful shutdown we
|
||||
/// can wait for all the reconcilers to respond to their cancellation tokens.
|
||||
pub(crate) _gate_guard: GateGuard,
|
||||
|
||||
/// Access to persistent storage for updating generation numbers
|
||||
pub(crate) persistence: Arc<Persistence>,
|
||||
}
|
||||
@@ -263,7 +268,7 @@ impl Reconciler {
|
||||
secondary_conf,
|
||||
tenant_conf: config.clone(),
|
||||
shard_number: shard.number.0,
|
||||
shard_count: shard.count.0,
|
||||
shard_count: shard.count.literal(),
|
||||
shard_stripe_size: shard.stripe_size.0,
|
||||
}
|
||||
}
|
||||
@@ -458,7 +463,7 @@ impl Reconciler {
|
||||
generation: None,
|
||||
secondary_conf: None,
|
||||
shard_number: self.shard.number.0,
|
||||
shard_count: self.shard.count.0,
|
||||
shard_count: self.shard.count.literal(),
|
||||
shard_stripe_size: self.shard.stripe_size.0,
|
||||
tenant_conf: self.config.clone(),
|
||||
},
|
||||
@@ -506,7 +511,7 @@ pub(crate) fn attached_location_conf(
|
||||
generation: generation.into(),
|
||||
secondary_conf: None,
|
||||
shard_number: shard.number.0,
|
||||
shard_count: shard.count.0,
|
||||
shard_count: shard.count.literal(),
|
||||
shard_stripe_size: shard.stripe_size.0,
|
||||
tenant_conf: config.clone(),
|
||||
}
|
||||
@@ -521,7 +526,7 @@ pub(crate) fn secondary_location_conf(
|
||||
generation: None,
|
||||
secondary_conf: Some(LocationConfigSecondary { warm: true }),
|
||||
shard_number: shard.number.0,
|
||||
shard_count: shard.count.0,
|
||||
shard_count: shard.count.literal(),
|
||||
shard_stripe_size: shard.stripe_size.0,
|
||||
tenant_conf: config.clone(),
|
||||
}
|
||||
|
||||
@@ -77,12 +77,11 @@ impl Scheduler {
|
||||
return Err(ScheduleError::ImpossibleConstraint);
|
||||
}
|
||||
|
||||
for (node_id, count) in &tenant_counts {
|
||||
tracing::info!("tenant_counts[{node_id}]={count}");
|
||||
}
|
||||
|
||||
let node_id = tenant_counts.first().unwrap().0;
|
||||
tracing::info!("scheduler selected node {node_id}");
|
||||
tracing::info!(
|
||||
"scheduler selected node {node_id} (elegible nodes {:?}, exclude: {hard_exclude:?})",
|
||||
tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
|
||||
);
|
||||
*self.tenant_counts.get_mut(&node_id).unwrap() += 1;
|
||||
Ok(node_id)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ use pageserver_api::{
|
||||
};
|
||||
use pageserver_client::mgmt_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
use utils::{
|
||||
backoff,
|
||||
completion::Barrier,
|
||||
@@ -37,6 +38,7 @@ use utils::{
|
||||
http::error::ApiError,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
seqwait::SeqWait,
|
||||
sync::gate::Gate,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -124,6 +126,12 @@ pub struct Service {
|
||||
config: Config,
|
||||
persistence: Arc<Persistence>,
|
||||
|
||||
// Process shutdown will fire this token
|
||||
cancel: CancellationToken,
|
||||
|
||||
// Background tasks will hold this gate
|
||||
gate: Gate,
|
||||
|
||||
/// This waits for initial reconciliation with pageservers to complete. Until this barrier
|
||||
/// passes, it isn't safe to do any actions that mutate tenants.
|
||||
pub(crate) startup_complete: Barrier,
|
||||
@@ -144,8 +152,9 @@ impl Service {
|
||||
&self.config
|
||||
}
|
||||
|
||||
/// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping
|
||||
/// until this is done.
|
||||
/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
|
||||
/// view of the world, and determine which pageservers are responsive.
|
||||
#[instrument(skip_all)]
|
||||
async fn startup_reconcile(&self) {
|
||||
// For all tenant shards, a vector of observed states on nodes (where None means
|
||||
// indeterminate, same as in [`ObservedStateLocation`])
|
||||
@@ -153,9 +162,6 @@ impl Service {
|
||||
|
||||
let mut nodes_online = HashSet::new();
|
||||
|
||||
// TODO: give Service a cancellation token for clean shutdown
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// TODO: issue these requests concurrently
|
||||
{
|
||||
let nodes = {
|
||||
@@ -190,7 +196,7 @@ impl Service {
|
||||
1,
|
||||
5,
|
||||
"Location config listing",
|
||||
&cancel,
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
let Some(list_response) = list_response else {
|
||||
@@ -292,7 +298,7 @@ impl Service {
|
||||
generation: None,
|
||||
secondary_conf: None,
|
||||
shard_number: tenant_shard_id.shard_number.0,
|
||||
shard_count: tenant_shard_id.shard_count.0,
|
||||
shard_count: tenant_shard_id.shard_count.literal(),
|
||||
shard_stripe_size: 0,
|
||||
tenant_conf: models::TenantConfig::default(),
|
||||
},
|
||||
@@ -331,7 +337,7 @@ impl Service {
|
||||
let stream = futures::stream::iter(compute_notifications.into_iter())
|
||||
.map(|(tenant_shard_id, node_id)| {
|
||||
let compute_hook = compute_hook.clone();
|
||||
let cancel = cancel.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
async move {
|
||||
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
|
||||
tracing::error!(
|
||||
@@ -368,8 +374,98 @@ impl Service {
|
||||
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
|
||||
}
|
||||
|
||||
/// Long running background task that periodically wakes up and looks for shards that need
|
||||
/// reconciliation. Reconciliation is fallible, so any reconciliation tasks that fail during
|
||||
/// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
|
||||
/// for those retries.
|
||||
#[instrument(skip_all)]
|
||||
async fn background_reconcile(&self) {
|
||||
self.startup_complete.clone().wait().await;
|
||||
|
||||
const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
|
||||
|
||||
let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
|
||||
while !self.cancel.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => { self.reconcile_all(); }
|
||||
_ = self.cancel.cancelled() => return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn process_results(
|
||||
&self,
|
||||
mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
|
||||
) {
|
||||
loop {
|
||||
// Wait for the next result, or for cancellation
|
||||
let result = tokio::select! {
|
||||
r = result_rx.recv() => {
|
||||
match r {
|
||||
Some(result) => {result},
|
||||
None => {break;}
|
||||
}
|
||||
}
|
||||
_ = self.cancel.cancelled() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"Reconcile result for sequence {}, ok={}",
|
||||
result.sequence,
|
||||
result.result.is_ok()
|
||||
);
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
|
||||
// A reconciliation result might race with removing a tenant: drop results for
|
||||
// tenants that aren't in our map.
|
||||
continue;
|
||||
};
|
||||
|
||||
// Usually generation should only be updated via this path, so the max() isn't
|
||||
// needed, but it is used to handle out-of-band updates via. e.g. test hook.
|
||||
tenant.generation = std::cmp::max(tenant.generation, result.generation);
|
||||
|
||||
// If the reconciler signals that it failed to notify compute, set this state on
|
||||
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
|
||||
tenant.pending_compute_notification = result.pending_compute_notification;
|
||||
|
||||
match result.result {
|
||||
Ok(()) => {
|
||||
for (node_id, loc) in &result.observed.locations {
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
}
|
||||
tenant.observed = result.observed;
|
||||
tenant.waiter.advance(result.sequence);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Reconcile error on tenant {}: {}",
|
||||
tenant.tenant_shard_id,
|
||||
e
|
||||
);
|
||||
|
||||
// Ordering: populate last_error before advancing error_seq,
|
||||
// so that waiters will see the correct error after waiting.
|
||||
*(tenant.last_error.lock().unwrap()) = format!("{e}");
|
||||
tenant.error_waiter.advance(result.sequence);
|
||||
|
||||
for (node_id, o) in result.observed.locations {
|
||||
tenant.observed.locations.insert(node_id, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
|
||||
let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
tracing::info!("Loading nodes from database...");
|
||||
let nodes = persistence.list_nodes().await?;
|
||||
@@ -389,14 +485,14 @@ impl Service {
|
||||
let tenant_shard_id = TenantShardId {
|
||||
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
|
||||
shard_number: ShardNumber(tsp.shard_number as u8),
|
||||
shard_count: ShardCount(tsp.shard_count as u8),
|
||||
shard_count: ShardCount::new(tsp.shard_count as u8),
|
||||
};
|
||||
let shard_identity = if tsp.shard_count == 0 {
|
||||
ShardIdentity::unsharded()
|
||||
} else {
|
||||
ShardIdentity::new(
|
||||
ShardNumber(tsp.shard_number as u8),
|
||||
ShardCount(tsp.shard_count as u8),
|
||||
ShardCount::new(tsp.shard_count as u8),
|
||||
ShardStripeSize(tsp.shard_stripe_size as u32),
|
||||
)?
|
||||
};
|
||||
@@ -418,6 +514,7 @@ impl Service {
|
||||
observed: ObservedState::new(),
|
||||
config: serde_json::from_str(&tsp.config).unwrap(),
|
||||
reconciler: None,
|
||||
splitting: tsp.splitting,
|
||||
waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
||||
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
||||
last_error: Arc::default(),
|
||||
@@ -439,73 +536,35 @@ impl Service {
|
||||
config,
|
||||
persistence,
|
||||
startup_complete: startup_complete.clone(),
|
||||
cancel: CancellationToken::new(),
|
||||
gate: Gate::default(),
|
||||
});
|
||||
|
||||
let result_task_this = this.clone();
|
||||
tokio::task::spawn(async move {
|
||||
while let Some(result) = result_rx.recv().await {
|
||||
tracing::info!(
|
||||
"Reconcile result for sequence {}, ok={}",
|
||||
result.sequence,
|
||||
result.result.is_ok()
|
||||
);
|
||||
let mut locked = result_task_this.inner.write().unwrap();
|
||||
let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
|
||||
// A reconciliation result might race with removing a tenant: drop results for
|
||||
// tenants that aren't in our map.
|
||||
continue;
|
||||
};
|
||||
|
||||
// Usually generation should only be updated via this path, so the max() isn't
|
||||
// needed, but it is used to handle out-of-band updates via. e.g. test hook.
|
||||
tenant.generation = std::cmp::max(tenant.generation, result.generation);
|
||||
|
||||
// If the reconciler signals that it failed to notify compute, set this state on
|
||||
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
|
||||
tenant.pending_compute_notification = result.pending_compute_notification;
|
||||
|
||||
match result.result {
|
||||
Ok(()) => {
|
||||
for (node_id, loc) in &result.observed.locations {
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!(
|
||||
"Updating observed location {}: {:?}",
|
||||
node_id,
|
||||
conf
|
||||
);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
}
|
||||
tenant.observed = result.observed;
|
||||
tenant.waiter.advance(result.sequence);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Reconcile error on tenant {}: {}",
|
||||
tenant.tenant_shard_id,
|
||||
e
|
||||
);
|
||||
|
||||
// Ordering: populate last_error before advancing error_seq,
|
||||
// so that waiters will see the correct error after waiting.
|
||||
*(tenant.last_error.lock().unwrap()) = format!("{e}");
|
||||
tenant.error_waiter.advance(result.sequence);
|
||||
|
||||
for (node_id, o) in result.observed.locations {
|
||||
tenant.observed.locations.insert(node_id, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Block shutdown until we're done (we must respect self.cancel)
|
||||
if let Ok(_gate) = result_task_this.gate.enter() {
|
||||
result_task_this.process_results(result_rx).await
|
||||
}
|
||||
});
|
||||
|
||||
let startup_reconcile_this = this.clone();
|
||||
tokio::task::spawn(async move {
|
||||
// Block the [`Service::startup_complete`] barrier until we're done
|
||||
let _completion = startup_completion;
|
||||
tokio::task::spawn({
|
||||
let this = this.clone();
|
||||
// We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`]
|
||||
// is done.
|
||||
let startup_completion = startup_completion.clone();
|
||||
async move {
|
||||
// Block shutdown until we're done (we must respect self.cancel)
|
||||
let Ok(_gate) = this.gate.enter() else {
|
||||
return;
|
||||
};
|
||||
|
||||
startup_reconcile_this.startup_reconcile().await
|
||||
this.startup_reconcile().await;
|
||||
|
||||
drop(startup_completion);
|
||||
|
||||
this.background_reconcile().await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(this)
|
||||
@@ -526,7 +585,7 @@ impl Service {
|
||||
let tsp = TenantShardPersistence {
|
||||
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
|
||||
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
|
||||
shard_count: attach_req.tenant_shard_id.shard_count.literal() as i32,
|
||||
shard_stripe_size: 0,
|
||||
generation: 0,
|
||||
generation_pageserver: i64::MAX,
|
||||
@@ -620,6 +679,28 @@ impl Service {
|
||||
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
|
||||
);
|
||||
|
||||
// Trick the reconciler into not doing anything for this tenant: this helps
|
||||
// tests that manually configure a tenant on the pagesrever, and then call this
|
||||
// attach hook: they don't want background reconciliation to modify what they
|
||||
// did to the pageserver.
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
if let Some(node_id) = attach_req.node_id {
|
||||
tenant_state.observed.locations = HashMap::from([(
|
||||
node_id,
|
||||
ObservedStateLocation {
|
||||
conf: Some(attached_location_conf(
|
||||
tenant_state.generation,
|
||||
&tenant_state.shard,
|
||||
&tenant_state.config,
|
||||
)),
|
||||
},
|
||||
)]);
|
||||
} else {
|
||||
tenant_state.observed.locations.clear();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AttachHookResponse {
|
||||
gen: attach_req
|
||||
.node_id
|
||||
@@ -726,16 +807,9 @@ impl Service {
|
||||
&self,
|
||||
create_req: TenantCreateRequest,
|
||||
) -> Result<TenantCreateResponse, ApiError> {
|
||||
// Shard count 0 is valid: it means create a single shard (ShardCount(0) means "unsharded")
|
||||
let literal_shard_count = if create_req.shard_parameters.is_unsharded() {
|
||||
1
|
||||
} else {
|
||||
create_req.shard_parameters.count.0
|
||||
};
|
||||
|
||||
// This service expects to handle sharding itself: it is an error to try and directly create
|
||||
// a particular shard here.
|
||||
let tenant_id = if create_req.new_tenant_id.shard_count > ShardCount(1) {
|
||||
let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Attempted to create a specific shard, this API is for creating the whole tenant"
|
||||
)));
|
||||
@@ -749,7 +823,7 @@ impl Service {
|
||||
create_req.shard_parameters.count,
|
||||
);
|
||||
|
||||
let create_ids = (0..literal_shard_count)
|
||||
let create_ids = (0..create_req.shard_parameters.count.count())
|
||||
.map(|i| TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(i),
|
||||
@@ -769,7 +843,7 @@ impl Service {
|
||||
.map(|tenant_shard_id| TenantShardPersistence {
|
||||
tenant_id: tenant_shard_id.tenant_id.to_string(),
|
||||
shard_number: tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: tenant_shard_id.shard_count.0 as i32,
|
||||
shard_count: tenant_shard_id.shard_count.literal() as i32,
|
||||
shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
|
||||
generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
|
||||
generation_pageserver: i64::MAX,
|
||||
@@ -875,6 +949,8 @@ impl Service {
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
@@ -914,7 +990,7 @@ impl Service {
|
||||
tenant_id: TenantId,
|
||||
req: TenantLocationConfigRequest,
|
||||
) -> Result<TenantLocationConfigResponse, ApiError> {
|
||||
if req.tenant_id.shard_count.0 > 1 {
|
||||
if !req.tenant_id.is_unsharded() {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"This API is for importing single-sharded or unsharded tenants"
|
||||
)));
|
||||
@@ -977,6 +1053,8 @@ impl Service {
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
);
|
||||
if let Some(waiter) = maybe_waiter {
|
||||
waiters.push(waiter);
|
||||
@@ -1066,6 +1144,8 @@ impl Service {
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
// TODO: refactor into helper
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
@@ -1087,8 +1167,6 @@ impl Service {
|
||||
targets
|
||||
};
|
||||
|
||||
// TODO: error out if the tenant is not attached anywhere.
|
||||
|
||||
// Phase 1: delete on the pageservers
|
||||
let mut any_pending = false;
|
||||
for (tenant_shard_id, node) in targets {
|
||||
@@ -1424,9 +1502,6 @@ impl Service {
|
||||
let mut policy = None;
|
||||
let mut shard_ident = None;
|
||||
|
||||
// TODO: put a cancellation token on Service for clean shutdown
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// A parent shard which will be split
|
||||
struct SplitTarget {
|
||||
parent_id: TenantShardId,
|
||||
@@ -1449,7 +1524,7 @@ impl Service {
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
match shard.shard.count.0.cmp(&split_req.new_shard_count) {
|
||||
match shard.shard.count.count().cmp(&split_req.new_shard_count) {
|
||||
Ordering::Equal => {
|
||||
// Already split this
|
||||
children_found.push(*tenant_shard_id);
|
||||
@@ -1459,7 +1534,7 @@ impl Service {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Requested count {} but already have shards at count {}",
|
||||
split_req.new_shard_count,
|
||||
shard.shard.count.0
|
||||
shard.shard.count.count()
|
||||
)));
|
||||
}
|
||||
Ordering::Less => {
|
||||
@@ -1489,7 +1564,7 @@ impl Service {
|
||||
shard_ident = Some(shard.shard);
|
||||
}
|
||||
|
||||
if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
|
||||
if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
|
||||
tracing::info!(
|
||||
"Tenant shard {} already has shard count {}",
|
||||
tenant_shard_id,
|
||||
@@ -1515,7 +1590,7 @@ impl Service {
|
||||
targets.push(SplitTarget {
|
||||
parent_id: *tenant_shard_id,
|
||||
node: node.clone(),
|
||||
child_ids: tenant_shard_id.split(ShardCount(split_req.new_shard_count)),
|
||||
child_ids: tenant_shard_id.split(ShardCount::new(split_req.new_shard_count)),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1562,7 +1637,7 @@ impl Service {
|
||||
this_child_tsps.push(TenantShardPersistence {
|
||||
tenant_id: child.tenant_id.to_string(),
|
||||
shard_number: child.shard_number.0 as i32,
|
||||
shard_count: child.shard_count.0 as i32,
|
||||
shard_count: child.shard_count.literal() as i32,
|
||||
shard_stripe_size: shard_ident.stripe_size.0 as i32,
|
||||
// Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
|
||||
// populate the correct generation as part of its transaction, to protect us
|
||||
@@ -1598,6 +1673,18 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
// Now that I have persisted the splitting state, apply it in-memory. This is infallible, so
|
||||
// callers may assume that if splitting is set in memory, then it was persisted, and if splitting
|
||||
// is not set in memory, then it was not persisted.
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for target in &targets {
|
||||
if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) {
|
||||
parent_shard.splitting = SplitState::Splitting;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: we have now committed the shard split state to the database, so any subsequent
|
||||
// failure needs to roll it back. We will later wrap this function in logic to roll back
|
||||
// the split if it fails.
|
||||
@@ -1657,7 +1744,7 @@ impl Service {
|
||||
.complete_shard_split(tenant_id, old_shard_count)
|
||||
.await?;
|
||||
|
||||
// Replace all the shards we just split with their children
|
||||
// Replace all the shards we just split with their children: this phase is infallible.
|
||||
let mut response = TenantShardSplitResponse {
|
||||
new_shards: Vec::new(),
|
||||
};
|
||||
@@ -1705,6 +1792,10 @@ impl Service {
|
||||
child_state.generation = generation;
|
||||
child_state.config = config.clone();
|
||||
|
||||
// The child's TenantState::splitting is intentionally left at the default value of Idle,
|
||||
// as at this point in the split process we have succeeded and this part is infallible:
|
||||
// we will never need to do any special recovery from this state.
|
||||
|
||||
child_locations.push((child, pageserver));
|
||||
|
||||
locked.tenants.insert(child, child_state);
|
||||
@@ -1716,7 +1807,7 @@ impl Service {
|
||||
// Send compute notifications for all the new shards
|
||||
let mut failed_notifications = Vec::new();
|
||||
for (child_id, child_ps) in child_locations {
|
||||
if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await {
|
||||
if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await {
|
||||
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
|
||||
child_id, child_ps);
|
||||
failed_notifications.push(child_id);
|
||||
@@ -1792,6 +1883,8 @@ impl Service {
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
)
|
||||
};
|
||||
|
||||
@@ -1993,6 +2086,8 @@ impl Service {
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -2014,6 +2109,8 @@ impl Service {
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -2053,6 +2150,8 @@ impl Service {
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
) {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
@@ -2064,6 +2163,17 @@ impl Service {
|
||||
let ensure_waiters = {
|
||||
let locked = self.inner.write().unwrap();
|
||||
|
||||
// Check if the tenant is splitting: in this case, even if it is attached,
|
||||
// we must act as if it is not: this blocks e.g. timeline creation/deletion
|
||||
// operations during the split.
|
||||
for (_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
|
||||
if !matches!(shard.splitting, SplitState::Idle) {
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
"Tenant shards are currently splitting".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
self.ensure_attached_schedule(locked, tenant_id)
|
||||
.map_err(ApiError::InternalServerError)?
|
||||
};
|
||||
@@ -2095,8 +2205,25 @@ impl Service {
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
)
|
||||
})
|
||||
.count()
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
// Note that this already stops processing any results from reconciles: so
|
||||
// we do not expect that our [`TenantState`] objects will reach a neat
|
||||
// final state.
|
||||
self.cancel.cancel();
|
||||
|
||||
// The cancellation tokens in [`crate::reconciler::Reconciler`] are children
|
||||
// of our cancellation token, so we do not need to explicitly cancel each of
|
||||
// them.
|
||||
|
||||
// Background tasks and reconcilers hold gate guards: this waits for them all
|
||||
// to complete.
|
||||
self.gate.close().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,16 +7,18 @@ use pageserver_api::{
|
||||
};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{instrument, Instrument};
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::NodeId,
|
||||
seqwait::{SeqWait, SeqWaitError},
|
||||
sync::gate::Gate,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
compute_hook::ComputeHook,
|
||||
node::Node,
|
||||
persistence::Persistence,
|
||||
persistence::{split_state::SplitState, Persistence},
|
||||
reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
|
||||
scheduler::{ScheduleError, Scheduler},
|
||||
service, PlacementPolicy, Sequence,
|
||||
@@ -58,6 +60,11 @@ pub(crate) struct TenantState {
|
||||
/// cancellation token has been fired)
|
||||
pub(crate) reconciler: Option<ReconcilerHandle>,
|
||||
|
||||
/// If a tenant is being split, then all shards with that TenantId will have a
|
||||
/// SplitState set, this acts as a guard against other operations such as background
|
||||
/// reconciliation, and timeline creation.
|
||||
pub(crate) splitting: SplitState,
|
||||
|
||||
/// Optionally wait for reconciliation to complete up to a particular
|
||||
/// sequence number.
|
||||
pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
||||
@@ -238,6 +245,7 @@ impl TenantState {
|
||||
observed: ObservedState::default(),
|
||||
config: TenantConfig::default(),
|
||||
reconciler: None,
|
||||
splitting: SplitState::Idle,
|
||||
sequence: Sequence(1),
|
||||
waiter: Arc::new(SeqWait::new(Sequence(0))),
|
||||
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
|
||||
@@ -415,6 +423,8 @@ impl TenantState {
|
||||
false
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) fn maybe_reconcile(
|
||||
&mut self,
|
||||
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
|
||||
@@ -422,6 +432,8 @@ impl TenantState {
|
||||
compute_hook: &Arc<ComputeHook>,
|
||||
service_config: &service::Config,
|
||||
persistence: &Arc<Persistence>,
|
||||
gate: &Gate,
|
||||
cancel: &CancellationToken,
|
||||
) -> Option<ReconcilerWaiter> {
|
||||
// If there are any ambiguous observed states, and the nodes they refer to are available,
|
||||
// we should reconcile to clean them up.
|
||||
@@ -443,6 +455,14 @@ impl TenantState {
|
||||
return None;
|
||||
}
|
||||
|
||||
// If we are currently splitting, then never start a reconciler task: the splitting logic
|
||||
// requires that shards are not interfered with while it runs. Do this check here rather than
|
||||
// up top, so that we only log this message if we would otherwise have done a reconciliation.
|
||||
if !matches!(self.splitting, SplitState::Idle) {
|
||||
tracing::info!("Refusing to reconcile, splitting in progress");
|
||||
return None;
|
||||
}
|
||||
|
||||
// Reconcile already in flight for the current sequence?
|
||||
if let Some(handle) = &self.reconciler {
|
||||
if handle.sequence == self.sequence {
|
||||
@@ -460,7 +480,12 @@ impl TenantState {
|
||||
// doing our sequence's work.
|
||||
let old_handle = self.reconciler.take();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let Ok(gate_guard) = gate.enter() else {
|
||||
// Shutting down, don't start a reconciler
|
||||
return None;
|
||||
};
|
||||
|
||||
let reconciler_cancel = cancel.child_token();
|
||||
let mut reconciler = Reconciler {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
shard: self.shard,
|
||||
@@ -471,59 +496,66 @@ impl TenantState {
|
||||
pageservers: pageservers.clone(),
|
||||
compute_hook: compute_hook.clone(),
|
||||
service_config: service_config.clone(),
|
||||
cancel: cancel.clone(),
|
||||
_gate_guard: gate_guard,
|
||||
cancel: reconciler_cancel.clone(),
|
||||
persistence: persistence.clone(),
|
||||
compute_notify_failure: false,
|
||||
};
|
||||
|
||||
let reconcile_seq = self.sequence;
|
||||
|
||||
tracing::info!("Spawning Reconciler for sequence {}", self.sequence);
|
||||
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
|
||||
let must_notify = self.pending_compute_notification;
|
||||
let join_handle = tokio::task::spawn(async move {
|
||||
// Wait for any previous reconcile task to complete before we start
|
||||
if let Some(old_handle) = old_handle {
|
||||
old_handle.cancel.cancel();
|
||||
if let Err(e) = old_handle.handle.await {
|
||||
// We can't do much with this other than log it: the task is done, so
|
||||
// we may proceed with our work.
|
||||
tracing::error!("Unexpected join error waiting for reconcile task: {e}");
|
||||
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
|
||||
tenant_id=%reconciler.tenant_shard_id.tenant_id,
|
||||
shard_id=%reconciler.tenant_shard_id.shard_slug());
|
||||
let join_handle = tokio::task::spawn(
|
||||
async move {
|
||||
// Wait for any previous reconcile task to complete before we start
|
||||
if let Some(old_handle) = old_handle {
|
||||
old_handle.cancel.cancel();
|
||||
if let Err(e) = old_handle.handle.await {
|
||||
// We can't do much with this other than log it: the task is done, so
|
||||
// we may proceed with our work.
|
||||
tracing::error!("Unexpected join error waiting for reconcile task: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
// Early check for cancellation before doing any work
|
||||
// TODO: wrap all remote API operations in cancellation check
|
||||
// as well.
|
||||
if reconciler.cancel.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Attempt to make observed state match intent state
|
||||
let result = reconciler.reconcile().await;
|
||||
|
||||
// If we know we had a pending compute notification from some previous action, send a notification irrespective
|
||||
// of whether the above reconcile() did any work
|
||||
if result.is_ok() && must_notify {
|
||||
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
|
||||
reconciler.compute_notify().await.ok();
|
||||
}
|
||||
|
||||
result_tx
|
||||
.send(ReconcileResult {
|
||||
sequence: reconcile_seq,
|
||||
result,
|
||||
tenant_shard_id: reconciler.tenant_shard_id,
|
||||
generation: reconciler.generation,
|
||||
observed: reconciler.observed,
|
||||
pending_compute_notification: reconciler.compute_notify_failure,
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
|
||||
// Early check for cancellation before doing any work
|
||||
// TODO: wrap all remote API operations in cancellation check
|
||||
// as well.
|
||||
if reconciler.cancel.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Attempt to make observed state match intent state
|
||||
let result = reconciler.reconcile().await;
|
||||
|
||||
// If we know we had a pending compute notification from some previous action, send a notification irrespective
|
||||
// of whether the above reconcile() did any work
|
||||
if result.is_ok() && must_notify {
|
||||
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
|
||||
reconciler.compute_notify().await.ok();
|
||||
}
|
||||
|
||||
result_tx
|
||||
.send(ReconcileResult {
|
||||
sequence: reconcile_seq,
|
||||
result,
|
||||
tenant_shard_id: reconciler.tenant_shard_id,
|
||||
generation: reconciler.generation,
|
||||
observed: reconciler.observed,
|
||||
pending_compute_notification: reconciler.compute_notify_failure,
|
||||
})
|
||||
.ok();
|
||||
});
|
||||
.instrument(reconciler_span),
|
||||
);
|
||||
|
||||
self.reconciler = Some(ReconcilerHandle {
|
||||
sequence: self.sequence,
|
||||
handle: join_handle,
|
||||
cancel,
|
||||
cancel: reconciler_cancel,
|
||||
});
|
||||
|
||||
Some(ReconcilerWaiter {
|
||||
|
||||
@@ -450,7 +450,7 @@ async fn handle_tenant(
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: None,
|
||||
shard_parameters: ShardParameters {
|
||||
count: ShardCount(shard_count),
|
||||
count: ShardCount::new(shard_count),
|
||||
stripe_size: shard_stripe_size
|
||||
.map(ShardStripeSize)
|
||||
.unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
|
||||
|
||||
@@ -400,6 +400,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'lazy_slru_download' as bool")?,
|
||||
timeline_get_throttle: settings
|
||||
.remove("timeline_get_throttle")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -505,6 +510,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'lazy_slru_download' as bool")?,
|
||||
timeline_get_throttle: settings
|
||||
.remove("timeline_get_throttle")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -214,14 +214,14 @@ impl ShardParameters {
|
||||
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.count == ShardCount(0)
|
||||
self.count.is_unsharded()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ShardParameters {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
count: ShardCount(0),
|
||||
count: ShardCount::new(0),
|
||||
stripe_size: Self::DEFAULT_STRIPE_SIZE,
|
||||
}
|
||||
}
|
||||
@@ -283,6 +283,7 @@ pub struct TenantConfig {
|
||||
pub gc_feedback: Option<bool>,
|
||||
pub heatmap_period: Option<String>,
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -309,6 +310,35 @@ pub struct EvictionPolicyLayerAccessThreshold {
|
||||
pub threshold: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub struct ThrottleConfig {
|
||||
pub task_kinds: Vec<String>, // TaskKind
|
||||
pub initial: usize,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub refill_interval: Duration,
|
||||
pub refill_amount: NonZeroUsize,
|
||||
pub max: usize,
|
||||
pub fair: bool,
|
||||
}
|
||||
|
||||
impl ThrottleConfig {
|
||||
pub fn disabled() -> Self {
|
||||
Self {
|
||||
task_kinds: vec![], // effectively disables the throttle
|
||||
// other values don't matter with emtpy `task_kinds`.
|
||||
initial: 0,
|
||||
refill_interval: Duration::from_millis(1),
|
||||
refill_amount: NonZeroUsize::new(1).unwrap(),
|
||||
max: 1,
|
||||
fair: true,
|
||||
}
|
||||
}
|
||||
/// The requests per second allowed by the given config.
|
||||
pub fn steady_rps(&self) -> f64 {
|
||||
(self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64()) / 1e3
|
||||
}
|
||||
}
|
||||
|
||||
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
|
||||
/// lists out all possible states (and the virtual "Detached" state)
|
||||
/// in a flat form rather than using rust-style enums.
|
||||
|
||||
@@ -13,10 +13,41 @@ use utils::id::TenantId;
|
||||
pub struct ShardNumber(pub u8);
|
||||
|
||||
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
|
||||
pub struct ShardCount(pub u8);
|
||||
pub struct ShardCount(u8);
|
||||
|
||||
impl ShardCount {
|
||||
pub const MAX: Self = Self(u8::MAX);
|
||||
|
||||
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
|
||||
/// legacy format for TenantShardId that excludes the shard suffix", also known
|
||||
/// as `TenantShardId::unsharded`.
|
||||
///
|
||||
/// This method returns the actual number of shards, i.e. if our internal value is
|
||||
/// zero, we return 1 (unsharded tenants have 1 shard).
|
||||
pub fn count(&self) -> u8 {
|
||||
if self.0 > 0 {
|
||||
self.0
|
||||
} else {
|
||||
1
|
||||
}
|
||||
}
|
||||
|
||||
/// The literal internal value: this is **not** the number of shards in the
|
||||
/// tenant, as we have a special zero value for legacy unsharded tenants. Use
|
||||
/// [`Self::count`] if you want to know the cardinality of shards.
|
||||
pub fn literal(&self) -> u8 {
|
||||
self.0
|
||||
}
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.0 == 0
|
||||
}
|
||||
|
||||
/// `v` may be zero, or the number of shards in the tenant. `v` is what
|
||||
/// [`Self::literal`] would return.
|
||||
pub fn new(val: u8) -> Self {
|
||||
Self(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl ShardNumber {
|
||||
@@ -86,7 +117,7 @@ impl TenantShardId {
|
||||
}
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
|
||||
self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
|
||||
}
|
||||
|
||||
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
|
||||
@@ -471,10 +502,12 @@ impl ShardIdentity {
|
||||
pub fn is_key_disposable(&self, key: &Key) -> bool {
|
||||
if key_is_shard0(key) {
|
||||
// Q: Why can't we dispose of shard0 content if we're not shard 0?
|
||||
// A: because the WAL ingestion logic currently ingests some shard 0
|
||||
// content on all shards, even though it's only read on shard 0. If we
|
||||
// dropped it, then subsequent WAL ingest to these keys would encounter
|
||||
// an error.
|
||||
// A1: because the WAL ingestion logic currently ingests some shard 0
|
||||
// content on all shards, even though it's only read on shard 0. If we
|
||||
// dropped it, then subsequent WAL ingest to these keys would encounter
|
||||
// an error.
|
||||
// A2: because key_is_shard0 also covers relation size keys, which are written
|
||||
// on all shards even though they're only maintained accurately on shard 0.
|
||||
false
|
||||
} else {
|
||||
!self.is_key_local(key)
|
||||
|
||||
@@ -25,6 +25,7 @@ hyper = { workspace = true, features = ["full"] }
|
||||
fail.workspace = true
|
||||
futures = { workspace = true}
|
||||
jsonwebtoken.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
@@ -29,6 +29,9 @@ pub enum Scope {
|
||||
// Should only be used e.g. for status check.
|
||||
// Currently also used for connection from any pageserver to any safekeeper.
|
||||
SafekeeperData,
|
||||
// The scope used by pageservers in upcalls to storage controller and cloud control plane
|
||||
#[serde(rename = "generations_api")]
|
||||
GenerationsApi,
|
||||
}
|
||||
|
||||
/// JWT payload. See docs/authentication.md for the format
|
||||
|
||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
@@ -35,6 +36,7 @@ humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
md5.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
@@ -82,7 +84,7 @@ workspace_hack.workspace = true
|
||||
reqwest.workspace = true
|
||||
rpds.workspace = true
|
||||
enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
enumset = { workspace = true, features = ["serde"]}
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
|
||||
@@ -6,14 +6,28 @@
|
||||
//! There are two sets of inputs; `short` and `medium`. They were collected on postgres v14 by
|
||||
//! logging what happens when a sequential scan is requested on a small table, then picking out two
|
||||
//! suitable from logs.
|
||||
//!
|
||||
//!
|
||||
//! Reference data (git blame to see commit) on an i3en.3xlarge
|
||||
// ```text
|
||||
//! short/short/1 time: [39.175 µs 39.348 µs 39.536 µs]
|
||||
//! short/short/2 time: [51.227 µs 51.487 µs 51.755 µs]
|
||||
//! short/short/4 time: [76.048 µs 76.362 µs 76.674 µs]
|
||||
//! short/short/8 time: [128.94 µs 129.82 µs 130.74 µs]
|
||||
//! short/short/16 time: [227.84 µs 229.00 µs 230.28 µs]
|
||||
//! short/short/32 time: [455.97 µs 457.81 µs 459.90 µs]
|
||||
//! short/short/64 time: [902.46 µs 904.84 µs 907.32 µs]
|
||||
//! short/short/128 time: [1.7416 ms 1.7487 ms 1.7561 ms]
|
||||
//! ``
|
||||
|
||||
use std::sync::{Arc, Barrier};
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver::{
|
||||
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::task::JoinSet;
|
||||
use utils::{id::TenantId, lsn::Lsn};
|
||||
|
||||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
@@ -39,11 +53,11 @@ fn redo_scenarios(c: &mut Criterion) {
|
||||
.build()
|
||||
.unwrap();
|
||||
tracing::info!("executing first");
|
||||
short().execute(rt.handle(), &manager).unwrap();
|
||||
rt.block_on(short().execute(&manager)).unwrap();
|
||||
tracing::info!("first executed");
|
||||
}
|
||||
|
||||
let thread_counts = [1, 2, 4, 8, 16];
|
||||
let thread_counts = [1, 2, 4, 8, 16, 32, 64, 128];
|
||||
|
||||
let mut group = c.benchmark_group("short");
|
||||
group.sampling_mode(criterion::SamplingMode::Flat);
|
||||
@@ -74,114 +88,69 @@ fn redo_scenarios(c: &mut Criterion) {
|
||||
drop(group);
|
||||
}
|
||||
|
||||
/// Sets up `threads` number of requesters to `request_redo`, with the given input.
|
||||
/// Sets up a multi-threaded tokio runtime with default worker thread count,
|
||||
/// then, spawn `requesters` tasks that repeatedly:
|
||||
/// - get input from `input_factor()`
|
||||
/// - call `manager.request_redo()` with their input
|
||||
///
|
||||
/// This stress-tests the scalability of a single walredo manager at high tokio-level concurrency.
|
||||
///
|
||||
/// Using tokio's default worker thread count means the results will differ on machines
|
||||
/// with different core countrs. We don't care about that, the performance will always
|
||||
/// be different on different hardware. To compare performance of different software versions,
|
||||
/// use the same hardware.
|
||||
fn add_multithreaded_walredo_requesters(
|
||||
b: &mut criterion::Bencher,
|
||||
threads: u32,
|
||||
nrequesters: usize,
|
||||
manager: &Arc<PostgresRedoManager>,
|
||||
input_factory: fn() -> Request,
|
||||
) {
|
||||
assert_ne!(threads, 0);
|
||||
assert_ne!(nrequesters, 0);
|
||||
|
||||
if threads == 1 {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let handle = rt.handle();
|
||||
b.iter_batched_ref(
|
||||
|| Some(input_factory()),
|
||||
|input| execute_all(input.take(), handle, manager),
|
||||
criterion::BatchSize::PerIteration,
|
||||
);
|
||||
} else {
|
||||
let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize);
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx));
|
||||
let barrier = Arc::new(tokio::sync::Barrier::new(nrequesters + 1));
|
||||
|
||||
let barrier = Arc::new(Barrier::new(threads as usize + 1));
|
||||
|
||||
let jhs = (0..threads)
|
||||
.map(|_| {
|
||||
std::thread::spawn({
|
||||
let manager = manager.clone();
|
||||
let barrier = barrier.clone();
|
||||
let work_rx = work_rx.clone();
|
||||
move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let handle = rt.handle();
|
||||
loop {
|
||||
// queue up and wait if we want to go another round
|
||||
if work_rx.lock().unwrap().recv().is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
let input = Some(input_factory());
|
||||
|
||||
barrier.wait();
|
||||
|
||||
execute_all(input, handle, &manager).unwrap();
|
||||
|
||||
barrier.wait();
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let _jhs = JoinOnDrop(jhs);
|
||||
|
||||
b.iter_batched(
|
||||
|| {
|
||||
for _ in 0..threads {
|
||||
work_tx.send(()).unwrap()
|
||||
}
|
||||
},
|
||||
|()| {
|
||||
// start the work
|
||||
barrier.wait();
|
||||
|
||||
// wait for work to complete
|
||||
barrier.wait();
|
||||
},
|
||||
criterion::BatchSize::PerIteration,
|
||||
);
|
||||
|
||||
drop(work_tx);
|
||||
let mut requesters = JoinSet::new();
|
||||
for _ in 0..nrequesters {
|
||||
let _entered = rt.enter();
|
||||
let manager = manager.clone();
|
||||
let barrier = barrier.clone();
|
||||
requesters.spawn(async move {
|
||||
loop {
|
||||
let input = input_factory();
|
||||
barrier.wait().await;
|
||||
let page = input.execute(&manager).await.unwrap();
|
||||
assert_eq!(page.remaining(), 8192);
|
||||
barrier.wait().await;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
struct JoinOnDrop(Vec<std::thread::JoinHandle<()>>);
|
||||
let do_one_iteration = || {
|
||||
rt.block_on(async {
|
||||
barrier.wait().await;
|
||||
// wait for work to complete
|
||||
barrier.wait().await;
|
||||
})
|
||||
};
|
||||
|
||||
impl Drop for JoinOnDrop {
|
||||
// it's not really needless because we want join all then check for panicks
|
||||
#[allow(clippy::needless_collect)]
|
||||
fn drop(&mut self) {
|
||||
// first join all
|
||||
let results = self.0.drain(..).map(|jh| jh.join()).collect::<Vec<_>>();
|
||||
// then check the results; panicking here is not great, but it does get the message across
|
||||
// to the user, and sets an exit value.
|
||||
results.into_iter().try_for_each(|res| res).unwrap();
|
||||
}
|
||||
}
|
||||
b.iter_batched(
|
||||
|| {
|
||||
// warmup
|
||||
do_one_iteration();
|
||||
},
|
||||
|()| {
|
||||
// work loop
|
||||
do_one_iteration();
|
||||
},
|
||||
criterion::BatchSize::PerIteration,
|
||||
);
|
||||
|
||||
fn execute_all<I>(
|
||||
input: I,
|
||||
handle: &tokio::runtime::Handle,
|
||||
manager: &PostgresRedoManager,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = Request>,
|
||||
{
|
||||
// just fire all requests as fast as possible
|
||||
input.into_iter().try_for_each(|req| {
|
||||
let page = req.execute(handle, manager)?;
|
||||
assert_eq!(page.remaining(), 8192);
|
||||
anyhow::Ok(())
|
||||
})
|
||||
rt.block_on(requesters.shutdown());
|
||||
}
|
||||
|
||||
criterion_group!(benches, redo_scenarios);
|
||||
@@ -493,11 +462,7 @@ struct Request {
|
||||
}
|
||||
|
||||
impl Request {
|
||||
fn execute(
|
||||
self,
|
||||
rt: &tokio::runtime::Handle,
|
||||
manager: &PostgresRedoManager,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
async fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
|
||||
let Request {
|
||||
key,
|
||||
lsn,
|
||||
@@ -506,6 +471,8 @@ impl Request {
|
||||
pg_version,
|
||||
} = self;
|
||||
|
||||
rt.block_on(manager.request_redo(key, lsn, base_img, records, pg_version))
|
||||
manager
|
||||
.request_redo(key, lsn, base_img, records, pg_version)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::future::join_all;
|
||||
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::PagestreamGetPageRequest;
|
||||
@@ -10,11 +9,10 @@ use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use rand::prelude::*;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{info, instrument};
|
||||
use tracing::info;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::pin::Pin;
|
||||
@@ -38,8 +36,12 @@ pub(crate) struct Args {
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
/// Each client sends requests at the given rate.
|
||||
///
|
||||
/// If a request takes too long and we should be issuing a new request already,
|
||||
/// we skip that request and account it as `MISSED`.
|
||||
#[clap(long)]
|
||||
per_target_rate_limit: Option<usize>,
|
||||
per_client_rate: Option<usize>,
|
||||
/// Probability for sending `latest=true` in the request (uniform distribution).
|
||||
#[clap(long, default_value = "1")]
|
||||
req_latest_probability: f64,
|
||||
@@ -61,12 +63,16 @@ pub(crate) struct Args {
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
missed: AtomicU64,
|
||||
}
|
||||
|
||||
impl LiveStats {
|
||||
fn inc(&self) {
|
||||
fn request_done(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
fn missed(&self, n: u64) {
|
||||
self.missed.fetch_add(n, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, serde::Serialize, serde::Deserialize)]
|
||||
@@ -220,13 +226,12 @@ async fn main_impl(
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_client_tasks = args.num_clients.get() * timelines.len();
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = 1;
|
||||
let num_work_sender_tasks = args.num_clients.get() * timelines.len();
|
||||
let num_main_impl = 1;
|
||||
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
|
||||
num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
));
|
||||
|
||||
tokio::spawn({
|
||||
@@ -238,10 +243,12 @@ async fn main_impl(
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let missed = stats.missed.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
info!(
|
||||
"RPS: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64()
|
||||
"RPS: {:.0} MISSED: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64(),
|
||||
missed as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -249,127 +256,105 @@ async fn main_impl(
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let mut work_senders: HashMap<WorkerId, _> = HashMap::new();
|
||||
let mut tasks = Vec::new();
|
||||
let rps_period = args
|
||||
.per_client_rate
|
||||
.map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
|
||||
let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
|
||||
let live_stats = live_stats.clone();
|
||||
let start_work_barrier = start_work_barrier.clone();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == worker_id.timeline)
|
||||
.cloned()
|
||||
.collect();
|
||||
let weights =
|
||||
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
|
||||
.unwrap();
|
||||
|
||||
let cancel = cancel.clone();
|
||||
Box::pin(async move {
|
||||
let client =
|
||||
pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros())
|
||||
.unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage(req).await.unwrap();
|
||||
let end = Instant::now();
|
||||
live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
info!("spawning workers");
|
||||
let mut workers = JoinSet::new();
|
||||
for timeline in timelines.iter().cloned() {
|
||||
for num_client in 0..args.num_clients.get() {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
|
||||
let worker_id = WorkerId {
|
||||
timeline,
|
||||
num_client,
|
||||
};
|
||||
work_senders.insert(worker_id, sender);
|
||||
tasks.push(tokio::spawn(client(
|
||||
args,
|
||||
worker_id,
|
||||
Arc::clone(&start_work_barrier),
|
||||
receiver,
|
||||
Arc::clone(&live_stats),
|
||||
cancel.clone(),
|
||||
)));
|
||||
workers.spawn(make_worker(worker_id));
|
||||
}
|
||||
}
|
||||
|
||||
let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = {
|
||||
let start_work_barrier = start_work_barrier.clone();
|
||||
let cancel = cancel.clone();
|
||||
match args.per_target_rate_limit {
|
||||
None => Box::pin(async move {
|
||||
let weights = rand::distributions::weighted::WeightedIndex::new(
|
||||
all_ranges.iter().map(|v| v.len()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
let (timeline, req) = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &all_ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
(
|
||||
WorkerId {
|
||||
timeline: r.timeline,
|
||||
num_client: rng.gen_range(0..args.num_clients.get()),
|
||||
},
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
},
|
||||
)
|
||||
};
|
||||
let sender = work_senders.get(&timeline).unwrap();
|
||||
// TODO: what if this blocks?
|
||||
if sender.send(req).await.is_err() {
|
||||
assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
|
||||
}
|
||||
}
|
||||
}),
|
||||
Some(rps_limit) => Box::pin(async move {
|
||||
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
|
||||
let make_task: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> =
|
||||
&|worker_id| {
|
||||
let sender = work_senders.get(&worker_id).unwrap();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == worker_id.timeline)
|
||||
.cloned()
|
||||
.collect();
|
||||
let weights = rand::distributions::weighted::WeightedIndex::new(
|
||||
ranges.iter().map(|v| v.len()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let cancel = cancel.clone();
|
||||
Box::pin(async move {
|
||||
let mut ticker = tokio::time::interval(period);
|
||||
ticker.set_missed_tick_behavior(
|
||||
/* TODO review this choice */
|
||||
tokio::time::MissedTickBehavior::Burst,
|
||||
);
|
||||
while !cancel.is_cancelled() {
|
||||
ticker.tick().await;
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) = key_to_rel_block(key)
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
if sender.send(req).await.is_err() {
|
||||
assert!(
|
||||
cancel.is_cancelled(),
|
||||
"client has gone away unexpectedly"
|
||||
);
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let tasks: Vec<_> = work_senders.keys().map(|tl| make_task(*tl)).collect();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
join_all(tasks).await;
|
||||
}),
|
||||
let workers = async move {
|
||||
while let Some(res) = workers.join_next().await {
|
||||
res.unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
let work_sender_task = tokio::spawn(work_sender);
|
||||
|
||||
info!("waiting for everything to become ready");
|
||||
start_work_barrier.wait().await;
|
||||
info!("work started");
|
||||
@@ -377,20 +362,13 @@ async fn main_impl(
|
||||
tokio::time::sleep(runtime.into()).await;
|
||||
info!("runtime over, signalling cancellation");
|
||||
cancel.cancel();
|
||||
work_sender_task.await.unwrap();
|
||||
workers.await;
|
||||
info!("work sender exited");
|
||||
} else {
|
||||
work_sender_task.await.unwrap();
|
||||
workers.await;
|
||||
unreachable!("work sender never terminates");
|
||||
}
|
||||
|
||||
info!("joining clients");
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
|
||||
info!("all clients stopped");
|
||||
|
||||
let output = Output {
|
||||
total: {
|
||||
let mut agg_stats = request_stats::Stats::new();
|
||||
@@ -407,49 +385,3 @@ async fn main_impl(
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn client(
|
||||
args: &'static Args,
|
||||
id: WorkerId,
|
||||
start_work_barrier: Arc<Barrier>,
|
||||
mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
|
||||
live_stats: Arc<LiveStats>,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
let WorkerId {
|
||||
timeline,
|
||||
num_client: _,
|
||||
} = id;
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(timeline.tenant_id, timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let do_requests = async {
|
||||
start_work_barrier.wait().await;
|
||||
while let Some(req) = work.recv().await {
|
||||
let start = Instant::now();
|
||||
client
|
||||
.getpage(req)
|
||||
.await
|
||||
.with_context(|| format!("getpage for {timeline}"))
|
||||
.unwrap();
|
||||
let elapsed = start.elapsed();
|
||||
live_stats.inc();
|
||||
STATS.with(|stats| {
|
||||
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
|
||||
});
|
||||
}
|
||||
};
|
||||
tokio::select! {
|
||||
res = do_requests => { res },
|
||||
_ = cancel.cancelled() => {
|
||||
// fallthrough to shutdown
|
||||
}
|
||||
}
|
||||
client.shutdown().await;
|
||||
}
|
||||
|
||||
@@ -14,8 +14,12 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
|
||||
}
|
||||
(Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope
|
||||
(Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope
|
||||
(Scope::SafekeeperData, _) => Err(AuthError(
|
||||
"SafekeeperData scope makes no sense for Pageserver".into(),
|
||||
(Scope::SafekeeperData | Scope::GenerationsApi, _) => Err(AuthError(
|
||||
format!(
|
||||
"JWT scope '{:?}' is ineligible for Pageserver auth",
|
||||
claims.scope
|
||||
)
|
||||
.into(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,8 +13,9 @@
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use fail::fail_point;
|
||||
|
||||
use pageserver_api::key::{key_to_slru_block, Key};
|
||||
use postgres_ffi::pg_constants;
|
||||
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::time::SystemTime;
|
||||
use tokio::io;
|
||||
@@ -273,7 +274,7 @@ where
|
||||
slru_builder.finish().await?;
|
||||
}
|
||||
|
||||
let mut min_restart_lsn: Lsn = Lsn::MAX;
|
||||
let min_restart_lsn: Lsn = Lsn::MAX;
|
||||
// Create tablespace directories
|
||||
for ((spcnode, dbnode), has_relmap_file) in
|
||||
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
|
||||
@@ -308,21 +309,24 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
if path.starts_with("pg_replslot") {
|
||||
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
content[offs..offs + 8].try_into().unwrap(),
|
||||
));
|
||||
info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
}
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
}
|
||||
// one-off hack: disable listing aux files
|
||||
// {
|
||||
// for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
// if path.starts_with("pg_replslot") {
|
||||
// let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
// let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
// content[offs..offs + 8].try_into().unwrap(),
|
||||
// ));
|
||||
// info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
// min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
// }
|
||||
// let header = new_tar_header(&path, content.len() as u64)?;
|
||||
// self.ar
|
||||
// .append(&header, &*content)
|
||||
// .await
|
||||
// .context("could not add aux file to basebackup tarball")?;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
if min_restart_lsn != Lsn::MAX {
|
||||
info!(
|
||||
|
||||
@@ -83,12 +83,12 @@ use utils::{
|
||||
// This is not functionally necessary (clients will retry), but avoids generating a lot of
|
||||
// failed API calls while tenants are activating.
|
||||
#[cfg(not(feature = "testing"))]
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
pub(crate) const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
|
||||
// Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to
|
||||
// finish attaching, if calls to remote storage are slow.
|
||||
#[cfg(feature = "testing")]
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
pub(crate) const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
|
||||
pub struct State {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -571,10 +571,16 @@ async fn timeline_list_handler(
|
||||
parse_query_param(&request, "force-await-initial-logical-size")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
|
||||
let response_data = async {
|
||||
let tenant = mgr::get_tenant(tenant_shard_id, true)?;
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id, false)?;
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
let timelines = tenant.list_timelines();
|
||||
|
||||
let mut response_data = Vec::with_capacity(timelines.len());
|
||||
@@ -1136,7 +1142,7 @@ async fn tenant_shard_split_handler(
|
||||
|
||||
let new_shards = state
|
||||
.tenant_manager
|
||||
.shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
|
||||
.shard_split(tenant_shard_id, ShardCount::new(req.new_shard_count), &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
|
||||
@@ -2496,6 +2496,56 @@ pub mod tokio_epoll_uring {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod tenant_throttling {
|
||||
use metrics::{register_int_counter_vec, IntCounter};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::tenant::{self, throttle::Metric};
|
||||
|
||||
pub(crate) struct TimelineGet {
|
||||
wait_time: IntCounter,
|
||||
count: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) static TIMELINE_GET: Lazy<TimelineGet> = Lazy::new(|| {
|
||||
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_wait_usecs_sum_global",
|
||||
"Sum of microseconds that tenants spent waiting for a tenant throttle of a given kind.",
|
||||
&["kind"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_tenant_throttling_count_global",
|
||||
"Count of tenant throttlings, by kind of throttle.",
|
||||
&["kind"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let kind = "timeline_get";
|
||||
TimelineGet {
|
||||
wait_time: WAIT_USECS.with_label_values(&[kind]),
|
||||
count: WAIT_COUNT.with_label_values(&[kind]),
|
||||
}
|
||||
});
|
||||
|
||||
impl Metric for &'static TimelineGet {
|
||||
#[inline(always)]
|
||||
fn observe_throttling(
|
||||
&self,
|
||||
tenant::throttle::Observation { wait_time }: &tenant::throttle::Observation,
|
||||
) {
|
||||
let val = u64::try_from(wait_time.as_micros()).unwrap();
|
||||
self.wait_time.inc_by(val);
|
||||
self.count.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn preinitialize_metrics() {
|
||||
// Python tests need these and on some we do alerting.
|
||||
//
|
||||
@@ -2557,4 +2607,5 @@ pub fn preinitialize_metrics() {
|
||||
|
||||
// Custom
|
||||
Lazy::force(&RECONSTRUCT_TIME);
|
||||
Lazy::force(&tenant_throttling::TIMELINE_GET);
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use pageserver_api::models::{
|
||||
PagestreamNblocksResponse,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber};
|
||||
use pageserver_api::shard::ShardNumber;
|
||||
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
@@ -998,7 +998,7 @@ impl PageServerHandler {
|
||||
) -> Result<&Arc<Timeline>, Key> {
|
||||
let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
|
||||
// Fastest path: single sharded case
|
||||
if first_idx.shard_count < ShardCount(2) {
|
||||
if first_idx.shard_count.count() == 1 {
|
||||
return Ok(&first_timeline.timeline);
|
||||
}
|
||||
|
||||
|
||||
@@ -671,6 +671,7 @@ impl Timeline {
|
||||
self.get(CHECKPOINT_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn list_aux_files(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
@@ -1389,81 +1390,96 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
pub async fn put_file(
|
||||
&mut self,
|
||||
path: &str,
|
||||
content: &[u8],
|
||||
ctx: &RequestContext,
|
||||
_path: &str,
|
||||
_content: &[u8],
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let file_path = path.to_string();
|
||||
let content = if content.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Bytes::copy_from_slice(content))
|
||||
// one-off hack: zero aux files on any ingest
|
||||
let dir = AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
};
|
||||
|
||||
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
|
||||
// We already updated aux files in `self`: emit a delta and update our latest value
|
||||
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
file_path: file_path.clone(),
|
||||
content: content.clone(),
|
||||
}),
|
||||
);
|
||||
|
||||
dir.upsert(file_path, content);
|
||||
dir
|
||||
} else {
|
||||
// Check if the AUX_FILES_KEY is initialized
|
||||
match self.get(AUX_FILES_KEY, ctx).await {
|
||||
Ok(dir_bytes) => {
|
||||
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
|
||||
// Key is already set, we may append a delta
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
file_path: file_path.clone(),
|
||||
content: content.clone(),
|
||||
}),
|
||||
);
|
||||
dir.upsert(file_path, content);
|
||||
dir
|
||||
}
|
||||
Err(
|
||||
e @ (PageReconstructError::AncestorStopping(_)
|
||||
| PageReconstructError::Cancelled
|
||||
| PageReconstructError::AncestorLsnTimeout(_)),
|
||||
) => {
|
||||
// Important that we do not interpret a shutdown error as "not found" and thereby
|
||||
// reset the map.
|
||||
return Err(e.into());
|
||||
}
|
||||
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
|
||||
// we are assuming that all _other_ possible errors represents a missing key. If some
|
||||
// other error occurs, we may incorrectly reset the map of aux files.
|
||||
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
|
||||
// Key is missing, we must insert an image as the basis for subsequent deltas.
|
||||
|
||||
let mut dir = AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
};
|
||||
dir.upsert(file_path, content);
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
dir
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::AuxFiles, dir.files.len()));
|
||||
self.pending_aux_files = Some(dir);
|
||||
|
||||
Ok(())
|
||||
|
||||
// let file_path = path.to_string();
|
||||
// let content = if content.is_empty() {
|
||||
// None
|
||||
// } else {
|
||||
// Some(Bytes::copy_from_slice(content))
|
||||
// };
|
||||
|
||||
// let dir = if let Some(mut dir) = self.pending_aux_files.take() {
|
||||
// // We already updated aux files in `self`: emit a delta and update our latest value
|
||||
|
||||
// self.put(
|
||||
// AUX_FILES_KEY,
|
||||
// Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
// file_path: file_path.clone(),
|
||||
// content: content.clone(),
|
||||
// }),
|
||||
// );
|
||||
|
||||
// dir.upsert(file_path, content);
|
||||
// dir
|
||||
// } else {
|
||||
// // Check if the AUX_FILES_KEY is initialized
|
||||
// match self.get(AUX_FILES_KEY, ctx).await {
|
||||
// Ok(dir_bytes) => {
|
||||
// let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
|
||||
// // Key is already set, we may append a delta
|
||||
// self.put(
|
||||
// AUX_FILES_KEY,
|
||||
// Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
// file_path: file_path.clone(),
|
||||
// content: content.clone(),
|
||||
// }),
|
||||
// );
|
||||
// dir.upsert(file_path, content);
|
||||
// dir
|
||||
// }
|
||||
// Err(
|
||||
// e @ (PageReconstructError::AncestorStopping(_)
|
||||
// | PageReconstructError::Cancelled
|
||||
// | PageReconstructError::AncestorLsnTimeout(_)),
|
||||
// ) => {
|
||||
// // Important that we do not interpret a shutdown error as "not found" and thereby
|
||||
// // reset the map.
|
||||
// return Err(e.into());
|
||||
// }
|
||||
// // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
|
||||
// // we are assuming that all _other_ possible errors represents a missing key. If some
|
||||
// // other error occurs, we may incorrectly reset the map of aux files.
|
||||
// Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
|
||||
// // Key is missing, we must insert an image as the basis for subsequent deltas.
|
||||
|
||||
// let mut dir = AuxFilesDirectory {
|
||||
// files: HashMap::new(),
|
||||
// };
|
||||
// dir.upsert(file_path, content);
|
||||
// self.put(
|
||||
// AUX_FILES_KEY,
|
||||
// Value::Image(Bytes::from(
|
||||
// AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
// )),
|
||||
// );
|
||||
// dir
|
||||
// }
|
||||
// }
|
||||
// };
|
||||
|
||||
// self.pending_directory_entries
|
||||
// .push((DirectoryKind::AuxFiles, dir.files.len()));
|
||||
// self.pending_aux_files = Some(dir);
|
||||
|
||||
// Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -188,6 +188,7 @@ task_local! {
|
||||
serde::Serialize,
|
||||
serde::Deserialize,
|
||||
strum_macros::IntoStaticStr,
|
||||
strum_macros::EnumString,
|
||||
)]
|
||||
pub enum TaskKind {
|
||||
// Pageserver startup, i.e., `main`
|
||||
|
||||
@@ -49,7 +49,6 @@ use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::mgr::GetActiveTenantError;
|
||||
use self::mgr::GetTenantError;
|
||||
@@ -77,7 +76,6 @@ use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::LocationMode;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::metadata::load_metadata;
|
||||
pub use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
use crate::tenant::remote_timeline_client::remote_initdb_archive_path;
|
||||
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
|
||||
@@ -94,7 +92,6 @@ use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::ops::Bound::Included;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
@@ -170,6 +167,8 @@ pub(crate) mod timeline;
|
||||
|
||||
pub mod size;
|
||||
|
||||
pub(crate) mod throttle;
|
||||
|
||||
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
|
||||
|
||||
@@ -308,6 +307,11 @@ pub struct Tenant {
|
||||
// Users of the Tenant such as the page service must take this Gate to avoid
|
||||
// trying to use a Tenant which is shutting down.
|
||||
pub(crate) gate: Gate,
|
||||
|
||||
/// Throttle applied at the top of [`Timeline::get`].
|
||||
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
|
||||
pub(crate) timeline_get_throttle:
|
||||
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Tenant {
|
||||
@@ -488,11 +492,6 @@ impl From<std::io::Error> for InitdbError {
|
||||
}
|
||||
}
|
||||
|
||||
struct TenantDirectoryScan {
|
||||
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
|
||||
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
|
||||
}
|
||||
|
||||
enum CreateTimelineCause {
|
||||
Load,
|
||||
Delete,
|
||||
@@ -928,9 +927,7 @@ impl Tenant {
|
||||
timelines: HashMap::new(),
|
||||
},
|
||||
(None, SpawnMode::Normal) => {
|
||||
// Deprecated dev mode: load from local disk state instead of remote storage
|
||||
// https://github.com/neondatabase/neon/issues/5624
|
||||
return self.load_local(ctx).await;
|
||||
anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1000,6 +997,7 @@ impl Tenant {
|
||||
TimelineResources {
|
||||
remote_client: Some(remote_client),
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
timeline_get_throttle: self.timeline_get_throttle.clone(),
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
@@ -1198,149 +1196,6 @@ impl Tenant {
|
||||
))
|
||||
}
|
||||
|
||||
fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
|
||||
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
|
||||
// Note timelines_to_resume_deletion needs to be separate because it can be not sortable
|
||||
// from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
|
||||
// completed in non topological order (for example because parent has smaller number of layer files in it)
|
||||
let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
|
||||
|
||||
let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
|
||||
|
||||
for entry in timelines_dir
|
||||
.read_dir_utf8()
|
||||
.context("list timelines directory for tenant")?
|
||||
{
|
||||
let entry = entry.context("read timeline dir entry")?;
|
||||
let timeline_dir = entry.path();
|
||||
|
||||
if crate::is_temporary(timeline_dir) {
|
||||
info!("Found temporary timeline directory, removing: {timeline_dir}");
|
||||
if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
|
||||
error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
|
||||
}
|
||||
} else if is_uninit_mark(timeline_dir) {
|
||||
if !timeline_dir.exists() {
|
||||
warn!("Timeline dir entry become invalid: {timeline_dir}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let timeline_uninit_mark_file = &timeline_dir;
|
||||
info!(
|
||||
"Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
|
||||
);
|
||||
let timeline_id =
|
||||
TimelineId::try_from(timeline_uninit_mark_file.file_stem())
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
|
||||
)
|
||||
})?;
|
||||
let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
|
||||
if let Err(e) =
|
||||
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
|
||||
{
|
||||
error!("Failed to clean up uninit marked timeline: {e:?}");
|
||||
}
|
||||
} else if crate::is_delete_mark(timeline_dir) {
|
||||
// If metadata exists, load as usual, continue deletion
|
||||
let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
|
||||
)
|
||||
})?;
|
||||
|
||||
info!("Found deletion mark for timeline {}", timeline_id);
|
||||
|
||||
match load_metadata(self.conf, &self.tenant_shard_id, &timeline_id) {
|
||||
Ok(metadata) => {
|
||||
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
|
||||
}
|
||||
Err(e) => match &e {
|
||||
LoadMetadataError::Read(r) => {
|
||||
if r.kind() != io::ErrorKind::NotFound {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
});
|
||||
}
|
||||
|
||||
// If metadata doesnt exist it means that we've crashed without
|
||||
// completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
|
||||
// So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
|
||||
// We cant do it here because the method is async so we'd need block_on
|
||||
// and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
|
||||
// so that basically results in a cycle:
|
||||
// spawn_blocking
|
||||
// - block_on
|
||||
// - spawn_blocking
|
||||
// which can lead to running out of threads in blocing pool.
|
||||
timelines_to_resume_deletion.push((timeline_id, None));
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
})
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
if !timeline_dir.exists() {
|
||||
warn!("Timeline dir entry become invalid: {timeline_dir}");
|
||||
continue;
|
||||
}
|
||||
let timeline_id = TimelineId::try_from(timeline_dir.file_name())
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Could not parse timeline id out of the timeline dir name {timeline_dir}",
|
||||
)
|
||||
})?;
|
||||
let timeline_uninit_mark_file = self
|
||||
.conf
|
||||
.timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
|
||||
if timeline_uninit_mark_file.exists() {
|
||||
info!(
|
||||
%timeline_id,
|
||||
"Found an uninit mark file, removing the timeline and its uninit mark",
|
||||
);
|
||||
if let Err(e) =
|
||||
remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
|
||||
{
|
||||
error!("Failed to clean up uninit marked timeline: {e:?}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let timeline_delete_mark_file = self
|
||||
.conf
|
||||
.timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
|
||||
if timeline_delete_mark_file.exists() {
|
||||
// Cleanup should be done in `is_delete_mark` branch above
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = entry.file_name();
|
||||
if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
|
||||
let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
|
||||
.context("failed to load metadata")?;
|
||||
timelines_to_load.insert(timeline_id, metadata);
|
||||
} else {
|
||||
// A file or directory that doesn't look like a timeline ID
|
||||
warn!("unexpected file or directory in timelines directory: {file_name}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the array of timeline IDs into tree-order, so that parent comes before
|
||||
// all its children.
|
||||
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
|
||||
TenantDirectoryScan {
|
||||
sorted_timelines_to_load: sorted_timelines,
|
||||
timelines_to_resume_deletion,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn load_timeline_metadata(
|
||||
self: &Arc<Tenant>,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
@@ -1404,141 +1259,6 @@ impl Tenant {
|
||||
Ok(timeline_preloads)
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task to load in-memory data structures for this tenant, from
|
||||
/// files on disk. Used at pageserver startup.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
debug!("loading tenant task");
|
||||
|
||||
// Load in-memory state to reflect the local files on disk
|
||||
//
|
||||
// Scan the directory, peek into the metadata file of each timeline, and
|
||||
// collect a list of timelines and their ancestors.
|
||||
let span = info_span!("blocking");
|
||||
let cloned = Arc::clone(self);
|
||||
|
||||
let scan = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
cloned.scan_and_sort_timelines_dir()
|
||||
})
|
||||
.await
|
||||
.context("load spawn_blocking")
|
||||
.and_then(|res| res)?;
|
||||
|
||||
// FIXME original collect_timeline_files contained one more check:
|
||||
// 1. "Timeline has no ancestor and no layer files"
|
||||
|
||||
// Process loadable timelines first
|
||||
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, local_metadata, ctx, false)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
LoadLocalTimelineError::Load(source) => {
|
||||
return Err(anyhow::anyhow!(source)).with_context(|| {
|
||||
format!("Failed to load local timeline: {timeline_id}")
|
||||
})
|
||||
}
|
||||
LoadLocalTimelineError::ResumeDeletion(source) => {
|
||||
// Make sure resumed deletion wont fail loading for entire tenant.
|
||||
error!("Failed to resume timeline deletion: {source:#}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Resume deletion ones with deleted_mark
|
||||
for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
|
||||
match maybe_local_metadata {
|
||||
None => {
|
||||
// See comment in `scan_and_sort_timelines_dir`.
|
||||
if let Err(e) =
|
||||
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
|
||||
timeline_id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
Some(local_metadata) => {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, local_metadata, ctx, true)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
LoadLocalTimelineError::Load(source) => {
|
||||
// We tried to load deleted timeline, this is a bug.
|
||||
return Err(anyhow::anyhow!(source).context(
|
||||
format!("This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}")
|
||||
));
|
||||
}
|
||||
LoadLocalTimelineError::ResumeDeletion(source) => {
|
||||
// Make sure resumed deletion wont fail loading for entire tenant.
|
||||
error!("Failed to resume timeline deletion: {source:#}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Done");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Subroutine of `load_tenant`, to load an individual timeline
|
||||
///
|
||||
/// NB: The parent is assumed to be already loaded!
|
||||
#[instrument(skip(self, local_metadata, ctx))]
|
||||
async fn load_local_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: TimelineMetadata,
|
||||
ctx: &RequestContext,
|
||||
found_delete_mark: bool,
|
||||
) -> Result<(), LoadLocalTimelineError> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let resources = self.build_timeline_resources(timeline_id);
|
||||
|
||||
if found_delete_mark {
|
||||
// There is no remote client, we found local metadata.
|
||||
// Continue cleaning up local disk.
|
||||
DeleteTimelineFlow::resume_deletion(
|
||||
Arc::clone(self),
|
||||
timeline_id,
|
||||
&local_metadata,
|
||||
None,
|
||||
self.deletion_queue_client.clone(),
|
||||
)
|
||||
.await
|
||||
.context("resume deletion")
|
||||
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
|
||||
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
|
||||
.map_err(LoadLocalTimelineError::Load)?;
|
||||
Some(ancestor_timeline)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
|
||||
.await
|
||||
.map_err(LoadLocalTimelineError::Load)
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
|
||||
self.tenant_shard_id
|
||||
}
|
||||
@@ -2363,14 +2083,14 @@ impl Tenant {
|
||||
};
|
||||
|
||||
// We have a pageserver TenantConf, we need the API-facing TenantConfig.
|
||||
let tenant_config: models::TenantConfig = conf.tenant_conf.into();
|
||||
let tenant_config: models::TenantConfig = conf.tenant_conf.clone().into();
|
||||
|
||||
models::LocationConfig {
|
||||
mode: location_config_mode,
|
||||
generation: self.generation.into(),
|
||||
secondary_conf: None,
|
||||
shard_number: self.shard_identity.number.0,
|
||||
shard_count: self.shard_identity.count.0,
|
||||
shard_count: self.shard_identity.count.literal(),
|
||||
shard_stripe_size: self.shard_identity.stripe_size.0,
|
||||
tenant_conf: tenant_config,
|
||||
}
|
||||
@@ -2497,93 +2217,93 @@ where
|
||||
|
||||
impl Tenant {
|
||||
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
|
||||
self.tenant_conf.read().unwrap().tenant_conf
|
||||
self.tenant_conf.read().unwrap().tenant_conf.clone()
|
||||
}
|
||||
|
||||
pub fn effective_config(&self) -> TenantConf {
|
||||
self.tenant_specific_overrides()
|
||||
.merge(self.conf.default_tenant_conf)
|
||||
.merge(self.conf.default_tenant_conf.clone())
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_distance
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
pub fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_target_size
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
|
||||
}
|
||||
|
||||
pub fn get_compaction_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
|
||||
}
|
||||
|
||||
pub fn get_compaction_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
pub fn get_gc_horizon(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_horizon
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
|
||||
}
|
||||
|
||||
pub fn get_gc_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_period)
|
||||
}
|
||||
|
||||
pub fn get_image_creation_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.image_creation_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
pub fn get_pitr_interval(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.pitr_interval
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
pub fn get_trace_read_requests(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.trace_read_requests
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn get_min_resident_size_override(&self) -> Option<u64> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.min_resident_size_override
|
||||
.or(self.conf.default_tenant_conf.min_resident_size_override)
|
||||
}
|
||||
|
||||
pub fn get_heatmap_period(&self) -> Option<Duration> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let heatmap_period = tenant_conf
|
||||
.heatmap_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.heatmap_period);
|
||||
@@ -2596,6 +2316,7 @@ impl Tenant {
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
|
||||
self.tenant_conf_updated();
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
@@ -2607,6 +2328,7 @@ impl Tenant {
|
||||
|
||||
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
|
||||
*self.tenant_conf.write().unwrap() = new_conf;
|
||||
self.tenant_conf_updated();
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
@@ -2616,6 +2338,24 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_timeline_get_throttle_config(
|
||||
psconf: &'static PageServerConf,
|
||||
overrides: &TenantConfOpt,
|
||||
) -> throttle::Config {
|
||||
overrides
|
||||
.timeline_get_throttle
|
||||
.clone()
|
||||
.unwrap_or(psconf.default_tenant_conf.timeline_get_throttle.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_conf_updated(&self) {
|
||||
let conf = {
|
||||
let guard = self.tenant_conf.read().unwrap();
|
||||
Self::get_timeline_get_throttle_config(self.conf, &guard.tenant_conf)
|
||||
};
|
||||
self.timeline_get_throttle.reconfigure(conf)
|
||||
}
|
||||
|
||||
/// Helper function to create a new Timeline struct.
|
||||
///
|
||||
/// The returned Timeline is in Loading state. The caller is responsible for
|
||||
@@ -2742,7 +2482,6 @@ impl Tenant {
|
||||
// using now here is good enough approximation to catch tenants with really long
|
||||
// activation times.
|
||||
constructed_at: Instant::now(),
|
||||
tenant_conf: Arc::new(RwLock::new(attached_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
timelines_creating: Mutex::new(HashSet::new()),
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
@@ -2757,6 +2496,11 @@ impl Tenant {
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::default(),
|
||||
timeline_get_throttle: Arc::new(throttle::Throttle::new(
|
||||
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
|
||||
&crate::metrics::tenant_throttling::TIMELINE_GET,
|
||||
)),
|
||||
tenant_conf: Arc::new(RwLock::new(attached_conf)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3512,6 +3256,7 @@ impl Tenant {
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
timeline_get_throttle: self.timeline_get_throttle.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3783,33 +3528,10 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
|
||||
self.tenant_conf.read().unwrap().tenant_conf
|
||||
self.tenant_conf.read().unwrap().tenant_conf.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_timeline_and_uninit_mark(
|
||||
timeline_dir: &Utf8Path,
|
||||
uninit_mark: &Utf8Path,
|
||||
) -> anyhow::Result<()> {
|
||||
fs::remove_dir_all(timeline_dir)
|
||||
.or_else(|e| {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
// we can leave the uninit mark without a timeline dir,
|
||||
// just remove the mark then
|
||||
Ok(())
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
})
|
||||
.with_context(|| {
|
||||
format!("Failed to remove unit marked timeline directory {timeline_dir}")
|
||||
})?;
|
||||
fs::remove_file(uninit_mark)
|
||||
.with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
|
||||
/// to get bootstrap data for timeline initialization.
|
||||
async fn run_initdb(
|
||||
@@ -3965,17 +3687,11 @@ pub(crate) mod harness {
|
||||
gc_feedback: Some(tenant_conf.gc_feedback),
|
||||
heatmap_period: Some(tenant_conf.heatmap_period),
|
||||
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
|
||||
timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[derive(Debug)]
|
||||
enum LoadMode {
|
||||
Local,
|
||||
Remote,
|
||||
}
|
||||
|
||||
pub struct TenantHarness {
|
||||
pub conf: &'static PageServerConf,
|
||||
pub tenant_conf: TenantConf,
|
||||
@@ -4057,42 +3773,17 @@ pub(crate) mod harness {
|
||||
pub(crate) async fn load(&self) -> (Arc<Tenant>, RequestContext) {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
(
|
||||
self.try_load(&ctx)
|
||||
self.do_try_load(&ctx)
|
||||
.await
|
||||
.expect("failed to load test tenant"),
|
||||
ctx,
|
||||
)
|
||||
}
|
||||
|
||||
/// For tests that specifically want to exercise the local load path, which does
|
||||
/// not use remote storage.
|
||||
pub(crate) async fn try_load_local(
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) async fn do_try_load(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
self.do_try_load(ctx, LoadMode::Local).await
|
||||
}
|
||||
|
||||
/// The 'load' in this function is either a local load or a normal attachment,
|
||||
pub(crate) async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
|
||||
// If we have nothing in remote storage, must use load_local instead of attach: attach
|
||||
// will error out if there are no timelines.
|
||||
//
|
||||
// See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
|
||||
// this weird state of a Tenant which exists but doesn't have any timelines.
|
||||
let mode = match self.remote_empty() {
|
||||
true => LoadMode::Local,
|
||||
false => LoadMode::Remote,
|
||||
};
|
||||
|
||||
self.do_try_load(ctx, mode).await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), ?mode))]
|
||||
async fn do_try_load(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
mode: LoadMode,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
|
||||
|
||||
@@ -4100,7 +3791,7 @@ pub(crate) mod harness {
|
||||
TenantState::Loading,
|
||||
self.conf,
|
||||
AttachedTenantConf::try_from(LocationConf::attached_single(
|
||||
TenantConfOpt::from(self.tenant_conf),
|
||||
TenantConfOpt::from(self.tenant_conf.clone()),
|
||||
self.generation,
|
||||
&ShardParameters::default(),
|
||||
))
|
||||
@@ -4113,17 +3804,10 @@ pub(crate) mod harness {
|
||||
self.deletion_queue.new_client(),
|
||||
));
|
||||
|
||||
match mode {
|
||||
LoadMode::Local => {
|
||||
tenant.load_local(ctx).await?;
|
||||
}
|
||||
LoadMode::Remote => {
|
||||
let preload = tenant
|
||||
.preload(&self.remote_storage, CancellationToken::new())
|
||||
.await?;
|
||||
tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?;
|
||||
}
|
||||
}
|
||||
let preload = tenant
|
||||
.preload(&self.remote_storage, CancellationToken::new())
|
||||
.await?;
|
||||
tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?;
|
||||
|
||||
tenant.state.send_replace(TenantState::Active);
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
@@ -4132,31 +3816,6 @@ pub(crate) mod harness {
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
fn remote_empty(&self) -> bool {
|
||||
let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
|
||||
let remote_tenant_dir = self
|
||||
.remote_fs_dir
|
||||
.join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
|
||||
if std::fs::metadata(&remote_tenant_dir).is_err() {
|
||||
return true;
|
||||
}
|
||||
|
||||
match std::fs::read_dir(remote_tenant_dir)
|
||||
.unwrap()
|
||||
.flatten()
|
||||
.next()
|
||||
{
|
||||
Some(entry) => {
|
||||
tracing::debug!(
|
||||
"remote_empty: not empty, found file {}",
|
||||
entry.file_name().to_string_lossy(),
|
||||
);
|
||||
false
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
|
||||
self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
|
||||
}
|
||||
@@ -4215,7 +3874,6 @@ mod tests {
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::harness::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use crate::METADATA_FILE_NAME;
|
||||
use bytes::BytesMut;
|
||||
use hex_literal::hex;
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -4757,60 +4415,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn corrupt_local_metadata() -> anyhow::Result<()> {
|
||||
const TEST_NAME: &str = "corrupt_metadata";
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
drop(tline);
|
||||
// so that all uploads finish & we can call harness.try_load() below again
|
||||
tenant
|
||||
.shutdown(Default::default(), true)
|
||||
.instrument(harness.span())
|
||||
.await
|
||||
.ok()
|
||||
.unwrap();
|
||||
drop(tenant);
|
||||
|
||||
// Corrupt local metadata
|
||||
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
|
||||
assert!(metadata_path.is_file());
|
||||
let mut metadata_bytes = std::fs::read(&metadata_path)?;
|
||||
assert_eq!(metadata_bytes.len(), 512);
|
||||
metadata_bytes[8] ^= 1;
|
||||
std::fs::write(metadata_path, metadata_bytes)?;
|
||||
|
||||
let err = harness.try_load_local(&ctx).await.expect_err("should fail");
|
||||
// get all the stack with all .context, not only the last one
|
||||
let message = format!("{err:#}");
|
||||
let expected = "failed to load metadata";
|
||||
assert!(
|
||||
message.contains(expected),
|
||||
"message '{message}' expected to contain {expected}"
|
||||
);
|
||||
|
||||
let mut found_error_message = false;
|
||||
let mut err_source = err.source();
|
||||
while let Some(source) = err_source {
|
||||
if source.to_string().contains("metadata checksum mismatch") {
|
||||
found_error_message = true;
|
||||
break;
|
||||
}
|
||||
err_source = source.source();
|
||||
}
|
||||
assert!(
|
||||
found_error_message,
|
||||
"didn't find the corrupted metadata error in {}",
|
||||
message
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_images() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
|
||||
|
||||
@@ -9,8 +9,8 @@
|
||||
//! may lead to a data loss.
|
||||
//!
|
||||
use anyhow::bail;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::EvictionPolicy;
|
||||
use pageserver_api::models::{self, ThrottleConfig};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::de::IntoDeserializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -251,7 +251,7 @@ impl LocationConf {
|
||||
} else {
|
||||
ShardIdentity::new(
|
||||
ShardNumber(conf.shard_number),
|
||||
ShardCount(conf.shard_count),
|
||||
ShardCount::new(conf.shard_count),
|
||||
ShardStripeSize(conf.shard_stripe_size),
|
||||
)?
|
||||
};
|
||||
@@ -285,7 +285,7 @@ impl Default for LocationConf {
|
||||
///
|
||||
/// For storing and transmitting individual tenant's configuration, see
|
||||
/// TenantConfOpt.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TenantConf {
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
@@ -348,11 +348,13 @@ pub struct TenantConf {
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
pub lazy_slru_download: bool,
|
||||
|
||||
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
/// which parameters are set and which are not.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
@@ -437,6 +439,9 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub timeline_get_throttle: Option<pageserver_api::models::ThrottleConfig>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -485,6 +490,10 @@ impl TenantConfOpt {
|
||||
lazy_slru_download: self
|
||||
.lazy_slru_download
|
||||
.unwrap_or(global_conf.lazy_slru_download),
|
||||
timeline_get_throttle: self
|
||||
.timeline_get_throttle
|
||||
.clone()
|
||||
.unwrap_or(global_conf.timeline_get_throttle),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -524,6 +533,7 @@ impl Default for TenantConf {
|
||||
gc_feedback: false,
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -596,6 +606,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
gc_feedback: value.gc_feedback,
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,17 +294,6 @@ pub enum LoadMetadataError {
|
||||
Decode(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub fn load_metadata(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Result<TimelineMetadata, LoadMetadataError> {
|
||||
let metadata_path = conf.metadata_path(tenant_shard_id, timeline_id);
|
||||
let metadata_bytes = std::fs::read(metadata_path)?;
|
||||
|
||||
Ok(TimelineMetadata::from_bytes(&metadata_bytes)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::control_plane_client::{
|
||||
ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
|
||||
use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::{
|
||||
@@ -484,7 +485,7 @@ pub async fn init_tenant_mgr(
|
||||
TenantSlot::Secondary(SecondaryTenant::new(
|
||||
tenant_shard_id,
|
||||
location_conf.shard,
|
||||
location_conf.tenant_conf,
|
||||
location_conf.tenant_conf.clone(),
|
||||
&SecondaryLocationConfig { warm: false },
|
||||
)),
|
||||
);
|
||||
@@ -794,7 +795,7 @@ pub(crate) async fn set_new_tenant_config(
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
if tenant.tenant_shard_id().shard_count > ShardCount(0) {
|
||||
if !tenant.tenant_shard_id().shard_count.is_unsharded() {
|
||||
// Note that we use ShardParameters::default below.
|
||||
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
|
||||
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
|
||||
@@ -805,7 +806,7 @@ pub(crate) async fn set_new_tenant_config(
|
||||
// API to use is the location_config/ endpoint, which lets the caller provide
|
||||
// the full LocationConf.
|
||||
let location_conf = LocationConf::attached_single(
|
||||
new_tenant_conf,
|
||||
new_tenant_conf.clone(),
|
||||
tenant.generation,
|
||||
&ShardParameters::default(),
|
||||
);
|
||||
@@ -1376,7 +1377,7 @@ impl TenantManager {
|
||||
result
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))]
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.literal()))]
|
||||
pub(crate) async fn shard_split(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -1386,11 +1387,10 @@ impl TenantManager {
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
// Plan: identify what the new child shards will be
|
||||
let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1);
|
||||
if new_shard_count <= ShardCount(effective_old_shard_count) {
|
||||
if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
|
||||
anyhow::bail!("Requested shard count is not an increase");
|
||||
}
|
||||
let expansion_factor = new_shard_count.0 / effective_old_shard_count;
|
||||
let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
|
||||
if !expansion_factor.is_power_of_two() {
|
||||
anyhow::bail!("Requested split is not a power of two");
|
||||
}
|
||||
@@ -1467,7 +1467,7 @@ impl TenantManager {
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
shard: child_shard_identity,
|
||||
tenant_conf: parent_tenant_conf,
|
||||
tenant_conf: parent_tenant_conf.clone(),
|
||||
};
|
||||
|
||||
self.upsert_location(
|
||||
@@ -1490,6 +1490,16 @@ impl TenantManager {
|
||||
peek_slot.and_then(|s| s.get_attached()).cloned()
|
||||
};
|
||||
if let Some(t) = child_shard {
|
||||
// Wait for the child shard to become active: this should be very quick because it only
|
||||
// has to download the index_part that we just uploaded when creating it.
|
||||
if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
|
||||
// This is not fatal: we have durably created the child shard. It just makes the
|
||||
// split operation less seamless for clients, as we will may detach the parent
|
||||
// shard before the child shards are fully ready to serve requests.
|
||||
tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let timelines = t.timelines.lock().unwrap().clone();
|
||||
for timeline in timelines.values() {
|
||||
let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
|
||||
|
||||
@@ -133,7 +133,7 @@ impl SecondaryTenant {
|
||||
}
|
||||
|
||||
pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) {
|
||||
*(self.tenant_conf.lock().unwrap()) = *config;
|
||||
*(self.tenant_conf.lock().unwrap()) = config.clone();
|
||||
}
|
||||
|
||||
/// For API access: generate a LocationConfig equivalent to the one that would be used to
|
||||
@@ -144,13 +144,13 @@ impl SecondaryTenant {
|
||||
|
||||
let conf = models::LocationConfigSecondary { warm: conf.warm };
|
||||
|
||||
let tenant_conf = *self.tenant_conf.lock().unwrap();
|
||||
let tenant_conf = self.tenant_conf.lock().unwrap().clone();
|
||||
models::LocationConfig {
|
||||
mode: models::LocationConfigMode::Secondary,
|
||||
generation: None,
|
||||
secondary_conf: Some(conf),
|
||||
shard_number: self.tenant_shard_id.shard_number.0,
|
||||
shard_count: self.tenant_shard_id.shard_count.0,
|
||||
shard_count: self.tenant_shard_id.shard_count.literal(),
|
||||
shard_stripe_size: self.shard_identity.stripe_size.0,
|
||||
tenant_conf: tenant_conf.into(),
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -139,6 +140,8 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
// How many errors we have seen consequtively
|
||||
let mut error_run_count = 0;
|
||||
|
||||
let mut last_throttle_flag_reset_at = Instant::now();
|
||||
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
@@ -203,6 +206,27 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
walredo_mgr.maybe_quiesce(period * 10);
|
||||
}
|
||||
|
||||
// TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
|
||||
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
|
||||
info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
|
||||
let now = Instant::now();
|
||||
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
|
||||
let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats();
|
||||
if count_throttled == 0 {
|
||||
return;
|
||||
}
|
||||
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
|
||||
let delta = now - prev;
|
||||
warn!(
|
||||
n_seconds=%format_args!("{:.3}",
|
||||
delta.as_secs_f64()),
|
||||
count_accounted,
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
allowed_rps=%format_args!("{allowed_rps:.0}"),
|
||||
"shard was throttled in the last n_seconds")
|
||||
});
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
|
||||
162
pageserver/src/tenant/throttle.rs
Normal file
162
pageserver/src/tenant/throttle.rs
Normal file
@@ -0,0 +1,162 @@
|
||||
use std::{
|
||||
str::FromStr,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use enumset::EnumSet;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{context::RequestContext, task_mgr::TaskKind};
|
||||
|
||||
/// Throttle for `async` functions.
|
||||
///
|
||||
/// Runtime reconfigurable.
|
||||
///
|
||||
/// To share a throttle among multiple entities, wrap it in an [`Arc`].
|
||||
///
|
||||
/// The intial use case for this is tenant-wide throttling of getpage@lsn requests.
|
||||
pub struct Throttle<M: Metric> {
|
||||
inner: ArcSwap<Inner>,
|
||||
metric: M,
|
||||
/// will be turned into [`Stats::count_accounted`]
|
||||
count_accounted: AtomicU64,
|
||||
/// will be turned into [`Stats::count_throttled`]
|
||||
count_throttled: AtomicU64,
|
||||
/// will be turned into [`Stats::sum_throttled_usecs`]
|
||||
sum_throttled_usecs: AtomicU64,
|
||||
}
|
||||
|
||||
pub struct Inner {
|
||||
task_kinds: EnumSet<TaskKind>,
|
||||
rate_limiter: Arc<leaky_bucket::RateLimiter>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
pub type Config = pageserver_api::models::ThrottleConfig;
|
||||
|
||||
pub struct Observation {
|
||||
pub wait_time: Duration,
|
||||
}
|
||||
pub trait Metric {
|
||||
fn observe_throttling(&self, observation: &Observation);
|
||||
}
|
||||
|
||||
/// See [`Throttle::reset_stats`].
|
||||
pub struct Stats {
|
||||
// Number of requests that were subject to throttling, i.e., requests of the configured [`Config::task_kinds`].
|
||||
pub count_accounted: u64,
|
||||
// Subset of the `accounted` requests that were actually throttled.
|
||||
// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
|
||||
pub count_throttled: u64,
|
||||
// Sum of microseconds that throttled requests spent waiting for throttling.
|
||||
pub sum_throttled_usecs: u64,
|
||||
}
|
||||
|
||||
impl<M> Throttle<M>
|
||||
where
|
||||
M: Metric,
|
||||
{
|
||||
pub fn new(config: Config, metric: M) -> Self {
|
||||
Self {
|
||||
inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
|
||||
metric,
|
||||
count_accounted: AtomicU64::new(0),
|
||||
count_throttled: AtomicU64::new(0),
|
||||
sum_throttled_usecs: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
fn new_inner(config: Config) -> Inner {
|
||||
let Config {
|
||||
task_kinds,
|
||||
initial,
|
||||
refill_interval,
|
||||
refill_amount,
|
||||
max,
|
||||
fair,
|
||||
} = &config;
|
||||
let task_kinds: EnumSet<TaskKind> = task_kinds
|
||||
.iter()
|
||||
.filter_map(|s| match TaskKind::from_str(s) {
|
||||
Ok(v) => Some(v),
|
||||
Err(e) => {
|
||||
// TODO: avoid this failure mode
|
||||
error!(
|
||||
"cannot parse task kind, ignoring for rate limiting {}",
|
||||
utils::error::report_compact_sources(&e)
|
||||
);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Inner {
|
||||
task_kinds,
|
||||
rate_limiter: Arc::new(
|
||||
leaky_bucket::RateLimiter::builder()
|
||||
.initial(*initial)
|
||||
.interval(*refill_interval)
|
||||
.refill(refill_amount.get())
|
||||
.max(*max)
|
||||
.fair(*fair)
|
||||
.build(),
|
||||
),
|
||||
config,
|
||||
}
|
||||
}
|
||||
pub fn reconfigure(&self, config: Config) {
|
||||
self.inner.store(Arc::new(Self::new_inner(config)));
|
||||
}
|
||||
|
||||
/// The [`Throttle`] keeps an internal flag that is true if there was ever any actual throttling.
|
||||
/// This method allows retrieving & resetting that flag.
|
||||
/// Useful for periodic reporting.
|
||||
pub fn reset_stats(&self) -> Stats {
|
||||
let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed);
|
||||
let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
|
||||
let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
|
||||
Stats {
|
||||
count_accounted,
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`Config::steady_rps`].
|
||||
pub fn steady_rps(&self) -> f64 {
|
||||
self.inner.load().config.steady_rps()
|
||||
}
|
||||
|
||||
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) {
|
||||
let inner = self.inner.load_full(); // clones the `Inner` Arc
|
||||
if !inner.task_kinds.contains(ctx.task_kind()) {
|
||||
return;
|
||||
};
|
||||
let start = std::time::Instant::now();
|
||||
let mut did_throttle = false;
|
||||
let acquire = inner.rate_limiter.acquire(key_count);
|
||||
// turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate
|
||||
let acquire = tokio::task::unconstrained(acquire);
|
||||
let mut acquire = std::pin::pin!(acquire);
|
||||
std::future::poll_fn(|cx| {
|
||||
use std::future::Future;
|
||||
let poll = acquire.as_mut().poll(cx);
|
||||
did_throttle = did_throttle || poll.is_pending();
|
||||
poll
|
||||
})
|
||||
.await;
|
||||
self.count_accounted.fetch_add(1, Ordering::Relaxed);
|
||||
if did_throttle {
|
||||
self.count_throttled.fetch_add(1, Ordering::Relaxed);
|
||||
let now = Instant::now();
|
||||
let wait_time = now - start;
|
||||
self.sum_throttled_usecs
|
||||
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
|
||||
let observation = Observation { wait_time };
|
||||
self.metric.observe_throttling(&observation);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -164,6 +164,9 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
|
||||
pub struct TimelineResources {
|
||||
pub remote_client: Option<RemoteTimelineClient>,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub timeline_get_throttle: Arc<
|
||||
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
|
||||
>,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
@@ -355,6 +358,11 @@ pub struct Timeline {
|
||||
///
|
||||
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
|
||||
gc_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
/// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
|
||||
timeline_get_throttle: Arc<
|
||||
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
|
||||
>,
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -615,6 +623,8 @@ impl Timeline {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
|
||||
}
|
||||
|
||||
self.timeline_get_throttle.throttle(ctx, 1).await;
|
||||
|
||||
// This check is debug-only because of the cost of hashing, and because it's a double-check: we
|
||||
// already checked the key against the shard_identity when looking up the Timeline from
|
||||
// page_service.
|
||||
@@ -714,6 +724,10 @@ impl Timeline {
|
||||
return Err(GetVectoredError::Oversized(key_count));
|
||||
}
|
||||
|
||||
self.timeline_get_throttle
|
||||
.throttle(ctx, key_count as usize)
|
||||
.await;
|
||||
|
||||
let _timer = crate::metrics::GET_VECTORED_LATENCY
|
||||
.for_task_kind(ctx.task_kind())
|
||||
.map(|t| t.start_timer());
|
||||
@@ -1335,49 +1349,49 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
|
||||
// Private functions
|
||||
impl Timeline {
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.lazy_slru_download
|
||||
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
|
||||
}
|
||||
|
||||
fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_distance
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_target_size
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
|
||||
}
|
||||
|
||||
fn get_compaction_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
fn get_image_creation_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.image_creation_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
fn get_eviction_policy(&self) -> EvictionPolicy {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.eviction_policy
|
||||
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
|
||||
@@ -1393,7 +1407,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fn get_gc_feedback(&self) -> bool {
|
||||
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_feedback
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
|
||||
@@ -1555,6 +1569,8 @@ impl Timeline {
|
||||
|
||||
compaction_lock: tokio::sync::Mutex::default(),
|
||||
gc_lock: tokio::sync::Mutex::default(),
|
||||
|
||||
timeline_get_throttle: resources.timeline_get_throttle,
|
||||
};
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
@@ -3274,90 +3290,107 @@ impl Timeline {
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
start = img_range.end;
|
||||
if force || self.time_for_new_image_layer(partition, lsn).await {
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
)
|
||||
.await?;
|
||||
if !force && !self.time_for_new_image_layer(partition, lsn).await {
|
||||
start = img_range.end;
|
||||
continue;
|
||||
}
|
||||
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
Err(CreateImageLayersError::Other(anyhow::anyhow!(
|
||||
"failpoint image-layer-writer-fail-before-finish"
|
||||
)))
|
||||
});
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut key_request_accum = KeySpaceAccum::new();
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
key,
|
||||
self.shard_identity.get_shard_number(&key)
|
||||
);
|
||||
key = key.next();
|
||||
continue;
|
||||
}
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
Err(CreateImageLayersError::Other(anyhow::anyhow!(
|
||||
"failpoint image-layer-writer-fail-before-finish"
|
||||
)))
|
||||
});
|
||||
|
||||
let mut wrote_keys = false;
|
||||
|
||||
let mut key_request_accum = KeySpaceAccum::new();
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
// Decide whether to retain this key: usually we do, but sharded tenants may
|
||||
// need to drop keys that don't belong to them. If we retain the key, add it
|
||||
// to `key_request_accum` for later issuing a vectored get
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
key,
|
||||
self.shard_identity.get_shard_number(&key)
|
||||
);
|
||||
} else {
|
||||
key_request_accum.add_key(key);
|
||||
if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| key.next() == range.end
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(
|
||||
&key_request_accum.consume_keyspace().ranges,
|
||||
lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
for (img_key, img) in results {
|
||||
let img = match img {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if is_rel_fsm_block_key(img_key)
|
||||
|| is_rel_vm_block_key(img_key)
|
||||
{
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(
|
||||
CreateImageLayersError::PageReconstructError(err),
|
||||
);
|
||||
}
|
||||
let last_key_in_range = key.next() == range.end;
|
||||
key = key.next();
|
||||
|
||||
// Maybe flush `key_rest_accum`
|
||||
if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| last_key_in_range
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(&key_request_accum.consume_keyspace().ranges, lsn, ctx)
|
||||
.await?;
|
||||
|
||||
for (img_key, img) in results {
|
||||
let img = match img {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
|
||||
{
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(CreateImageLayersError::PageReconstructError(
|
||||
err,
|
||||
));
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
image_layer_writer.put_image(img_key, img).await?;
|
||||
}
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer.put_image(img_key, img).await?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if wrote_keys {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
start = img_range.end;
|
||||
let image_layer = image_layer_writer.finish(self).await?;
|
||||
image_layers.push(image_layer);
|
||||
} else {
|
||||
// Special case: the image layer may be empty if this is a sharded tenant and the
|
||||
// partition does not cover any keys owned by this shard. In this case, to ensure
|
||||
// we don't leave gaps between image layers, leave `start` where it is, so that the next
|
||||
// layer we write will cover the key range that we just scanned.
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
}
|
||||
}
|
||||
// All layers that the GC wanted us to create have now been created.
|
||||
@@ -4849,7 +4882,7 @@ mod tests {
|
||||
TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
|
||||
|
||||
let ctx = any_context();
|
||||
let tenant = harness.try_load(&ctx).await.unwrap();
|
||||
let tenant = harness.do_try_load(&ctx).await.unwrap();
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
|
||||
@@ -419,6 +419,7 @@ impl DeleteTimelineFlow {
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client,
|
||||
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
|
||||
},
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
|
||||
66
poetry.lock
generated
66
poetry.lock
generated
@@ -836,43 +836,43 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "cryptography"
|
||||
version = "42.0.0"
|
||||
version = "42.0.2"
|
||||
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "cryptography-42.0.0-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:c640b0ef54138fde761ec99a6c7dc4ce05e80420262c20fa239e694ca371d434"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:678cfa0d1e72ef41d48993a7be75a76b0725d29b820ff3cfd606a5b2b33fda01"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:146e971e92a6dd042214b537a726c9750496128453146ab0ee8971a0299dc9bd"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:87086eae86a700307b544625e3ba11cc600c3c0ef8ab97b0fda0705d6db3d4e3"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:0a68bfcf57a6887818307600c3c0ebc3f62fbb6ccad2240aa21887cda1f8df1b"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:5a217bca51f3b91971400890905a9323ad805838ca3fa1e202a01844f485ee87"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:ca20550bb590db16223eb9ccc5852335b48b8f597e2f6f0878bbfd9e7314eb17"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:33588310b5c886dfb87dba5f013b8d27df7ffd31dc753775342a1e5ab139e59d"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:9515ea7f596c8092fdc9902627e51b23a75daa2c7815ed5aa8cf4f07469212ec"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:35cf6ed4c38f054478a9df14f03c1169bb14bd98f0b1705751079b25e1cb58bc"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-win32.whl", hash = "sha256:8814722cffcfd1fbd91edd9f3451b88a8f26a5fd41b28c1c9193949d1c689dc4"},
|
||||
{file = "cryptography-42.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:a2a8d873667e4fd2f34aedab02ba500b824692c6542e017075a2efc38f60a4c0"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:8fedec73d590fd30c4e3f0d0f4bc961aeca8390c72f3eaa1a0874d180e868ddf"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:be41b0c7366e5549265adf2145135dca107718fa44b6e418dc7499cfff6b4689"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3ca482ea80626048975360c8e62be3ceb0f11803180b73163acd24bf014133a0"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:c58115384bdcfe9c7f644c72f10f6f42bed7cf59f7b52fe1bf7ae0a622b3a139"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:56ce0c106d5c3fec1038c3cca3d55ac320a5be1b44bf15116732d0bc716979a2"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:324721d93b998cb7367f1e6897370644751e5580ff9b370c0a50dc60a2003513"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:d97aae66b7de41cdf5b12087b5509e4e9805ed6f562406dfcf60e8481a9a28f8"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:85f759ed59ffd1d0baad296e72780aa62ff8a71f94dc1ab340386a1207d0ea81"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:206aaf42e031b93f86ad60f9f5d9da1b09164f25488238ac1dc488334eb5e221"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-win32.whl", hash = "sha256:74f18a4c8ca04134d2052a140322002fef535c99cdbc2a6afc18a8024d5c9d5b"},
|
||||
{file = "cryptography-42.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:14e4b909373bc5bf1095311fa0f7fcabf2d1a160ca13f1e9e467be1ac4cbdf94"},
|
||||
{file = "cryptography-42.0.0-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:3005166a39b70c8b94455fdbe78d87a444da31ff70de3331cdec2c568cf25b7e"},
|
||||
{file = "cryptography-42.0.0-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:be14b31eb3a293fc6e6aa2807c8a3224c71426f7c4e3639ccf1a2f3ffd6df8c3"},
|
||||
{file = "cryptography-42.0.0-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:bd7cf7a8d9f34cc67220f1195884151426ce616fdc8285df9054bfa10135925f"},
|
||||
{file = "cryptography-42.0.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:c310767268d88803b653fffe6d6f2f17bb9d49ffceb8d70aed50ad45ea49ab08"},
|
||||
{file = "cryptography-42.0.0-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:bdce70e562c69bb089523e75ef1d9625b7417c6297a76ac27b1b8b1eb51b7d0f"},
|
||||
{file = "cryptography-42.0.0-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:e9326ca78111e4c645f7e49cbce4ed2f3f85e17b61a563328c85a5208cf34440"},
|
||||
{file = "cryptography-42.0.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:69fd009a325cad6fbfd5b04c711a4da563c6c4854fc4c9544bff3088387c77c0"},
|
||||
{file = "cryptography-42.0.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:988b738f56c665366b1e4bfd9045c3efae89ee366ca3839cd5af53eaa1401bce"},
|
||||
{file = "cryptography-42.0.0.tar.gz", hash = "sha256:6cf9b76d6e93c62114bd19485e5cb003115c134cf9ce91f8ac924c44f8c8c3f4"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:701171f825dcab90969596ce2af253143b93b08f1a716d4b2a9d2db5084ef7be"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:61321672b3ac7aade25c40449ccedbc6db72c7f5f0fdf34def5e2f8b51ca530d"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ea2c3ffb662fec8bbbfce5602e2c159ff097a4631d96235fcf0fb00e59e3ece4"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3b15c678f27d66d247132cbf13df2f75255627bcc9b6a570f7d2fd08e8c081d2"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:8e88bb9eafbf6a4014d55fb222e7360eef53e613215085e65a13290577394529"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:a047682d324ba56e61b7ea7c7299d51e61fd3bca7dad2ccc39b72bd0118d60a1"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:36d4b7c4be6411f58f60d9ce555a73df8406d484ba12a63549c88bd64f7967f1"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:a00aee5d1b6c20620161984f8ab2ab69134466c51f58c052c11b076715e72929"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:b97fe7d7991c25e6a31e5d5e795986b18fbbb3107b873d5f3ae6dc9a103278e9"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:5fa82a26f92871eca593b53359c12ad7949772462f887c35edaf36f87953c0e2"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-win32.whl", hash = "sha256:4b063d3413f853e056161eb0c7724822a9740ad3caa24b8424d776cebf98e7ee"},
|
||||
{file = "cryptography-42.0.2-cp37-abi3-win_amd64.whl", hash = "sha256:841ec8af7a8491ac76ec5a9522226e287187a3107e12b7d686ad354bb78facee"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:55d1580e2d7e17f45d19d3b12098e352f3a37fe86d380bf45846ef257054b242"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:28cb2c41f131a5758d6ba6a0504150d644054fd9f3203a1e8e8d7ac3aea7f73a"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b9097a208875fc7bbeb1286d0125d90bdfed961f61f214d3f5be62cd4ed8a446"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:44c95c0e96b3cb628e8452ec060413a49002a247b2b9938989e23a2c8291fc90"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:2f9f14185962e6a04ab32d1abe34eae8a9001569ee4edb64d2304bf0d65c53f3"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:09a77e5b2e8ca732a19a90c5bca2d124621a1edb5438c5daa2d2738bfeb02589"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:ad28cff53f60d99a928dfcf1e861e0b2ceb2bc1f08a074fdd601b314e1cc9e0a"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:130c0f77022b2b9c99d8cebcdd834d81705f61c68e91ddd614ce74c657f8b3ea"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:fa3dec4ba8fb6e662770b74f62f1a0c7d4e37e25b58b2bf2c1be4c95372b4a33"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-win32.whl", hash = "sha256:3dbd37e14ce795b4af61b89b037d4bc157f2cb23e676fa16932185a04dfbf635"},
|
||||
{file = "cryptography-42.0.2-cp39-abi3-win_amd64.whl", hash = "sha256:8a06641fb07d4e8f6c7dda4fc3f8871d327803ab6542e33831c7ccfdcb4d0ad6"},
|
||||
{file = "cryptography-42.0.2-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:087887e55e0b9c8724cf05361357875adb5c20dec27e5816b653492980d20380"},
|
||||
{file = "cryptography-42.0.2-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a7ef8dd0bf2e1d0a27042b231a3baac6883cdd5557036f5e8df7139255feaac6"},
|
||||
{file = "cryptography-42.0.2-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4383b47f45b14459cab66048d384614019965ba6c1a1a141f11b5a551cace1b2"},
|
||||
{file = "cryptography-42.0.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:fbeb725c9dc799a574518109336acccaf1303c30d45c075c665c0793c2f79a7f"},
|
||||
{file = "cryptography-42.0.2-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:320948ab49883557a256eab46149df79435a22d2fefd6a66fe6946f1b9d9d008"},
|
||||
{file = "cryptography-42.0.2-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:5ef9bc3d046ce83c4bbf4c25e1e0547b9c441c01d30922d812e887dc5f125c12"},
|
||||
{file = "cryptography-42.0.2-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:52ed9ebf8ac602385126c9a2fe951db36f2cb0c2538d22971487f89d0de4065a"},
|
||||
{file = "cryptography-42.0.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:141e2aa5ba100d3788c0ad7919b288f89d1fe015878b9659b307c9ef867d3a65"},
|
||||
{file = "cryptography-42.0.2.tar.gz", hash = "sha256:e0ec52ba3c7f1b7d813cd52649a5b3ef1fc0d433219dc8c93827c57eab6cf888"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
||||
@@ -12,8 +12,12 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
(Scope::PageServerApi, _) => Err(AuthError(
|
||||
"PageServerApi scope makes no sense for Safekeeper".into(),
|
||||
(Scope::PageServerApi | Scope::GenerationsApi, _) => Err(AuthError(
|
||||
format!(
|
||||
"JWT scope '{:?}' is ineligible for Safekeeper auth",
|
||||
claims.scope
|
||||
)
|
||||
.into(),
|
||||
)),
|
||||
(Scope::SafekeeperData, _) => Ok(()),
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
@@ -33,6 +34,10 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
@pytest.mark.timeout(
|
||||
10000
|
||||
) # TODO: this value is just "a really high number"; have this per instance type
|
||||
@pytest.mark.skipif(
|
||||
os.getenv("CI", "false") == "true",
|
||||
reason="The test if flaky on CI: https://github.com/neondatabase/neon/issues/6724",
|
||||
)
|
||||
def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
|
||||
@@ -176,6 +176,14 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"lazy_slru_download": True,
|
||||
"max_lsn_wal_lag": 230000,
|
||||
"min_resident_size_override": 23,
|
||||
"timeline_get_throttle": {
|
||||
"task_kinds": ["PageRequestHandler"],
|
||||
"fair": True,
|
||||
"initial": 0,
|
||||
"refill_interval": "1s",
|
||||
"refill_amount": 1000,
|
||||
"max": 1000,
|
||||
},
|
||||
"trace_read_requests": True,
|
||||
"walreceiver_connect_timeout": "13m",
|
||||
}
|
||||
|
||||
@@ -225,9 +225,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
|
||||
check_pageserver(True, password=pageserver_token)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*SafekeeperData scope makes no sense for Pageserver.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(".*JWT scope '.+' is ineligible for Pageserver auth.*")
|
||||
check_pageserver(False, password=safekeeper_token)
|
||||
|
||||
def check_safekeeper(expect_success: bool, **conn_kwargs):
|
||||
|
||||
@@ -141,7 +141,12 @@ def test_create_snapshot(
|
||||
)
|
||||
if compatibility_snapshot_dir.exists():
|
||||
shutil.rmtree(compatibility_snapshot_dir)
|
||||
shutil.copytree(test_output_dir, compatibility_snapshot_dir)
|
||||
|
||||
shutil.copytree(
|
||||
test_output_dir,
|
||||
compatibility_snapshot_dir,
|
||||
ignore=shutil.ignore_patterns("pg_dynshmem"),
|
||||
)
|
||||
|
||||
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
|
||||
@@ -20,6 +20,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
PgBin,
|
||||
S3Scrubber,
|
||||
last_flush_lsn_upload,
|
||||
@@ -62,7 +63,7 @@ def generate_uploads_and_deletions(
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
timeline_id: Optional[TimelineId] = None,
|
||||
data: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
pageserver: NeonPageserver,
|
||||
):
|
||||
"""
|
||||
Using the environment's default tenant + timeline, generate a load pattern
|
||||
@@ -77,14 +78,16 @@ def generate_uploads_and_deletions(
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
ps_http = pageserver.http_client()
|
||||
|
||||
with env.endpoints.create_start(
|
||||
"main", tenant_id=tenant_id, pageserver_id=pageserver_id
|
||||
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
|
||||
) as endpoint:
|
||||
if init:
|
||||
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
last_flush_lsn_upload(
|
||||
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
|
||||
)
|
||||
|
||||
def churn(data):
|
||||
endpoint.safe_psql_many(
|
||||
@@ -105,7 +108,9 @@ def generate_uploads_and_deletions(
|
||||
# We are waiting for uploads as well as local flush, in order to avoid leaving the system
|
||||
# in a state where there are "future layers" in remote storage that will generate deletions
|
||||
# after a restart.
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
last_flush_lsn_upload(
|
||||
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
|
||||
)
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# Compaction should generate some GC-elegible layers
|
||||
@@ -205,7 +210,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id=env.initial_tenant, conf=TENANT_CONF, timeline_id=env.initial_timeline
|
||||
)
|
||||
generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id)
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver)
|
||||
|
||||
def parse_generation_suffix(key):
|
||||
m = re.match(".+-([0-9a-zA-Z]{8})$", key)
|
||||
@@ -233,7 +238,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
# Starting without the override that disabled control_plane_api
|
||||
env.pageserver.start()
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id, init=False)
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
|
||||
|
||||
legacy_objects: list[str] = []
|
||||
suffixed_objects = []
|
||||
@@ -277,13 +282,16 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
|
||||
some_other_pageserver = 1234
|
||||
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
|
||||
main_pageserver = env.get_pageserver(attached_to_id)
|
||||
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
ps_http = main_pageserver.http_client()
|
||||
|
||||
generate_uploads_and_deletions(env)
|
||||
generate_uploads_and_deletions(env, pageserver=main_pageserver)
|
||||
|
||||
# Flush: pending deletions should all complete
|
||||
assert_deletion_queue(ps_http, lambda n: n > 0)
|
||||
@@ -296,14 +304,14 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
assert timeline["remote_consistent_lsn"] == timeline["remote_consistent_lsn_visible"]
|
||||
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
main_pageserver.allowed_errors.extend(
|
||||
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
|
||||
)
|
||||
|
||||
# Now advance the generation in the control plane: subsequent validations
|
||||
# from the running pageserver will fail. No more deletions should happen.
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, other_pageserver.id)
|
||||
generate_uploads_and_deletions(env, init=False, pageserver=main_pageserver)
|
||||
|
||||
assert_deletion_queue(ps_http, lambda n: n > 0)
|
||||
queue_depth_before = get_deletion_queue_depth(ps_http)
|
||||
@@ -355,9 +363,14 @@ def test_deletion_queue_recovery(
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
|
||||
main_pageserver = env.get_pageserver(attached_to_id)
|
||||
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]
|
||||
|
||||
ps_http = main_pageserver.http_client()
|
||||
|
||||
failpoints = [
|
||||
# Prevent deletion lists from being executed, to build up some backlog of deletions
|
||||
@@ -374,7 +387,7 @@ def test_deletion_queue_recovery(
|
||||
|
||||
ps_http.configure_failpoints(failpoints)
|
||||
|
||||
generate_uploads_and_deletions(env)
|
||||
generate_uploads_and_deletions(env, pageserver=main_pageserver)
|
||||
|
||||
# There should be entries in the deletion queue
|
||||
assert_deletion_queue(ps_http, lambda n: n > 0)
|
||||
@@ -401,7 +414,7 @@ def test_deletion_queue_recovery(
|
||||
# also wait to see the header hit the disk: this seems paranoid but the race
|
||||
# can really happen on a heavily overloaded test machine.
|
||||
def assert_header_written():
|
||||
assert (env.pageserver.workdir / "deletion" / "header-01").exists()
|
||||
assert (main_pageserver.workdir / "deletion" / "header-01").exists()
|
||||
|
||||
wait_until(20, 1, assert_header_written)
|
||||
|
||||
@@ -411,13 +424,13 @@ def test_deletion_queue_recovery(
|
||||
before_restart_depth = get_deletion_queue_validated(ps_http)
|
||||
|
||||
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
|
||||
env.pageserver.stop(immediate=True)
|
||||
main_pageserver.stop(immediate=True)
|
||||
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
some_other_pageserver = 101010
|
||||
some_other_pageserver = other_pageserver.id
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
|
||||
env.pageserver.start()
|
||||
main_pageserver.start()
|
||||
|
||||
def assert_deletions_submitted(n: int):
|
||||
assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n
|
||||
@@ -440,7 +453,7 @@ def test_deletion_queue_recovery(
|
||||
# validated before restart.
|
||||
assert get_deletion_queue_executed(ps_http) == before_restart_depth
|
||||
else:
|
||||
env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
|
||||
main_pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
|
||||
|
||||
# If we lost the attachment, we should have dropped our pre-restart deletions.
|
||||
assert get_deletion_queue_dropped(ps_http) == before_restart_depth
|
||||
@@ -449,8 +462,8 @@ def test_deletion_queue_recovery(
|
||||
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
|
||||
|
||||
# Restart again
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
main_pageserver.stop(immediate=True)
|
||||
main_pageserver.start()
|
||||
|
||||
# No deletion lists should be recovered: this demonstrates that deletion lists
|
||||
# were cleaned up after being executed or dropped in the previous process lifetime.
|
||||
@@ -469,7 +482,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id)
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver)
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
@@ -486,7 +499,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
# Remember how many validations had happened before the control plane went offline
|
||||
validated = get_deletion_queue_validated(ps_http)
|
||||
|
||||
generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
|
||||
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)
|
||||
|
||||
# The running pageserver should stop progressing deletions
|
||||
time.sleep(10)
|
||||
@@ -502,7 +515,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
)
|
||||
|
||||
# The pageserver should provide service to clients
|
||||
generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
|
||||
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)
|
||||
|
||||
# The pageserver should neither validate nor execute any deletions, it should have
|
||||
# loaded the DeletionLists from before though
|
||||
@@ -523,7 +536,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
|
||||
env.pageserver.start()
|
||||
|
||||
generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
|
||||
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
assert get_deletion_queue_depth(ps_http) == 0
|
||||
assert get_deletion_queue_validated(ps_http) > 0
|
||||
@@ -561,7 +574,7 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
generate_uploads_and_deletions(env)
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver)
|
||||
|
||||
read_all(env, tenant_id, timeline_id)
|
||||
evict_all_layers(env, tenant_id, timeline_id)
|
||||
|
||||
@@ -18,6 +18,7 @@ from fixtures.metrics import (
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import timeline_delete_wait_completed, wait_until_tenant_active
|
||||
@@ -414,3 +415,50 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# The tenant should end up active
|
||||
wait_until_tenant_active(env.pageserver.http_client(), tenant_id, iterations=10, period=1)
|
||||
|
||||
|
||||
def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
|
||||
"""Test for the directory_entries_count metric"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
endpoint_tenant = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
|
||||
|
||||
# Not sure why but this many tables creates more relations than our limit
|
||||
TABLE_COUNT = 1600
|
||||
COUNT_AT_LEAST_EXPECTED = 5500
|
||||
|
||||
with endpoint_tenant.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Wrapping begin; commit; around this and the loop below keeps the reproduction
|
||||
# but it also doesn't have a performance benefit
|
||||
cur.execute("CREATE TABLE template_tbl(key int primary key, value text);")
|
||||
for i in range(TABLE_COUNT):
|
||||
cur.execute(f"CREATE TABLE tbl_{i}(like template_tbl INCLUDING ALL);")
|
||||
wait_for_last_flush_lsn(env, endpoint_tenant, env.initial_tenant, env.initial_timeline)
|
||||
endpoint_tenant.stop()
|
||||
|
||||
m = ps_http.get_metrics()
|
||||
directory_entries_count_metric = m.query_all(
|
||||
"pageserver_directory_entries_count", {"tenant_id": str(env.initial_tenant)}
|
||||
)
|
||||
|
||||
def only_int(samples: List[Sample]) -> int:
|
||||
assert len(samples) == 1
|
||||
return int(samples[0].value)
|
||||
|
||||
directory_entries_count = only_int(directory_entries_count_metric)
|
||||
|
||||
log.info(f"pageserver_directory_entries_count metric value: {directory_entries_count}")
|
||||
|
||||
assert directory_entries_count > COUNT_AT_LEAST_EXPECTED
|
||||
|
||||
timeline_detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
counts = timeline_detail["directory_entries_counts"]
|
||||
assert counts
|
||||
log.info(f"directory counts: {counts}")
|
||||
assert counts[2] > COUNT_AT_LEAST_EXPECTED
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: b4bae26a0f...9dd9956c55
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 9eef016e18...ca2def9993
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: f7b63d8cf9...9c37a49884
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "f7b63d8cf9ae040f6907c3c13ef25fcf15a36161",
|
||||
"postgres-v15": "9eef016e18bf61753e3cbaa755f705db6a4f7b1d",
|
||||
"postgres-v14": "b4bae26a0f09c69e979e6cb55780398e3102e022"
|
||||
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
|
||||
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
|
||||
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user