Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-17 05:00:38 +00:00)

Compare commits: problame/b ... sk-migrate

26 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 22e9702525 |  |
|  | ac38d3a88c |  |
|  | 0f56104a61 |  |
|  | f260f1565e |  |
|  | c29df80634 |  |
|  | 58dbca6ce3 |  |
|  | 613906acea |  |
|  | 82809d2ec2 |  |
|  | 0bd79eb063 |  |
|  | 8ff5387da1 |  |
|  | 8b91bbc38e |  |
|  | e6bf6952b8 |  |
|  | a2fab34371 |  |
|  | c52384752e |  |
|  | 73d247c464 |  |
|  | b701394d7a |  |
|  | d89af4cf8e |  |
|  | 6ffbbb2e02 |  |
|  | fbb979d5e3 |  |
|  | a89d6dc76e |  |
|  | c272c68e5c |  |
|  | 6e6e40dd7f |  |
|  | 6939fc3db6 |  |
|  | c4c48cfd63 |  |
|  | 82215d20b0 |  |
|  | 62737f3776 |  |
.github/workflows/benchmarking.yml (vendored, 29 lines changed)
```diff
@@ -11,7 +11,7 @@ on:
 #          │ │ ┌───────────── day of the month (1 - 31)
 #          │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
 #          │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
-    - cron: '0 3 * * *' # run once a day, timezone is utc
+    - cron: '0 3 * * *' # run once a day, timezone is utc
 
   workflow_dispatch: # adds ability to run this manually
     inputs:
@@ -23,6 +23,21 @@ on:
         type: boolean
         description: 'Publish perf report. If not set, the report will be published only for the main branch'
         required: false
+      collect_olap_explain:
+        type: boolean
+        description: 'Collect EXPLAIN ANALYZE for OLAP queries. If not set, EXPLAIN ANALYZE will not be collected'
+        required: false
+        default: false
+      collect_pg_stat_statements:
+        type: boolean
+        description: 'Collect pg_stat_statements for OLAP queries. If not set, pg_stat_statements will not be collected'
+        required: false
+        default: false
+      run_AWS_RDS_AND_AURORA:
+        type: boolean
+        description: 'AWS-RDS and AWS-AURORA normally only run on Saturday. Set this to true to run them on every workflow_dispatch'
+        required: false
+        default: false
 
 defaults:
   run:
@@ -113,6 +128,8 @@ jobs:
     # - neon-captest-reuse: Reusing existing project
     # - rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
     # - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
+    env:
+      RUN_AWS_RDS_AND_AURORA: ${{ github.event.inputs.run_AWS_RDS_AND_AURORA || 'false' }}
     runs-on: ubuntu-latest
     outputs:
       pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
@@ -152,7 +169,7 @@ jobs:
           ]
         }'
 
-        if [ "$(date +%A)" = "Saturday" ]; then
+        if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
           matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
                                                      { "platform": "rds-aurora" }]')
         fi
@@ -171,9 +188,9 @@ jobs:
           ]
         }'
 
-        if [ "$(date +%A)" = "Saturday" ]; then
+        if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
           matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "scale": "10" },
-                                                     { "platform": "rds-aurora", "scale": "10" }]')
+                                                     { "platform": "rds-aurora", "scale": "10" }]')
         fi
 
         echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -337,6 +354,8 @@ jobs:
       POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
       DEFAULT_PG_VERSION: 14
       TEST_OUTPUT: /tmp/test_output
+      TEST_OLAP_COLLECT_EXPLAIN: ${{ github.event.inputs.collect_olap_explain }}
+      TEST_OLAP_COLLECT_PG_STAT_STATEMENTS: ${{ github.event.inputs.collect_pg_stat_statements }}
      BUILD_TYPE: remote
       SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
       PLATFORM: ${{ matrix.platform }}
@@ -399,6 +418,8 @@ jobs:
         env:
           VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
           PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
+          TEST_OLAP_COLLECT_EXPLAIN: ${{ github.event.inputs.collect_olap_explain || 'false' }}
+          TEST_OLAP_COLLECT_PG_STAT_STATEMENTS: ${{ github.event.inputs.collect_pg_stat_statements || 'false' }}
           BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
           TEST_OLAP_SCALE: 10
```
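The Saturday gate is the interesting bit of this workflow change: scheduled runs only add the RDS/Aurora platforms on Saturdays, while the new `run_AWS_RDS_AND_AURORA` input forces them on a manual `workflow_dispatch`. A standalone sketch of the same pattern, for illustration only (the platform name is taken from the comments in the diff; the `|| 'false'` fallback in the workflow's `env` block is what keeps the shell test from ever seeing an empty string on scheduled runs, where no `workflow_dispatch` input exists):

```sh
#!/bin/sh -eu
# Base matrix, before the gate.
matrix='{ "include": [ { "platform": "neon-captest-reuse" } ] }'

# In the workflow this comes from:
#   RUN_AWS_RDS_AND_AURORA: ${{ github.event.inputs.run_AWS_RDS_AND_AURORA || 'false' }}
RUN_AWS_RDS_AND_AURORA="${RUN_AWS_RDS_AND_AURORA:-false}"

# Add the expensive managed-Postgres platforms on Saturdays or on request.
# `date +%A` is locale-dependent; this assumes an English locale.
if [ "$(date +%A)" = "Saturday" ] || [ "${RUN_AWS_RDS_AND_AURORA}" = "true" ]; then
  matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
                                             { "platform": "rds-aurora" }]')
fi

echo "matrix=$(echo "$matrix" | jq --compact-output '.')"
```

Quoting `"${RUN_AWS_RDS_AND_AURORA}"` in the test (the diff leaves it unquoted) is a small defensive choice: an unquoted empty variable would make `[ = "true" ]` a syntax error.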
.github/workflows/build_and_test.yml (vendored, 2 lines changed)

```diff
@@ -857,7 +857,7 @@ jobs:
       run:
         shell: sh -eu {0}
     env:
-      VM_BUILDER_VERSION: v0.19.0
+      VM_BUILDER_VERSION: v0.21.0
 
     steps:
       - name: Checkout
```
Cargo.lock (generated, 82 lines changed)

```diff
@@ -190,9 +190,9 @@ dependencies = [
 
 [[package]]
 name = "async-compression"
-version = "0.4.0"
+version = "0.4.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11"
+checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
 dependencies = [
  "flate2",
  "futures-core",
@@ -2106,20 +2106,6 @@ dependencies = [
  "hashbrown 0.13.2",
 ]
 
-[[package]]
-name = "hdrhistogram"
-version = "7.5.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
-dependencies = [
- "base64 0.21.1",
- "byteorder",
- "crossbeam-channel",
- "flate2",
- "nom",
- "num-traits",
-]
-
 [[package]]
 name = "heapless"
 version = "0.8.0"
@@ -2501,13 +2487,14 @@ dependencies = [
 
 [[package]]
 name = "jsonwebtoken"
-version = "8.3.0"
+version = "9.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378"
+checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
 dependencies = [
  "base64 0.21.1",
- "pem 1.1.1",
- "ring 0.16.20",
+ "js-sys",
+ "pem 3.0.3",
+ "ring 0.17.6",
  "serde",
  "serde_json",
  "simple_asn1",
@@ -3070,28 +3057,6 @@ dependencies = [
  "sha2",
 ]
 
-[[package]]
-name = "pagebench"
-version = "0.1.0"
-dependencies = [
- "anyhow",
- "clap",
- "futures",
- "hdrhistogram",
- "humantime",
- "humantime-serde",
- "pageserver",
- "pageserver_api",
- "pageserver_client",
- "rand 0.8.5",
- "serde",
- "serde_json",
- "tokio",
- "tracing",
- "utils",
- "workspace_hack",
-]
-
 [[package]]
 name = "pagectl"
 version = "0.1.0"
@@ -3327,18 +3292,19 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
 
 [[package]]
 name = "pem"
-version = "1.1.1"
+version = "2.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8"
+checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a"
 dependencies = [
- "base64 0.13.1",
+ "base64 0.21.1",
+ "serde",
 ]
 
 [[package]]
 name = "pem"
-version = "2.0.1"
+version = "3.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a"
+checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
 dependencies = [
  "base64 0.21.1",
  "serde",
@@ -4464,12 +4430,12 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
 
 [[package]]
 name = "sct"
-version = "0.7.0"
+version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
+checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
 dependencies = [
- "ring 0.16.20",
- "untrusted 0.7.1",
+ "ring 0.17.6",
+ "untrusted 0.9.0",
 ]
 
 [[package]]
@@ -6448,30 +6414,28 @@ checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
 
 [[package]]
 name = "zstd"
-version = "0.12.4"
+version = "0.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c"
+checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
 dependencies = [
  "zstd-safe",
 ]
 
 [[package]]
 name = "zstd-safe"
-version = "6.0.6"
+version = "7.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581"
+checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
 dependencies = [
  "libc",
  "zstd-sys",
 ]
 
 [[package]]
 name = "zstd-sys"
-version = "2.0.8+zstd.1.5.5"
+version = "2.0.9+zstd.1.5.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c"
+checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656"
 dependencies = [
  "cc",
  "libc",
  "pkg-config",
 ]
```
Cargo.toml

```diff
@@ -6,7 +6,6 @@ members = [
     "pageserver",
     "pageserver/ctl",
     "pageserver/client",
-    "pageserver/pagebench",
     "proxy",
     "safekeeper",
     "storage_broker",
@@ -80,7 +79,6 @@ futures-util = "0.3"
 git-version = "0.3"
 hashbrown = "0.13"
 hashlink = "0.8.1"
-hdrhistogram = "7.5.2"
 hex = "0.4"
 hex-literal = "0.4"
 hmac = "0.12.1"
@@ -93,7 +91,7 @@ hyper-tungstenite = "0.11"
 inotify = "0.10.2"
 ipnet = "2.9.0"
 itertools = "0.10"
-jsonwebtoken = "8"
+jsonwebtoken = "9"
 libc = "0.2"
 md5 = "0.7.0"
 memoffset = "0.8"
```
Dockerfile (compute image)

```diff
@@ -569,6 +569,23 @@ RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4
     make -j $(getconf _NPROCESSORS_ONLN) install && \
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
 
+#########################################################################################
+#
+# Layer "pg-semver-pg-build"
+# compile pg_semver extension
+#
+#########################################################################################
+FROM build-deps AS pg-semver-pg-build
+COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
+
+ENV PATH "/usr/local/pgsql/bin/:$PATH"
+RUN wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O pg_semver.tar.gz && \
+    echo "fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 pg_semver.tar.gz" | sha256sum --check && \
+    mkdir pg_semver-src && cd pg_semver-src && tar xvzf ../pg_semver.tar.gz --strip-components=1 -C . && \
+    make -j $(getconf _NPROCESSORS_ONLN) && \
+    make -j $(getconf _NPROCESSORS_ONLN) install && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
+
 #########################################################################################
 #
 # Layer "pg-embedding-pg-build"
@@ -768,6 +785,7 @@ COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
+COPY --from=pg-semver-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
 COPY pgxn/ pgxn/
```
compute_tools/Cargo.toml

```diff
@@ -37,5 +37,5 @@ workspace_hack.workspace = true
 toml_edit.workspace = true
 remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
 vm_monitor = { version = "0.1", path = "../libs/vm_monitor/" }
-zstd = "0.12.4"
+zstd = "0.13"
 bytes = "1.0"
```
compute_tools/src/spec.rs

```diff
@@ -298,7 +298,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
                 // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
                 // from neon_superuser.
                 let mut query: String = format!(
-                    "CREATE ROLE {} INHERIT CREATEROLE CREATEDB IN ROLE neon_superuser",
+                    "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
                     name.pg_quote()
                 );
                 info!("role create query: '{}'", &query);
@@ -370,33 +370,49 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
     Ok(())
 }
 
+fn reassign_owned_objects_in_one_db(
+    conf: Config,
+    role_name: &PgIdent,
+    db_owner: &PgIdent,
+) -> Result<()> {
+    let mut client = conf.connect(NoTls)?;
+
+    // This will reassign all dependent objects to the db owner
+    let reassign_query = format!(
+        "REASSIGN OWNED BY {} TO {}",
+        role_name.pg_quote(),
+        db_owner.pg_quote()
+    );
+    info!(
+        "reassigning objects owned by '{}' in db '{}' to '{}'",
+        role_name,
+        conf.get_dbname().unwrap_or(""),
+        db_owner
+    );
+    client.simple_query(&reassign_query)?;
+
+    // This now will only drop privileges of the role
+    let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
+    client.simple_query(&drop_query)?;
+    Ok(())
+}
+
 // Reassign all owned objects in all databases to the owner of the database.
 fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     for db in &spec.cluster.databases {
         if db.owner != *role_name {
             let mut conf = Config::from_str(connstr)?;
             conf.dbname(&db.name);
-
-            let mut client = conf.connect(NoTls)?;
-
-            // This will reassign all dependent objects to the db owner
-            let reassign_query = format!(
-                "REASSIGN OWNED BY {} TO {}",
-                role_name.pg_quote(),
-                db.owner.pg_quote()
-            );
-            info!(
-                "reassigning objects owned by '{}' in db '{}' to '{}'",
-                role_name, &db.name, &db.owner
-            );
-            client.simple_query(&reassign_query)?;
-
-            // This now will only drop privileges of the role
-            let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
-            client.simple_query(&drop_query)?;
+            reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
         }
     }
 
+    // Also handle case when there are no databases in the spec.
+    // In this case we need to reassign objects in the default database.
+    let conf = Config::from_str(connstr)?;
+    let db_owner = PgIdent::from_str("cloud_admin")?;
+    reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
+
     Ok(())
 }
```
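The refactor extracts the per-database work into `reassign_owned_objects_in_one_db` and additionally runs it against the default database for the `cloud_admin` owner. The underlying Postgres pattern, shown here as plain SQL with placeholder role names (illustrative, not copied from the diff):

```sql
-- Both statements only affect objects in the database the session is
-- connected to, which is why the Rust helper reconnects per database.

-- 1. Transfer ownership of everything the role owns here to the db owner.
REASSIGN OWNED BY some_role TO db_owner;

-- 2. After the reassignment the role owns nothing in this database, so
--    DROP OWNED only revokes the role's remaining privileges (grants).
DROP OWNED BY some_role;
```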
docs/rfcs/029-getpage-throttling.md (new file, 197 lines)
# Per-Tenant GetPage@LSN Throttling

Author: Christian Schwarz
Date: Oct 24, 2023

## Summary

This RFC proposes per-tenant throttling of GetPage@LSN requests inside Pageserver
and the interactions with its client, i.e., the neon_smgr component in Compute.

The result of implementing & executing this RFC will be a fleet-wide upper limit for
**"the highest GetPage/second that Pageserver can support for a single tenant/shard"**.

## Background

### GetPage@LSN Request Flow

Pageserver exposes its `page_service.rs` as a libpq listener.
The Computes' `neon_smgr` module connects to that libpq listener.
Once a connection is established, the protocol allows Compute to request page images at a given LSN.
We call these requests GetPage@LSN requests, or GetPage requests for short.
Other request types can be sent, but these are low-traffic compared to GetPage requests
and are not the concern of this RFC.

Pageserver associates one libpq connection with one tokio task.

Per connection/task, the pq protocol is handled by the common `postgres_backend` crate.
Its `run_message_loop` function invokes the `page_service`-specific `impl<IO> postgres_backend::Handler<IO> for PageServerHandler`.
Requests are processed in the order in which they arrive via the TCP-based pq protocol.
So, there is no concurrent request processing within one connection/task.

There is a degree of natural pipelining:
Compute can "fill the pipe" by sending more than one GetPage request into the libpq TCP stream,
and Pageserver can fill the pipe with responses in the other direction.
Both directions are subject to the limits of tx/rx buffers, nodelay, TCP flow control, etc.

### GetPage@LSN Access Pattern

The Compute has its own hierarchy of caches, specifically `shared_buffers` and the local file cache (LFC).
Compute only issues GetPage requests to Pageserver if it encounters a miss in these caches.

If the working set stops fitting into Compute's caches, requests to Pageserver increase sharply -- the Compute starts *thrashing*.

## Motivation

In INC-69, a tenant issued 155k GetPage/second for a period of 10 minutes and 60k GetPage/second for a period of 3h,
then dropped to ca. 18k GetPage/second for a period of 9h.

We noticed this because of an internal GetPage latency SLO burn-rate alert, i.e.,
the request latency profile during this period significantly exceeded what was acceptable according to the internal SLO.

Sadly, we do not have the observability data to determine the impact of this tenant on other tenants on the same Pageserver.

However, here are some illustrative data points for the 155k period:
The tenant was responsible for >= 99% of the GetPage traffic and, frankly, the overall activity on this Pageserver instance.
We were serving pages at about 10 Gb/s (155k x 8 KiB (PAGE_SZ) per second is ca. 1.18 GiB/s, i.e., ca. 10 Gb/s).
The CPU utilization of the instance was 75% user+system.
The Pageserver page cache served 1.75M accesses/second at a hit rate of ca. 90%.
The hit rate for materialized pages was ca. 40%.
Curiously, IOPS to the Instance Store NVMe were very low, rarely exceeding 100.

The fact that the IOPS were so low and the materialized page cache hit rate was so high suggests that **this tenant's compute's caches were thrashing**.
The compute was of type `k8s-pod`; hence, auto-scaling could/would not have helped remediate the thrashing by provisioning more RAM.
The consequence was that the **thrashing translated into excessive GetPage requests against Pageserver**.

My claim is that it was **unhealthy to serve this workload at the pace we did**:

* it is likely that other tenants were/would have experienced high latencies (again, we sadly don't have per-tenant latency data to confirm this)
* more importantly, it was **unsustainable** to serve traffic at this pace, for multiple reasons:
  * **predictability of performance**: when the working set grows, the Pageserver materialized page cache hit rate drops.
    At some point, we're bound by the EC2 Instance Store NVMe drive's IOPS limit.
    The result is an **uneven** performance profile from the Compute perspective.
  * **economics**: Neon currently does not charge for IOPS, only capacity.
    **We cannot afford to undercut the market in IOPS/$ this drastically; it leads to adverse selection and perverse incentives.**
    For example, the 155k IOPS, which we served for 10min, would cost ca. 6.5k$/month when provisioned as an io2 EBS volume.
    Even the 18k IOPS, which we served for 9h, would cost ca. 1.1k$/month when provisioned as an io2 EBS volume.
    We charge 0$.
    It could be economically advantageous to keep using a low-DRAM compute because Pageserver IOPS are fast enough and free.

Note: It is helpful to think of Pageserver as a disk, because that's precisely where `neon_smgr` sits:
vanilla Postgres gets its pages from disk; Neon Postgres gets them from Pageserver.
So, regarding the above performance & economic arguments, it is fair to say that we currently provide an "as-fast-as-possible-IOPS" disk that we charge for only by capacity.

## Solution: Throttling GetPage Requests

**The consequence of the above analysis must be that Pageserver throttles GetPage@LSN requests**.
That is, unless we want to start charging for provisioned GetPage@LSN/second.
Throttling sets the correct incentive for a thrashing Compute to scale up its DRAM to the working set size.
Neon Autoscaling will make this easy, [eventually](https://github.com/neondatabase/neon/pull/3913).

## The Design Space

What remains is the question of *policy* and *mechanism*.

**Policy** concerns itself with the question of what limit applies to a given connection|timeline|tenant.
Candidates are:

* hard limit, same limit value per connection|timeline|tenant
  * A per-tenant limit will provide an upper bound for the impact of a tenant on a given Pageserver instance.
    This is a major operational pain point / risk right now.
* hard limit, configurable per connection|timeline|tenant
  * This outsources policy to console/control plane, with obvious advantages for flexible structuring of what service we offer to customers.
  * Note that this is not a mechanism to guarantee a minimum provisioned rate, i.e., this is not a mechanism to guarantee a certain QoS for a tenant.
* fair share among active connections|timelines|tenants per instance
  * Example: each connection|timeline|tenant gets a fair fraction of the machine's GetPage/second capacity.
  * NB: this needs a definition of "active", and knowledge of the available GetPage/second capacity in advance.
* ...

Regarding **mechanism**, it's clear that **backpressure** is the way to go.
However, we must choose between

* **implicit** backpressure through pq/TCP, and
* **explicit** rejection of requests + retries with exponential backoff.

Further, there is the question of how throttling GetPage@LSN will affect the **internal GetPage latency SLO**:
where do we measure the SLI for Pageserver's internal GetPage latency SLO -- before or after the throttling?

And when we eventually move the measurement point into the Computes (to avoid coordinated omission),
how do we avoid counting throttling-induced latency toward the internal GetPage latency SLI/SLO?

## Scope Of This RFC

**This RFC proposes introducing a hard GetPage@LSN/second limit per tenant, with the same value applying to each tenant on a Pageserver**.

This proposal is easy to implement and significantly de-risks operating large Pageservers,
based on the assumption that extremely-high-GetPage-rate episodes like the one from the "Motivation" section are uncorrelated between tenants.

For example, suppose we pick a limit that allows up to 10 tenants to go at the limit rate:
if our Pageserver can serve 100k GetPage/second total at a 100% page cache miss rate
and each tenant gets a hard limit of 10k GetPage/second, we can serve up to 10 tenants at limit speed without latency degradation.

The mechanism for backpressure will be TCP-based implicit backpressure.
(The compute team isn't concerned about prefetch queue depth.)
Pageserver will implement it by delaying the reading of requests from the libpq connection(s).

The rate limit will be implemented using a per-tenant token bucket.
The bucket will be shared among all connections to the tenant.
The bucket implementation supports starvation-preventing `await`ing.
The current candidate for the implementation is [`leaky_bucket`](https://docs.rs/leaky-bucket/).
The getpage@lsn benchmark that's being added in https://github.com/neondatabase/neon/issues/5771
can be used to evaluate the overhead of sharing the bucket among connections of a tenant.
A possible technique to mitigate the impact of sharing the bucket would be to maintain a buffer of a few tokens per connection handler.
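A minimal sketch of the intended throttle point, assuming the `leaky_bucket` crate named above (the `TenantThrottle` type and `acquire` method are illustrative names, not the names used in the pageserver):

```rust
use std::sync::Arc;
use std::time::Duration;

use leaky_bucket::RateLimiter;

/// One throttle per tenant, shared by all page_service connections to it.
pub struct TenantThrottle {
    limiter: RateLimiter,
}

impl TenantThrottle {
    pub fn new(getpage_per_second: usize) -> Arc<Self> {
        Arc::new(TenantThrottle {
            // `fair(true)` hands out tokens in FIFO order, which provides the
            // starvation-preventing `await`ing mentioned above. Refilling in
            // 100ms slices keeps bursts small; this assumes the configured
            // rate is at least 10.
            limiter: RateLimiter::builder()
                .max(getpage_per_second)
                .initial(getpage_per_second)
                .refill(getpage_per_second / 10)
                .interval(Duration::from_millis(100))
                .fair(true)
                .build(),
        })
    }

    /// Called by a connection handler *before* it reads the next GetPage
    /// request off its libpq connection. Waiting here (rather than after
    /// reading) is what turns the limit into implicit TCP backpressure:
    /// the socket buffers fill up and the compute's sends stall.
    pub async fn acquire(&self) {
        self.limiter.acquire_one().await;
    }
}
```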
Regarding metrics / the internal GetPage latency SLO:
we will measure the GetPage latency SLO _after_ the throttler and introduce new metrics to measure the amount of throttling, quantified by:

- a histogram that records the tenants' observations of queue depth before they start waiting (one such histogram per pageserver)
- a histogram that records the tenants' observations of time spent waiting (one such histogram per pageserver)
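A sketch of what those two histograms could look like with the `prometheus` crate; the metric names and bucket boundaries are invented here for illustration, not fixed by the RFC:

```rust
use once_cell::sync::Lazy;
use prometheus::{register_histogram, Histogram};

// Time spent waiting in the per-tenant throttle, in seconds.
static GETPAGE_THROTTLE_WAIT_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "pageserver_getpage_throttle_wait_seconds",
        "Time GetPage requests spent waiting in the per-tenant throttle",
        vec![0.001, 0.01, 0.1, 1.0, 10.0]
    )
    .expect("register histogram")
});

// Queue depth a request observed before it started waiting.
static GETPAGE_THROTTLE_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "pageserver_getpage_throttle_queue_depth",
        "Throttle queue depth observed before a request started waiting",
        vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0]
    )
    .expect("register histogram")
});
```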
Further observability measures:

- an INFO log message at a frequency of 1/min if the tenant/timeline/connection was throttled in the last minute.
  The message will identify the tenant/timeline/connection to allow correlation with compute logs/stats.

Rollout will happen as follows:

- deploy 1: implementation + config: disabled by default, with the ability to enable it per tenant through tenant_conf
- experimentation in staging and later production to study impact & interaction with auto-scaling
- determination of a sensible global default value
  - the value will be chosen as high as possible ...
  - ... but low enough to work towards this RFC's goal that one tenant should not be able to dominate a pageserver instance
- deploy 2: implementation fixes (if any) + config: enabled by default with the aforementioned global default
  - reset of the experimental per-tenant overrides
- gain experience & lower the limit over time
  - we stop lowering the limit as soon as this RFC's goal is achieved, i.e.,
    once we decide that in practice the chosen value sufficiently de-risks operating large pageservers

The per-tenant override will remain for emergencies and testing.
But since Console doesn't preserve it during tenant migrations, it isn't durably configurable for the tenant.

Toward the upper layers of the Neon stack, the resulting limit will be
**"the highest GetPage/second that Pageserver can support for a single tenant"**.

### Rationale

We decided against error + retry because of worries about starvation.

## Future Work

Enable per-tenant emergency override of the limit via Console.
This should be part of a more general framework to specify tenant config overrides.
**NB:** this is **not** the right mechanism to _sell_ different max GetPage/second levels to users,
or to _auto-scale_ the GetPage/second levels. Such functionality will require a separate RFC that
concerns itself with GetPage/second capacity planning.

Compute-side metrics for GetPage latency.

A back-channel to inform Compute/Autoscaling/ControlPlane that the project is being throttled.

Compute-side neon_smgr improvements to avoid sending the same GetPage request multiple times if multiple backends experience a cache miss.

Dealing with read-only endpoints: users use read-only endpoints to scale reads for a single tenant.
Possibly there are also assumptions that read-only endpoints do not affect the primary read-write endpoint's performance.
With per-tenant rate limiting, we will not meet that expectation.
However, we can currently only scale per tenant.
Soon, we will have sharding (#5505), which will apply the throttling on a per-shard basis.
But that's orthogonal to scaling reads: if many endpoints hit one shard, they share the same throttling limit.
To solve this properly, I think we'll need replicas for tenants/shards.
To performance-isolate a tenant's endpoints from each other, we'd then route them to different replicas.
```diff
@@ -368,8 +368,6 @@ pub struct TenantInfo {
     /// If a layer is present in both local FS and S3, it counts only once.
     pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
     pub attachment_status: TenantAttachmentStatus,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub generation: Option<u32>,
 }
 
 #[derive(Serialize, Deserialize, Clone)]
@@ -924,7 +922,6 @@ mod tests {
             state: TenantState::Active,
             current_physical_size: Some(42),
             attachment_status: TenantAttachmentStatus::Attached,
-            generation: None,
         };
         let expected_active = json!({
             "id": original_active.id.to_string(),
@@ -945,7 +942,6 @@ mod tests {
             },
             current_physical_size: Some(42),
             attachment_status: TenantAttachmentStatus::Attached,
-            generation: None,
         };
         let expected_broken = json!({
             "id": original_broken.id.to_string(),
```
```diff
@@ -81,10 +81,6 @@ impl TenantShardId {
-    pub fn is_zero(&self) -> bool {
-        self.shard_number == ShardNumber(0)
-    }
-
     pub fn is_unsharded(&self) -> bool {
         self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
     }
 }
 
 /// Formatting helper
@@ -163,7 +159,7 @@ impl From<[u8; 18]> for TenantShardId {
 /// shard we're dealing with, but do not need to know the full ShardIdentity (because
 /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
 /// TenantShardId.
-#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
+#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
 pub struct ShardIndex {
     pub shard_number: ShardNumber,
     pub shard_count: ShardCount,
```
```diff
@@ -218,14 +218,6 @@ impl S3Bucket {
 
         let started_at = ScopeGuard::into_inner(started_at);
 
-        if get_object.is_err() {
-            metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
-                kind,
-                AttemptOutcome::Err,
-                started_at,
-            );
-        }
-
         match get_object {
             Ok(object_output) => {
                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
@@ -241,11 +233,27 @@ impl S3Bucket {
                 })
             }
             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
+                // Count this in the AttemptOutcome::Ok bucket, because 404 is not
+                // an error: we expect to sometimes fetch an object and find it missing,
+                // e.g. when probing for timeline indices.
+                metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
+                    kind,
+                    AttemptOutcome::Ok,
+                    started_at,
+                );
                 Err(DownloadError::NotFound)
             }
-            Err(e) => Err(DownloadError::Other(
-                anyhow::Error::new(e).context("download s3 object"),
-            )),
+            Err(e) => {
+                metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
+                    kind,
+                    AttemptOutcome::Err,
+                    started_at,
+                );
+
+                Err(DownloadError::Other(
+                    anyhow::Error::new(e).context("download s3 object"),
+                ))
+            }
         }
     }
 }
```
libs/remote_storage/tests/common/mod.rs (new file, 200 lines)
```rust
use std::collections::HashSet;
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use camino::Utf8Path;
use futures::stream::Stream;
use once_cell::sync::OnceCell;
use remote_storage::{Download, GenericRemoteStorage, RemotePath};
use tokio::task::JoinSet;
use tracing::{debug, error, info};

static LOGGING_DONE: OnceCell<()> = OnceCell::new();

pub(crate) fn upload_stream(
    content: std::borrow::Cow<'static, [u8]>,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    use std::borrow::Cow;

    let content = match content {
        Cow::Borrowed(x) => Bytes::from_static(x),
        Cow::Owned(vec) => Bytes::from(vec),
    };
    wrap_stream(content)
}

pub(crate) fn wrap_stream(
    content: bytes::Bytes,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    let len = content.len();
    let content = futures::future::ready(Ok(content));

    (futures::stream::once(content), len)
}

pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
    let mut buf = Vec::new();
    tokio::io::copy_buf(
        &mut tokio_util::io::StreamReader::new(dl.download_stream),
        &mut buf,
    )
    .await?;
    Ok(buf)
}

// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
pub(crate) async fn upload_simple_remote_data(
    client: &Arc<GenericRemoteStorage>,
    upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
    info!("Creating {upload_tasks_count} remote files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
            let blob_path = RemotePath::new(
                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
            )
            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
            task_client.upload(data, len, &blob_path, None).await?;

            Ok::<_, anyhow::Error>(blob_path)
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok(upload_path) => {
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    if upload_tasks_failed {
        ControlFlow::Break(uploaded_blobs)
    } else {
        ControlFlow::Continue(uploaded_blobs)
    }
}

pub(crate) async fn cleanup(
    client: &Arc<GenericRemoteStorage>,
    objects_to_delete: HashSet<RemotePath>,
) {
    info!(
        "Removing {} objects from the remote storage during cleanup",
        objects_to_delete.len()
    );
    let mut delete_tasks = JoinSet::new();
    for object_to_delete in objects_to_delete {
        let task_client = Arc::clone(client);
        delete_tasks.spawn(async move {
            debug!("Deleting remote item at path {object_to_delete:?}");
            task_client
                .delete(&object_to_delete)
                .await
                .with_context(|| format!("{object_to_delete:?} removal"))
        });
    }

    while let Some(task_run_result) = delete_tasks.join_next().await {
        match task_run_result {
            Ok(task_result) => match task_result {
                Ok(()) => {}
                Err(e) => error!("Delete task failed: {e:?}"),
            },
            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
        }
    }
}

pub(crate) struct Uploads {
    pub(crate) prefixes: HashSet<RemotePath>,
    pub(crate) blobs: HashSet<RemotePath>,
}

pub(crate) async fn upload_remote_data(
    client: &Arc<GenericRemoteStorage>,
    base_prefix_str: &'static str,
    upload_tasks_count: usize,
) -> ControlFlow<Uploads, Uploads> {
    info!("Creating {upload_tasks_count} remote files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, data_len) =
                upload_stream(format!("remote blob data {i}").into_bytes().into());
            task_client.upload(data, data_len, &blob_path, None).await?;

            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok((upload_prefix, upload_path)) => {
                uploaded_prefixes.insert(upload_prefix);
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    let uploads = Uploads {
        prefixes: uploaded_prefixes,
        blobs: uploaded_blobs,
    };
    if upload_tasks_failed {
        ControlFlow::Break(uploads)
    } else {
        ControlFlow::Continue(uploads)
    }
}

pub(crate) fn ensure_logging_ready() {
    LOGGING_DONE.get_or_init(|| {
        utils::logging::init(
            utils::logging::LogFormat::Test,
            utils::logging::TracingErrorLayerEnablement::Disabled,
            utils::logging::Output::Stdout,
        )
        .expect("logging init failed");
    });
}
```
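For orientation, here is roughly how a test binary in this crate consumes the helpers above. This is a hypothetical sketch, not a test from the diff; the real call sites follow in the Azure/S3 test diffs below:

```rust
mod common; // the module shown above
use common::{download_to_vec, upload_stream};

use std::sync::Arc;

use anyhow::Context;
use camino::Utf8Path;
use remote_storage::{GenericRemoteStorage, RemotePath};

async fn roundtrip(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<()> {
    let path =
        RemotePath::new(Utf8Path::new("test/roundtrip")).context("RemotePath conversion")?;

    // upload_stream turns a byte buffer into the (stream, length) pair
    // that GenericRemoteStorage::upload expects.
    let (data, len) = upload_stream(b"hello".as_slice().into());
    client.upload(data, len, &path, None).await?;

    // download_to_vec drains the download stream back into a Vec<u8>.
    let buf = download_to_vec(client.download(&path).await?).await?;
    assert_eq!(buf, b"hello");
    Ok(())
}
```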
libs/remote_storage/tests/test_real_azure.rs

```diff
@@ -2,23 +2,23 @@ use std::collections::HashSet;
 use std::env;
 use std::num::NonZeroUsize;
 use std::ops::ControlFlow;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::UNIX_EPOCH;
 
 use anyhow::Context;
 use bytes::Bytes;
 use camino::Utf8Path;
 use futures::stream::Stream;
 use once_cell::sync::OnceCell;
 use remote_storage::{
-    AzureConfig, Download, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
+    AzureConfig, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
 };
 use test_context::{test_context, AsyncTestContext};
 use tokio::task::JoinSet;
-use tracing::{debug, error, info};
+use tracing::{debug, info};
 
-static LOGGING_DONE: OnceCell<()> = OnceCell::new();
+mod common;
 
+use common::{
+    cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data,
+    upload_stream, wrap_stream,
+};
@@ -30,7 +30,7 @@ const BASE_PREFIX: &str = "test";
 /// If real Azure tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
 /// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
 ///
-/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_azure_data`]
+/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
 /// where
 /// * `random_prefix_part` is set for the entire Azure client during the Azure client creation in [`create_azure_client`], to avoid multiple test runs interference
 /// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
@@ -97,7 +97,7 @@ async fn azure_pagination_should_work(
 /// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and related Azure cred env vars specified. Test will skip real code and pass if env vars not set.
 /// See `Azure_pagination_should_work` for more information.
 ///
-/// First, create a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_azure_data`]
+/// First, create a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
 /// Then performs the following queries:
 /// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
 /// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
@@ -218,18 +218,9 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
 
     ctx.client.upload(data, len, &path, None).await?;
 
-    async fn download_and_compare(dl: Download) -> anyhow::Result<Vec<u8>> {
-        let mut buf = Vec::new();
-        tokio::io::copy_buf(
-            &mut tokio_util::io::StreamReader::new(dl.download_stream),
-            &mut buf,
-        )
-        .await?;
-        Ok(buf)
-    }
     // Normal download request
     let dl = ctx.client.download(&path).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig);
 
     // Full range (end specified)
@@ -237,12 +228,12 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
         .client
         .download_byte_range(&path, 0, Some(len as u64))
         .await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig);
 
     // partial range (end specified)
     let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig[4..10]);
 
     // partial range (end beyond real end)
@@ -250,17 +241,17 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
         .client
         .download_byte_range(&path, 8, Some(len as u64 * 100))
         .await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig[8..]);
 
     // Partial range (end unspecified)
     let dl = ctx.client.download_byte_range(&path, 4, None).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig[4..]);
 
     // Full range (end unspecified)
     let dl = ctx.client.download_byte_range(&path, 0, None).await?;
-    let buf = download_and_compare(dl).await?;
+    let buf = download_to_vec(dl).await?;
     assert_eq!(&buf, &orig);
 
     debug!("Cleanup: deleting file at path {path:?}");
@@ -272,17 +263,6 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
     Ok(())
 }
 
-fn ensure_logging_ready() {
-    LOGGING_DONE.get_or_init(|| {
-        utils::logging::init(
-            utils::logging::LogFormat::Test,
-            utils::logging::TracingErrorLayerEnablement::Disabled,
-            utils::logging::Output::Stdout,
-        )
-        .expect("logging init failed");
-    });
-}
-
 struct EnabledAzure {
     client: Arc<GenericRemoteStorage>,
     base_prefix: &'static str,
@@ -352,7 +332,7 @@ impl AsyncTestContext for MaybeEnabledAzureWithTestBlobs {
 
         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
 
-        match upload_azure_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
+        match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
             ControlFlow::Continue(uploads) => {
                 info!("Remote objects created successfully");
 
@@ -414,7 +394,7 @@ impl AsyncTestContext for MaybeEnabledAzureWithSimpleTestBlobs {
 
         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
 
-        match upload_simple_azure_data(&enabled.client, upload_tasks_count).await {
+        match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
             ControlFlow::Continue(uploads) => {
                 info!("Remote objects created successfully");
 
@@ -478,166 +458,3 @@ fn create_azure_client(
         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
     ))
 }
-
-struct Uploads {
-    prefixes: HashSet<RemotePath>,
-    blobs: HashSet<RemotePath>,
-}
-
-async fn upload_azure_data(
-    client: &Arc<GenericRemoteStorage>,
-    base_prefix_str: &'static str,
-    upload_tasks_count: usize,
-) -> ControlFlow<Uploads, Uploads> {
-    info!("Creating {upload_tasks_count} Azure files");
-    let mut upload_tasks = JoinSet::new();
-    for i in 1..upload_tasks_count + 1 {
-        let task_client = Arc::clone(client);
-        upload_tasks.spawn(async move {
-            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
-            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
-                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
-            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
-            debug!("Creating remote item {i} at path {blob_path:?}");
-
-            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
-            task_client.upload(data, len, &blob_path, None).await?;
-
-            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
-        });
-    }
-
-    let mut upload_tasks_failed = false;
-    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
-    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
-    while let Some(task_run_result) = upload_tasks.join_next().await {
-        match task_run_result
-            .context("task join failed")
-            .and_then(|task_result| task_result.context("upload task failed"))
-        {
-            Ok((upload_prefix, upload_path)) => {
-                uploaded_prefixes.insert(upload_prefix);
-                uploaded_blobs.insert(upload_path);
-            }
-            Err(e) => {
-                error!("Upload task failed: {e:?}");
-                upload_tasks_failed = true;
-            }
-        }
-    }
-
-    let uploads = Uploads {
-        prefixes: uploaded_prefixes,
-        blobs: uploaded_blobs,
-    };
-    if upload_tasks_failed {
-        ControlFlow::Break(uploads)
-    } else {
-        ControlFlow::Continue(uploads)
-    }
-}
-
-async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
-    info!(
-        "Removing {} objects from the remote storage during cleanup",
-        objects_to_delete.len()
-    );
-    let mut delete_tasks = JoinSet::new();
-    for object_to_delete in objects_to_delete {
-        let task_client = Arc::clone(client);
-        delete_tasks.spawn(async move {
-            debug!("Deleting remote item at path {object_to_delete:?}");
-            task_client
-                .delete(&object_to_delete)
-                .await
-                .with_context(|| format!("{object_to_delete:?} removal"))
-        });
-    }
-
-    while let Some(task_run_result) = delete_tasks.join_next().await {
-        match task_run_result {
-            Ok(task_result) => match task_result {
-                Ok(()) => {}
-                Err(e) => error!("Delete task failed: {e:?}"),
-            },
-            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
-        }
-    }
-}
-
-// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
-async fn upload_simple_azure_data(
-    client: &Arc<GenericRemoteStorage>,
-    upload_tasks_count: usize,
-) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
-    info!("Creating {upload_tasks_count} Azure files");
-    let mut upload_tasks = JoinSet::new();
-    for i in 1..upload_tasks_count + 1 {
-        let task_client = Arc::clone(client);
-        upload_tasks.spawn(async move {
-            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
-            let blob_path = RemotePath::new(
-                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
-            )
-            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
-            debug!("Creating remote item {i} at path {blob_path:?}");
-
-            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
-            task_client.upload(data, len, &blob_path, None).await?;
-
-            Ok::<_, anyhow::Error>(blob_path)
-        });
-    }
-
-    let mut upload_tasks_failed = false;
-    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
-    while let Some(task_run_result) = upload_tasks.join_next().await {
-        match task_run_result
-            .context("task join failed")
-            .and_then(|task_result| task_result.context("upload task failed"))
-        {
-            Ok(upload_path) => {
-                uploaded_blobs.insert(upload_path);
-            }
-            Err(e) => {
-                error!("Upload task failed: {e:?}");
-                upload_tasks_failed = true;
-            }
-        }
-    }
-
-    if upload_tasks_failed {
-        ControlFlow::Break(uploaded_blobs)
-    } else {
-        ControlFlow::Continue(uploaded_blobs)
-    }
-}
-
-// FIXME: copypasted from test_real_s3, can't remember how to share a module which is not compiled
-// to binary
-fn upload_stream(
-    content: std::borrow::Cow<'static, [u8]>,
-) -> (
-    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
-    usize,
-) {
-    use std::borrow::Cow;
-
-    let content = match content {
-        Cow::Borrowed(x) => Bytes::from_static(x),
-        Cow::Owned(vec) => Bytes::from(vec),
-    };
-    wrap_stream(content)
-}
-
-fn wrap_stream(
-    content: bytes::Bytes,
-) -> (
-    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
-    usize,
-) {
-    let len = content.len();
-    let content = futures::future::ready(Ok(content));
-
-    (futures::stream::once(content), len)
-}
```
@@ -2,23 +2,23 @@ use std::collections::HashSet;
|
||||
use std::env;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::ControlFlow;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use futures::stream::Stream;
|
||||
use once_cell::sync::OnceCell;
|
||||
use remote_storage::{
|
||||
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
|
||||
};
|
||||
use test_context::{test_context, AsyncTestContext};
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{debug, error, info};
|
||||
use tracing::{debug, info};
|
||||
|
||||
static LOGGING_DONE: OnceCell<()> = OnceCell::new();
|
||||
mod common;
|
||||
|
||||
use common::{
|
||||
cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data,
|
||||
upload_stream, wrap_stream,
|
||||
};
|
||||
|
||||
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
|
||||
|
||||
@@ -30,7 +30,7 @@ const BASE_PREFIX: &str = "test";
|
||||
/// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
|
||||
/// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
|
||||
///
|
||||
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_s3_data`]
|
||||
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
|
||||
/// where
|
||||
/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference
|
||||
/// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
|
||||
@@ -95,7 +95,7 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any
|
||||
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set.
|
||||
/// See `s3_pagination_should_work` for more information.
|
||||
///
|
||||
/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_s3_data`]
|
||||
/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
|
||||
/// Then performs the following queries:
|
||||
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
|
||||
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
@@ -198,15 +198,65 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
    Ok(())
}

fn ensure_logging_ready() {
    LOGGING_DONE.get_or_init(|| {
        utils::logging::init(
            utils::logging::LogFormat::Test,
            utils::logging::TracingErrorLayerEnablement::Disabled,
            utils::logging::Output::Stdout,
        )
        .expect("logging init failed");
    });
}

#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_upload_download_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
    let MaybeEnabledS3::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None).await?;

    // Normal download request
    let dl = ctx.client.download(&path).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Full range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, Some(len as u64))
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Partial range (end specified)
    let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..10]);

    // Partial range (end beyond real end)
    let dl = ctx
        .client
        .download_byte_range(&path, 8, Some(len as u64 * 100))
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[8..]);

    // Partial range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 4, None).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..]);

    // Full range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 0, None).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete(&path)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}
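// A minimal, self-contained sketch of the half-open byte-range semantics the
// test above exercises: `end` is exclusive, may be omitted, and may point past
// the real end, in which case it is clamped. This mirrors the assertions, not
// the actual `download_byte_range` implementation.
fn byte_range(data: &[u8], start: u64, end: Option<u64>) -> &[u8] {
    let len = data.len() as u64;
    let start = start.min(len) as usize;
    let end = end.map_or(len, |e| e.min(len)) as usize;
    &data[start..end.max(start)]
}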

struct EnabledS3 {
@@ -278,7 +328,7 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

-       match upload_s3_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
+       match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

@@ -340,7 +390,7 @@ impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

-       match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
+       match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

@@ -403,166 +453,3 @@ fn create_s3_client(
        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
    ))
}

struct Uploads {
    prefixes: HashSet<RemotePath>,
    blobs: HashSet<RemotePath>,
}

async fn upload_s3_data(
    client: &Arc<GenericRemoteStorage>,
    base_prefix_str: &'static str,
    upload_tasks_count: usize,
) -> ControlFlow<Uploads, Uploads> {
    info!("Creating {upload_tasks_count} S3 files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, data_len) =
                upload_stream(format!("remote blob data {i}").into_bytes().into());
            task_client.upload(data, data_len, &blob_path, None).await?;

            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok((upload_prefix, upload_path)) => {
                uploaded_prefixes.insert(upload_prefix);
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    let uploads = Uploads {
        prefixes: uploaded_prefixes,
        blobs: uploaded_blobs,
    };
    if upload_tasks_failed {
        ControlFlow::Break(uploads)
    } else {
        ControlFlow::Continue(uploads)
    }
}

async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
    info!(
        "Removing {} objects from the remote storage during cleanup",
        objects_to_delete.len()
    );
    let mut delete_tasks = JoinSet::new();
    for object_to_delete in objects_to_delete {
        let task_client = Arc::clone(client);
        delete_tasks.spawn(async move {
            debug!("Deleting remote item at path {object_to_delete:?}");
            task_client
                .delete(&object_to_delete)
                .await
                .with_context(|| format!("{object_to_delete:?} removal"))
        });
    }

    while let Some(task_run_result) = delete_tasks.join_next().await {
        match task_run_result {
            Ok(task_result) => match task_result {
                Ok(()) => {}
                Err(e) => error!("Delete task failed: {e:?}"),
            },
            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
        }
    }
}

// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
async fn upload_simple_s3_data(
    client: &Arc<GenericRemoteStorage>,
    upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
    info!("Creating {upload_tasks_count} S3 files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
            let blob_path = RemotePath::new(
                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
            )
            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, data_len) =
                upload_stream(format!("remote blob data {i}").into_bytes().into());
            task_client.upload(data, data_len, &blob_path, None).await?;

            Ok::<_, anyhow::Error>(blob_path)
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok(upload_path) => {
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    if upload_tasks_failed {
        ControlFlow::Break(uploaded_blobs)
    } else {
        ControlFlow::Continue(uploaded_blobs)
    }
}

fn upload_stream(
    content: std::borrow::Cow<'static, [u8]>,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    use std::borrow::Cow;

    let content = match content {
        Cow::Borrowed(x) => Bytes::from_static(x),
        Cow::Owned(vec) => Bytes::from(vec),
    };
    wrap_stream(content)
}

fn wrap_stream(
    content: bytes::Bytes,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    let len = content.len();
    let content = futures::future::ready(Ok(content));

    (futures::stream::once(content), len)
}
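// Usage sketch for `wrap_stream` above: the returned stream yields exactly one
// `Ok(Bytes)` chunk, and the returned length is the total size that `upload`
// is given. Collecting the stream back demonstrates the round trip.
async fn wrap_stream_roundtrip_sketch() {
    use futures::StreamExt;
    let (stream, len) = wrap_stream(Bytes::from_static(b"remote blob data"));
    let chunks: Vec<_> = stream.collect().await;
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].as_ref().unwrap().len(), len);
}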

@@ -366,47 +366,6 @@ impl MonotonicCounter<Lsn> for RecordLsn {
    }
}

/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);

impl rand::distributions::uniform::SampleUniform for Lsn {
    type Sampler = LsnSampler;
}

impl rand::distributions::uniform::UniformSampler for LsnSampler {
    type X = Lsn;

    fn new<B1, B2>(low: B1, high: B2) -> Self
    where
        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
    {
        Self(
            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
                low.borrow().0,
                high.borrow().0,
            ),
        )
    }

    fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
    where
        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
    {
        Self(
            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
                low.borrow().0,
                high.borrow().0,
            ),
        )
    }

    fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
        Lsn(self.0.sample(rng))
    }
}
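// With the impls above in scope, an `Lsn` can be drawn like any integer.
// Sketch only; assumes rand 0.8's `gen_range` over a `Range<Lsn>`.
fn random_lsn_sketch<R: rand::Rng + ?Sized>(rng: &mut R, lo: Lsn, hi: Lsn) -> Lsn {
    rng.gen_range(lo..hi)
}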

#[cfg(test)]
mod tests {
    use crate::bin_ser::BeSer;

@@ -1 +1,2 @@
 #include "postgres.h"
+#include "walproposer.h"

@@ -1,26 +0,0 @@
[package]
name = "pagebench"
version = "0.1.0"
edition.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow.workspace = true
clap.workspace = true
futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true

pageserver = { path = ".." }
pageserver_client.workspace = true
pageserver_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
@@ -1,273 +0,0 @@
use anyhow::Context;
use pageserver_client::page_service::BasebackupRequest;

use utils::lsn::Lsn;

use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument};

use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::cli;
use crate::util::tenant_timeline_id::TenantTimelineId;
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};

/// basebackup@LatestLSN
#[derive(clap::Parser)]
pub(crate) struct Args {
    #[clap(long, default_value = "http://localhost:9898")]
    mgmt_api_endpoint: String,
    #[clap(long, default_value = "localhost:64000")]
    page_service_host_port: String,
    #[clap(long)]
    pageserver_jwt: Option<String>,
    #[clap(long, default_value = "1")]
    num_clients: NonZeroUsize,
    #[clap(long, default_value = "1.0")]
    gzip_probability: f64,
    #[clap(long)]
    runtime: Option<humantime::Duration>,
    #[clap(long)]
    limit_to_first_n_targets: Option<usize>,
    targets: Option<Vec<TenantTimelineId>>,
}

#[derive(Debug, Default)]
struct LiveStats {
    completed_requests: AtomicU64,
}

impl LiveStats {
    fn inc(&self) {
        self.completed_requests.fetch_add(1, Ordering::Relaxed);
    }
}

struct Target {
    timeline: TenantTimelineId,
    lsn_range: Option<Range<Lsn>>,
}

#[derive(serde::Serialize)]
struct Output {
    total: request_stats::Output,
}

tokio_thread_local_stats::declare!(STATS: request_stats::Stats);

pub(crate) fn main(args: Args) -> anyhow::Result<()> {
    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
        main_impl(args, thread_local_stats)
    })
}

async fn main_impl(
    args: Args,
    all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
) -> anyhow::Result<()> {
    let args: &'static Args = Box::leak(Box::new(args));

    let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
        args.mgmt_api_endpoint.clone(),
        args.pageserver_jwt.as_deref(),
    ));

    // discover targets
    let timelines: Vec<TenantTimelineId> = cli::targets::discover(
        &mgmt_api_client,
        cli::targets::Spec {
            limit_to_first_n_targets: args.limit_to_first_n_targets,
            targets: args.targets.clone(),
        },
    )
    .await?;
    let mut js = JoinSet::new();
    for timeline in &timelines {
        js.spawn({
            let timeline = *timeline;
            // FIXME: this triggers initial logical size calculation
            // https://github.com/neondatabase/neon/issues/6168
            let info = mgmt_api_client
                .timeline_info(timeline.tenant_id, timeline.timeline_id)
                .await
                .unwrap();
            async move {
                anyhow::Ok(Target {
                    timeline,
                    // TODO: support lsn_range != latest LSN
                    lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
                })
            }
        });
    }
    let mut all_targets: Vec<Target> = Vec::new();
    while let Some(res) = js.join_next().await {
        all_targets.push(res.unwrap().unwrap());
    }

    let live_stats = Arc::new(LiveStats::default());

    let num_client_tasks = timelines.len();
    let num_live_stats_dump = 1;
    let num_work_sender_tasks = 1;

    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
        num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
    ));
    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));

    tokio::spawn({
        let stats = Arc::clone(&live_stats);
        let start_work_barrier = Arc::clone(&start_work_barrier);
        async move {
            start_work_barrier.wait().await;
            loop {
                let start = std::time::Instant::now();
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
                let elapsed = start.elapsed();
                info!(
                    "RPS: {:.0}",
                    completed_requests as f64 / elapsed.as_secs_f64()
                );
            }
        }
    });

    let mut work_senders = HashMap::new();
    let mut tasks = Vec::new();
    for tl in &timelines {
        let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
        work_senders.insert(tl, sender);
        tasks.push(tokio::spawn(client(
            args,
            *tl,
            Arc::clone(&start_work_barrier),
            receiver,
            Arc::clone(&all_work_done_barrier),
            Arc::clone(&live_stats),
        )));
    }

    let work_sender = async move {
        start_work_barrier.wait().await;
        loop {
            let (timeline, work) = {
                let mut rng = rand::thread_rng();
                let target = all_targets.choose(&mut rng).unwrap();
                let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
                (
                    target.timeline,
                    Work {
                        lsn,
                        gzip: rng.gen_bool(args.gzip_probability),
                    },
                )
            };
            let sender = work_senders.get(&timeline).unwrap();
            // TODO: what if this blocks?
            sender.send(work).await.ok().unwrap();
        }
    };

    if let Some(runtime) = args.runtime {
        match tokio::time::timeout(runtime.into(), work_sender).await {
            Ok(()) => unreachable!("work sender never terminates"),
            Err(_timeout) => {
                // this implicitly drops the work_senders, making all the clients exit
            }
        }
    } else {
        work_sender.await;
        unreachable!("work sender never terminates");
    }

    for t in tasks {
        t.await.unwrap();
    }

    let output = Output {
        total: {
            let mut agg_stats = request_stats::Stats::new();
            for stats in all_thread_local_stats.lock().unwrap().iter() {
                let stats = stats.lock().unwrap();
                agg_stats.add(&stats);
            }
            agg_stats.output()
        },
    };

    let output = serde_json::to_string_pretty(&output).unwrap();
    println!("{output}");

    anyhow::Ok(())
}

#[derive(Copy, Clone)]
struct Work {
    lsn: Option<Lsn>,
    gzip: bool,
}

#[instrument(skip_all)]
async fn client(
    args: &'static Args,
    timeline: TenantTimelineId,
    start_work_barrier: Arc<Barrier>,
    mut work: tokio::sync::mpsc::Receiver<Work>,
    all_work_done_barrier: Arc<Barrier>,
    live_stats: Arc<LiveStats>,
) {
    start_work_barrier.wait().await;

    let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
        &args.page_service_host_port,
        args.pageserver_jwt.as_deref(),
    ))
    .await
    .unwrap();

    while let Some(Work { lsn, gzip }) = work.recv().await {
        let start = Instant::now();
        let copy_out_stream = client
            .basebackup(&BasebackupRequest {
                tenant_id: timeline.tenant_id,
                timeline_id: timeline.timeline_id,
                lsn,
                gzip,
            })
            .await
            .with_context(|| format!("start basebackup for {timeline}"))
            .unwrap();

        use futures::StreamExt;
        let size = Arc::new(AtomicUsize::new(0));
        copy_out_stream
            .for_each({
                |r| {
                    // clone once per chunk so the Arc can move into the async block
                    let size = Arc::clone(&size);
                    async move {
                        size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
                    }
                }
            })
            .await;
        debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
        let elapsed = start.elapsed();
        live_stats.inc();
        STATS.with(|stats| {
            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
        });
    }

    all_work_done_barrier.wait().await;
}
@@ -1 +0,0 @@
pub(crate) mod targets;
@@ -1,37 +0,0 @@
use std::sync::Arc;

use pageserver_client::mgmt_api;
use tracing::info;

use crate::util::{
    discover_timelines::get_pageserver_tenant_timelines, tenant_timeline_id::TenantTimelineId,
};

pub(crate) struct Spec {
    pub(crate) limit_to_first_n_targets: Option<usize>,
    pub(crate) targets: Option<Vec<TenantTimelineId>>,
}

pub(crate) async fn discover(
    api_client: &Arc<mgmt_api::Client>,
    spec: Spec,
) -> anyhow::Result<Vec<TenantTimelineId>> {
    let mut timelines = if let Some(targets) = spec.targets {
        targets
    } else {
        get_pageserver_tenant_timelines(api_client).await?
    };

    if let Some(limit) = spec.limit_to_first_n_targets {
        timelines.sort(); // for determinism
        timelines.truncate(limit);
        if timelines.len() < limit {
            anyhow::bail!("pageserver has fewer than limit_to_first_n_targets={limit} tenants");
        }
    }

    info!("timelines:\n{:?}", timelines);
    info!("number of timelines:\n{:?}", timelines.len());

    Ok(timelines)
}
@@ -1,337 +0,0 @@
use anyhow::Context;
use futures::future::join_all;
use pageserver::pgdatadir_mapping::key_to_rel_block;
use pageserver::repository;
use pageserver_api::key::is_rel_block_key;
use pageserver_client::page_service::RelTagBlockNo;

use utils::lsn::Lsn;

use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{info, instrument};

use std::collections::HashMap;
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use crate::cli;

use crate::util::tenant_timeline_id::TenantTimelineId;
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};

/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
    #[clap(long, default_value = "http://localhost:9898")]
    mgmt_api_endpoint: String,
    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
    page_service_connstring: String,
    #[clap(long)]
    pageserver_jwt: Option<String>,
    #[clap(long, default_value = "1")]
    num_clients: NonZeroUsize,
    #[clap(long)]
    runtime: Option<humantime::Duration>,
    #[clap(long)]
    per_target_rate_limit: Option<usize>,
    #[clap(long)]
    limit_to_first_n_targets: Option<usize>,
    targets: Option<Vec<TenantTimelineId>>,
}

#[derive(Debug, Default)]
struct LiveStats {
    completed_requests: AtomicU64,
}

impl LiveStats {
    fn inc(&self) {
        self.completed_requests.fetch_add(1, Ordering::Relaxed);
    }
}

#[derive(Clone)]
struct KeyRange {
    timeline: TenantTimelineId,
    timeline_lsn: Lsn,
    start: i128,
    end: i128,
}

impl KeyRange {
    fn len(&self) -> i128 {
        self.end - self.start
    }
}

#[derive(serde::Serialize)]
struct Output {
    total: request_stats::Output,
}

tokio_thread_local_stats::declare!(STATS: request_stats::Stats);

pub(crate) fn main(args: Args) -> anyhow::Result<()> {
    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
        main_impl(args, thread_local_stats)
    })
}

async fn main_impl(
    args: Args,
    all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
) -> anyhow::Result<()> {
    let args: &'static Args = Box::leak(Box::new(args));

    let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
        args.mgmt_api_endpoint.clone(),
        args.pageserver_jwt.as_deref(),
    ));

    // discover targets
    let timelines: Vec<TenantTimelineId> = cli::targets::discover(
        &mgmt_api_client,
        cli::targets::Spec {
            limit_to_first_n_targets: args.limit_to_first_n_targets,
            targets: args.targets.clone(),
        },
    )
    .await?;

    let mut js = JoinSet::new();
    for timeline in &timelines {
        js.spawn({
            let mgmt_api_client = Arc::clone(&mgmt_api_client);
            let timeline = *timeline;
            async move {
                let partitioning = mgmt_api_client
                    .keyspace(timeline.tenant_id, timeline.timeline_id)
                    .await?;
                let lsn = partitioning.at_lsn;

                let ranges = partitioning
                    .keys
                    .ranges
                    .iter()
                    .filter_map(|r| {
                        let start = r.start;
                        let end = r.end;
                        // filter out non-relblock keys
                        match (is_rel_block_key(&start), is_rel_block_key(&end)) {
                            (true, true) => Some(KeyRange {
                                timeline,
                                timeline_lsn: lsn,
                                start: start.to_i128(),
                                end: end.to_i128(),
                            }),
                            (true, false) | (false, true) => {
                                unimplemented!("split up range")
                            }
                            (false, false) => None,
                        }
                    })
                    .collect::<Vec<_>>();

                anyhow::Ok(ranges)
            }
        });
    }
    let mut all_ranges: Vec<KeyRange> = Vec::new();
    while let Some(res) = js.join_next().await {
        all_ranges.extend(res.unwrap().unwrap());
    }

    let live_stats = Arc::new(LiveStats::default());

    let num_client_tasks = timelines.len();
    let num_live_stats_dump = 1;
    let num_work_sender_tasks = 1;

    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
        num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
    ));
    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));

    tokio::spawn({
        let stats = Arc::clone(&live_stats);
        let start_work_barrier = Arc::clone(&start_work_barrier);
        async move {
            start_work_barrier.wait().await;
            loop {
                let start = std::time::Instant::now();
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
                let elapsed = start.elapsed();
                info!(
                    "RPS: {:.0}",
                    completed_requests as f64 / elapsed.as_secs_f64()
                );
            }
        }
    });

    let mut work_senders = HashMap::new();
    let mut tasks = Vec::new();
    for tl in &timelines {
        let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
        work_senders.insert(tl, sender);
        tasks.push(tokio::spawn(client(
            args,
            *tl,
            Arc::clone(&start_work_barrier),
            receiver,
            Arc::clone(&all_work_done_barrier),
            Arc::clone(&live_stats),
        )));
    }

    let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = match args.per_target_rate_limit {
        None => Box::pin(async move {
            let weights = rand::distributions::weighted::WeightedIndex::new(
                all_ranges.iter().map(|v| v.len()),
            )
            .unwrap();

            start_work_barrier.wait().await;

            loop {
                let (range, key) = {
                    let mut rng = rand::thread_rng();
                    let r = &all_ranges[weights.sample(&mut rng)];
                    let key: i128 = rng.gen_range(r.start..r.end);
                    let key = repository::Key::from_i128(key);
                    let (rel_tag, block_no) =
                        key_to_rel_block(key).expect("we filter non-rel-block keys out above");
                    (r, RelTagBlockNo { rel_tag, block_no })
                };
                let sender = work_senders.get(&range.timeline).unwrap();
                // TODO: what if this blocks?
                sender.send((key, range.timeline_lsn)).await.ok().unwrap();
            }
        }),
        Some(rps_limit) => Box::pin(async move {
            let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));

            let make_timeline_task: &dyn Fn(
                TenantTimelineId,
            )
                -> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
                let sender = work_senders.get(&timeline).unwrap();
                let ranges: Vec<KeyRange> = all_ranges
                    .iter()
                    .filter(|r| r.timeline == timeline)
                    .cloned()
                    .collect();
                let weights = rand::distributions::weighted::WeightedIndex::new(
                    ranges.iter().map(|v| v.len()),
                )
                .unwrap();

                Box::pin(async move {
                    let mut ticker = tokio::time::interval(period);
                    ticker.set_missed_tick_behavior(
                        /* TODO review this choice */
                        tokio::time::MissedTickBehavior::Burst,
                    );
                    loop {
                        ticker.tick().await;
                        let (range, key) = {
                            let mut rng = rand::thread_rng();
                            let r = &ranges[weights.sample(&mut rng)];
                            let key: i128 = rng.gen_range(r.start..r.end);
                            let key = repository::Key::from_i128(key);
                            let (rel_tag, block_no) = key_to_rel_block(key)
                                .expect("we filter non-rel-block keys out above");
                            (r, RelTagBlockNo { rel_tag, block_no })
                        };
                        sender.send((key, range.timeline_lsn)).await.ok().unwrap();
                    }
                })
            };

            let tasks: Vec<_> = work_senders
                .keys()
                .map(|tl| make_timeline_task(**tl))
                .collect();

            start_work_barrier.wait().await;

            join_all(tasks).await;
        }),
    };

    if let Some(runtime) = args.runtime {
        match tokio::time::timeout(runtime.into(), work_sender).await {
            Ok(()) => unreachable!("work sender never terminates"),
            Err(_timeout) => {
                // this implicitly drops the work_senders, making all the clients exit
            }
        }
    } else {
        work_sender.await;
        unreachable!("work sender never terminates");
    }

    for t in tasks {
        t.await.unwrap();
    }

    let output = Output {
        total: {
            let mut agg_stats = request_stats::Stats::new();
            for stats in all_thread_local_stats.lock().unwrap().iter() {
                let stats = stats.lock().unwrap();
                agg_stats.add(&stats);
            }
            agg_stats.output()
        },
    };

    let output = serde_json::to_string_pretty(&output).unwrap();
    println!("{output}");

    anyhow::Ok(())
}

#[instrument(skip_all)]
async fn client(
    args: &'static Args,
    timeline: TenantTimelineId,
    start_work_barrier: Arc<Barrier>,
    mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
    all_work_done_barrier: Arc<Barrier>,
    live_stats: Arc<LiveStats>,
) {
    start_work_barrier.wait().await;

    let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
        .await
        .unwrap();
    let mut client = client
        .pagestream(timeline.tenant_id, timeline.timeline_id)
        .await
        .unwrap();

    while let Some((key, lsn)) = work.recv().await {
        let start = Instant::now();
        client
            .getpage(key, lsn)
            .await
            .with_context(|| format!("getpage for {timeline}"))
            .unwrap();
        let elapsed = start.elapsed();
        live_stats.inc();
        STATS.with(|stats| {
            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
        });
    }

    all_work_done_barrier.wait().await;
}
@@ -1,34 +0,0 @@
use clap::Parser;
use utils::logging;

pub(crate) mod cli;
pub(crate) mod util;

mod basebackup;
mod getpage_latest_lsn;
mod trigger_initial_size_calculation;

/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
    Basebackup(basebackup::Args),
    GetPageLatestLsn(getpage_latest_lsn::Args),
    TriggerInitialSizeCalculation(trigger_initial_size_calculation::Args),
}

fn main() {
    logging::init(
        logging::LogFormat::Plain,
        logging::TracingErrorLayerEnablement::Disabled,
        logging::Output::Stderr,
    )
    .unwrap();

    let args = Args::parse();
    match args {
        Args::Basebackup(args) => basebackup::main(args),
        Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
        Args::TriggerInitialSizeCalculation(args) => trigger_initial_size_calculation::main(args),
    }
    .unwrap()
}
@@ -1,86 +0,0 @@
use std::sync::Arc;

use humantime::Duration;
use tokio::task::JoinSet;

use crate::{cli, util::tenant_timeline_id::TenantTimelineId};

#[derive(clap::Parser)]
pub(crate) struct Args {
    #[clap(long, default_value = "http://localhost:9898")]
    mgmt_api_endpoint: String,
    #[clap(long, default_value = "localhost:64000")]
    page_service_host_port: String,
    #[clap(long)]
    pageserver_jwt: Option<String>,
    #[clap(
        long,
        help = "if specified, poll mgmt api to check whether init logical size calculation has completed"
    )]
    poll_for_completion: Option<Duration>,
    #[clap(long)]
    limit_to_first_n_targets: Option<usize>,
    targets: Option<Vec<TenantTimelineId>>,
}

pub(crate) fn main(args: Args) -> anyhow::Result<()> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    let main_task = rt.spawn(main_impl(args));
    rt.block_on(main_task).unwrap()
}

async fn main_impl(args: Args) -> anyhow::Result<()> {
    let args: &'static Args = Box::leak(Box::new(args));

    let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
        args.mgmt_api_endpoint.clone(),
        args.pageserver_jwt.as_deref(),
    ));

    // discover targets
    let timelines: Vec<TenantTimelineId> = cli::targets::discover(
        &mgmt_api_client,
        cli::targets::Spec {
            limit_to_first_n_targets: args.limit_to_first_n_targets,
            targets: args.targets.clone(),
        },
    )
    .await?;

    // kick it off

    let mut js = JoinSet::new();
    for tl in timelines {
        let mgmt_api_client = Arc::clone(&mgmt_api_client);
        js.spawn(async move {
            // TODO: API to explicitly trigger initial logical size computation.
            // Should probably also avoid making initial logical size calculation a side effect of fetching timeline details.
            // => https://github.com/neondatabase/neon/issues/6168
            let info = mgmt_api_client
                .timeline_info(tl.tenant_id, tl.timeline_id)
                .await
                .unwrap();

            if let Some(period) = args.poll_for_completion {
                let mut ticker = tokio::time::interval(period.into());
                ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
                let mut info = info;
                while !info.current_logical_size_is_accurate {
                    ticker.tick().await;
                    info = mgmt_api_client
                        .timeline_info(tl.tenant_id, tl.timeline_id)
                        .await
                        .unwrap();
                }
            }
        });
    }
    while let Some(res) = js.join_next().await {
        let _: () = res.unwrap();
    }
    Ok(())
}
@@ -1,6 +0,0 @@
pub(crate) mod connstring;
pub(crate) mod discover_timelines;
pub(crate) mod request_stats;
pub(crate) mod tenant_timeline_id;
#[macro_use]
pub(crate) mod tokio_thread_local_stats;
@@ -1,8 +0,0 @@
pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
    let colon_and_jwt = if let Some(jwt) = jwt {
        format!(":{jwt}") // TODO: urlescape
    } else {
        String::new()
    };
    format!("postgres://postgres{colon_and_jwt}@{host_port}")
}
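#[test]
fn connstring_examples() {
    // The JWT value here is made up; the shapes follow directly from the
    // format! calls in the helper above.
    assert_eq!(
        connstring("localhost:64000", None),
        "postgres://postgres@localhost:64000"
    );
    assert_eq!(
        connstring("localhost:64000", Some("tok")),
        "postgres://postgres:tok@localhost:64000"
    );
}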
@@ -1,45 +0,0 @@
use std::sync::Arc;

use pageserver_client::mgmt_api;
use tokio::task::JoinSet;
use utils::id::TenantId;

use super::tenant_timeline_id::TenantTimelineId;

pub(crate) async fn get_pageserver_tenant_timelines(
    api_client: &Arc<mgmt_api::Client>,
) -> anyhow::Result<Vec<TenantTimelineId>> {
    let mut timelines: Vec<TenantTimelineId> = Vec::new();
    let mut tenants: Vec<TenantId> = Vec::new();
    for ti in api_client.list_tenants().await? {
        if !ti.id.is_unsharded() {
            anyhow::bail!(
                "only unsharded tenants are supported at this time: {}",
                ti.id
            );
        }
        tenants.push(ti.id.tenant_id)
    }
    let mut js = JoinSet::new();
    for tenant_id in tenants {
        js.spawn({
            let mgmt_api_client = Arc::clone(api_client);
            async move {
                (
                    tenant_id,
                    mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
                )
            }
        });
    }
    while let Some(res) = js.join_next().await {
        let (tenant_id, details) = res.unwrap();
        for timeline_id in details.timelines {
            timelines.push(TenantTimelineId {
                tenant_id,
                timeline_id,
            });
        }
    }
    Ok(timelines)
}
@@ -1,88 +0,0 @@
use std::time::Duration;

use anyhow::Context;

pub(crate) struct Stats {
    latency_histo: hdrhistogram::Histogram<u64>,
}

impl Stats {
    pub(crate) fn new() -> Self {
        Self {
            // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
            // which would skew the benchmark results.
            latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
        }
    }
    pub(crate) fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
        let micros: u64 = latency
            .as_micros()
            .try_into()
            .context("latency greater than u64")?;
        self.latency_histo
            .record(micros)
            .context("add to histogram")?;
        Ok(())
    }
    pub(crate) fn output(&self) -> Output {
        let latency_percentiles = std::array::from_fn(|idx| {
            let micros = self
                .latency_histo
                .value_at_percentile(LATENCY_PERCENTILES[idx]);
            Duration::from_micros(micros)
        });
        Output {
            request_count: self.latency_histo.len(),
            latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
            latency_percentiles: LatencyPercentiles {
                latency_percentiles,
            },
        }
    }
    pub(crate) fn add(&mut self, other: &Self) {
        let Self {
            ref mut latency_histo,
        } = self;
        latency_histo.add(&other.latency_histo).unwrap();
    }
}

impl Default for Stats {
    fn default() -> Self {
        Self::new()
    }
}

const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];

struct LatencyPercentiles {
    latency_percentiles: [Duration; 4],
}

impl serde::Serialize for LatencyPercentiles {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeMap;
        let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
        for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
            ser.serialize_entry(
                &format!("p{p}"),
                // Index by position; the original indexed [0] for every entry,
                // which would have reported the p95 value under all four keys.
                &format!(
                    "{}",
                    &humantime::format_duration(self.latency_percentiles[i])
                ),
            )?;
        }
        ser.end()
    }
}

#[derive(serde::Serialize)]
pub(crate) struct Output {
    request_count: u64,
    #[serde(with = "humantime_serde")]
    latency_mean: Duration,
    latency_percentiles: LatencyPercentiles,
}
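// Sketch of how `Stats` above is driven end to end (names are from this file;
// the latency value is made up):
fn stats_usage_sketch() -> anyhow::Result<()> {
    let mut stats = Stats::new();
    stats.observe(Duration::from_micros(1_500))?;
    let out = stats.output();
    println!("{}", serde_json::to_string_pretty(&out)?);
    Ok(())
}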
@@ -1,34 +0,0 @@
use std::str::FromStr;

use anyhow::Context;
use utils::id::{TenantId, TimelineId};

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub(crate) struct TenantTimelineId {
    pub(crate) tenant_id: TenantId,
    pub(crate) timeline_id: TimelineId,
}

impl FromStr for TenantTimelineId {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (tenant_id, timeline_id) = s
            .split_once('/')
            .context("tenant and timeline id must be separated by `/`")?;
        let tenant_id = TenantId::from_str(tenant_id)
            .with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
        let timeline_id = TimelineId::from_str(timeline_id)
            .with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
        Ok(Self {
            tenant_id,
            timeline_id,
        })
    }
}

impl std::fmt::Display for TenantTimelineId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.tenant_id, self.timeline_id)
    }
}
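// Round-trip sketch for the impls above; the hex IDs are made up, 32 hex
// digits each as TenantId/TimelineId expect.
fn ttid_roundtrip_sketch() -> anyhow::Result<()> {
    let s = "3aa8fcc61f6d357410b7de754b1d9001/de200bd42b49cc1814412c7e592dd6e9";
    let ttid = TenantTimelineId::from_str(s)?;
    assert_eq!(ttid.to_string(), s);
    Ok(())
}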
@@ -1,45 +0,0 @@
pub(crate) type ThreadLocalStats<T> = Arc<Mutex<T>>;
pub(crate) type AllThreadLocalStats<T> = Arc<Mutex<Vec<ThreadLocalStats<T>>>>;

macro_rules! declare {
    ($THREAD_LOCAL_NAME:ident: $T:ty) => {
        thread_local! {
            pub static $THREAD_LOCAL_NAME: std::cell::RefCell<crate::util::tokio_thread_local_stats::ThreadLocalStats<$T>> = std::cell::RefCell::new(
                std::sync::Arc::new(std::sync::Mutex::new(Default::default()))
            );
        }
    };
}

use std::sync::{Arc, Mutex};

pub(crate) use declare;

macro_rules! main {
    ($THREAD_LOCAL_NAME:ident, $main_impl:expr) => {{
        let main_impl = $main_impl;
        let all = Arc::new(Mutex::new(Vec::new()));

        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_start({
                let all = Arc::clone(&all);
                move || {
                    // pre-initialize the thread local stats by accessing them
                    // (some stats like requests_stats::Stats are quite costly to initialize,
                    // we don't want to pay that cost during the measurement period)
                    $THREAD_LOCAL_NAME.with(|stats| {
                        let stats: Arc<_> = Arc::clone(&*stats.borrow());
                        all.lock().unwrap().push(stats);
                    });
                }
            })
            .enable_all()
            .build()
            .unwrap();

        let main_task = rt.spawn(main_impl(all));
        rt.block_on(main_task).unwrap()
    }};
}

pub(crate) use main;
@@ -267,7 +267,7 @@ async fn calculate_synthetic_size_worker(
        }
    };

-   for (tenant_shard_id, tenant_state, _gen) in tenants {
+   for (tenant_shard_id, tenant_state) in tenants {
        if tenant_state != TenantState::Active {
            continue;
        }

@@ -196,7 +196,7 @@ pub(super) async fn collect_all_metrics(
        }
    };

-   let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
+   let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
        if state != TenantState::Active || !id.is_zero() {
            None
        } else {

@@ -515,7 +515,7 @@ async fn collect_eviction_candidates(

    let mut candidates = Vec::new();

-   for (tenant_id, _state, _gen) in &tenants {
+   for (tenant_id, _state) in &tenants {
        if cancel.is_cancelled() {
            return Ok(EvictionCandidates::Cancelled);
        }

@@ -593,8 +593,6 @@ async fn get_lsn_by_timestamp_handler(
        )));
    }

-   let version: Option<u8> = parse_query_param(&request, "version")?;
-
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let timestamp_raw = must_get_query_param(&request, "timestamp")?;
    let timestamp = humantime::parse_rfc3339(&timestamp_raw)
@@ -607,31 +605,18 @@ async fn get_lsn_by_timestamp_handler(
    let result = timeline
        .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
        .await?;

-   if version.unwrap_or(0) > 1 {
-       #[derive(serde::Serialize)]
-       struct Result {
-           lsn: Lsn,
-           kind: &'static str,
-       }
-       let (lsn, kind) = match result {
-           LsnForTimestamp::Present(lsn) => (lsn, "present"),
-           LsnForTimestamp::Future(lsn) => (lsn, "future"),
-           LsnForTimestamp::Past(lsn) => (lsn, "past"),
-           LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
-       };
-       json_response(StatusCode::OK, Result { lsn, kind })
-   } else {
-       // FIXME: this is a temporary crutch not to break backwards compatibility
-       // See https://github.com/neondatabase/neon/pull/5608
-       let result = match result {
-           LsnForTimestamp::Present(lsn) => format!("{lsn}"),
-           LsnForTimestamp::Future(_lsn) => "future".into(),
-           LsnForTimestamp::Past(_lsn) => "past".into(),
-           LsnForTimestamp::NoData(_lsn) => "nodata".into(),
-       };
-       json_response(StatusCode::OK, result)
+   #[derive(serde::Serialize)]
+   struct Result {
+       lsn: Lsn,
+       kind: &'static str,
+   }
+   let (lsn, kind) = match result {
+       LsnForTimestamp::Present(lsn) => (lsn, "present"),
+       LsnForTimestamp::Future(lsn) => (lsn, "future"),
+       LsnForTimestamp::Past(lsn) => (lsn, "past"),
+       LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
+   };
+   json_response(StatusCode::OK, Result { lsn, kind })
}

async fn get_timestamp_of_lsn_handler(
@@ -845,12 +830,11 @@ async fn tenant_list_handler(
            ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
        })?
        .iter()
-       .map(|(id, state, gen)| TenantInfo {
+       .map(|(id, state)| TenantInfo {
            id: *id,
            state: state.clone(),
            current_physical_size: None,
            attachment_status: state.attachment_status(),
-           generation: (*gen).into(),
        })
        .collect::<Vec<TenantInfo>>();

@@ -880,7 +864,6 @@ async fn tenant_status(
            state: state.clone(),
            current_physical_size: Some(current_physical_size),
            attachment_status: state.attachment_status(),
-           generation: tenant.generation().into(),
        },
        timelines: tenant.list_timeline_ids(),
    })

@@ -522,14 +522,18 @@ pub(crate) mod initial_logical_size {
    impl StartCalculation {
        pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
            let circumstances_label: &'static str = circumstances.into();
-           self.0.with_label_values(&["first", circumstances_label]);
+           self.0
+               .with_label_values(&["first", circumstances_label])
+               .inc();
            OngoingCalculationGuard {
                inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
            }
        }
        pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
            let circumstances_label: &'static str = circumstances.into();
-           self.0.with_label_values(&["retry", circumstances_label]);
+           self.0
+               .with_label_values(&["retry", circumstances_label])
+               .inc();
            OngoingCalculationGuard {
                inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
            }
@@ -1019,12 +1023,62 @@ static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
    .expect("failed to define a metric")
});

static SMGR_QUERY_TIME_GLOBAL_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
    [
        1,
        10,
        20,
        40,
        60,
        80,
        100,
        200,
        300,
        400,
        500,
        600,
        700,
        800,
        900,
        1_000, // 1ms
        2_000,
        4_000,
        6_000,
        8_000,
        10_000, // 10ms
        20_000,
        40_000,
        60_000,
        80_000,
        100_000,
        200_000,
        400_000,
        600_000,
        800_000,
        1_000_000, // 1s
        2_000_000,
        4_000_000,
        6_000_000,
        8_000_000,
        10_000_000, // 10s
        20_000_000,
        50_000_000,
        100_000_000,
        200_000_000,
        1_000_000_000, // 1000s
    ]
    .into_iter()
    .map(Duration::from_micros)
    .map(|d| d.as_secs_f64())
    .collect()
});

static SMGR_QUERY_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "pageserver_smgr_query_seconds_global",
        "Time spent on smgr query handling, aggregated by query type.",
        &["smgr_query_type"],
-       CRITICAL_OP_BUCKETS.into(),
+       SMGR_QUERY_TIME_GLOBAL_BUCKETS.clone(),
    )
    .expect("failed to define a metric")
});

@@ -1776,7 +1776,6 @@ pub fn is_inherited_key(key: Key) -> bool {
    key != AUX_FILES_KEY
}

/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
    Ok(match key.field1 {
        0x00 => (
@@ -1791,6 +1790,7 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
        _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
    })
}

pub fn is_rel_fsm_block_key(key: Key) -> bool {
    key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
}

@@ -1915,10 +1915,6 @@ impl Tenant {
        self.current_state() == TenantState::Active
    }

-   pub fn generation(&self) -> Generation {
-       self.generation
-   }
-
    /// Changes tenant status to active, unless shutdown was already requested.
    ///
    /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup

@@ -514,10 +514,7 @@ pub async fn init_tenant_mgr(
        &ctx,
    ) {
        Ok(tenant) => {
-           tenants.insert(
-               TenantShardId::unsharded(tenant.tenant_id()),
-               TenantSlot::Attached(tenant),
-           );
+           tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
        }
        Err(e) => {
            error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
@@ -962,35 +959,27 @@ impl TenantManager {
        }

        let tenant_path = self.conf.tenant_path(&tenant_shard_id);
+       let timelines_path = self.conf.timelines_path(&tenant_shard_id);
+
+       // Directory structure is the same for attached and secondary modes:
+       // create it if it doesn't exist. Timeline load/creation expects the
+       // timelines/ subdir to already exist.
+       //
+       // Does not need to be fsync'd because local storage is just a cache.
+       tokio::fs::create_dir_all(&timelines_path)
+           .await
+           .with_context(|| format!("Creating {timelines_path}"))?;
+
+       // Before activating either secondary or attached mode, persist the
+       // configuration, so that on restart we will re-attach (or re-start
+       // secondary) on the tenant.
+       Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
+           .await
+           .map_err(SetNewTenantConfigError::Persist)?;

        let new_slot = match &new_location_config.mode {
-           LocationMode::Secondary(_) => {
-               // Directory doesn't need to be fsync'd because if we crash it can
-               // safely be recreated next time this tenant location is configured.
-               tokio::fs::create_dir_all(&tenant_path)
-                   .await
-                   .with_context(|| format!("Creating {tenant_path}"))?;
-
-               Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
-                   .await
-                   .map_err(SetNewTenantConfigError::Persist)?;
-
-               TenantSlot::Secondary
-           }
+           LocationMode::Secondary(_) => TenantSlot::Secondary,
            LocationMode::Attached(_attach_config) => {
-               let timelines_path = self.conf.timelines_path(&tenant_shard_id);
-
-               // Directory doesn't need to be fsync'd because we do not depend on
-               // it to exist after crashes: it may be recreated when tenant is
-               // re-attached, see https://github.com/neondatabase/neon/issues/5550
-               tokio::fs::create_dir_all(&tenant_path)
-                   .await
-                   .with_context(|| format!("Creating {timelines_path}"))?;
-
-               Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
-                   .await
-                   .map_err(SetNewTenantConfigError::Persist)?;
-
                let shard_identity = new_location_config.shard;
                let tenant = tenant_spawn(
                    self.conf,
@@ -1511,8 +1500,8 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
-pub(crate) async fn list_tenants(
-) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
+pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>, TenantMapListError>
+{
    let tenants = TENANTS.read().unwrap();
    let m = match &*tenants {
        TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1520,9 +1509,7 @@ pub(crate) async fn list_tenants(
    };
    Ok(m.iter()
        .filter_map(|(id, tenant)| match tenant {
-           TenantSlot::Attached(tenant) => {
-               Some((*id, tenant.current_state(), tenant.generation()))
-           }
+           TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
            TenantSlot::Secondary => None,
            TenantSlot::InProgress(_) => None,
        })

@@ -878,6 +878,23 @@ impl LayerInner {
                Ok(())
            }
            Err(e) => {
+               let consecutive_failures =
+                   this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
+
+               let backoff = utils::backoff::exponential_backoff_duration_seconds(
+                   consecutive_failures.min(u32::MAX as usize) as u32,
+                   1.5,
+                   60.0,
+               );
+
+               let backoff = std::time::Duration::from_secs_f64(backoff);
+
+               tokio::select! {
+                   _ = tokio::time::sleep(backoff) => {},
+                   _ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
+                   _ = timeline.cancel.cancelled() => {},
+               };
+
+               Err(e)
            }
        };
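// Not the actual utils::backoff::exponential_backoff_duration_seconds — that
// helper lives elsewhere in the tree. This is only a sketch of the shape the
// call above suggests (growth factor 1.5, cap 60.0 seconds), to make the retry
// behavior concrete: the delay grows per consecutive failure and then clamps.
fn exponential_backoff_secs_sketch(consecutive_failures: u32, factor: f64, max_secs: f64) -> f64 {
    factor.powi(consecutive_failures.min(64) as i32).min(max_secs)
}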
@@ -926,21 +943,9 @@ impl LayerInner {
                Ok(permit)
            }
            Ok((Err(e), _permit)) => {
-               // FIXME: this should be with the spawned task and be cancellation sensitive
-               //
-               // while we should not need this, this backoff has turned out to be useful with
-               // a bug of unexpectedly deleted remote layer file (#5787).
-               let consecutive_failures =
-                   self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
+               // sleep already happened in the spawned task, if it was not cancelled
+               let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
                tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
-               let backoff = utils::backoff::exponential_backoff_duration_seconds(
-                   consecutive_failures.min(u32::MAX as usize) as u32,
-                   1.5,
-                   60.0,
-               );
-               let backoff = std::time::Duration::from_secs_f64(backoff);
-
-               tokio::time::sleep(backoff).await;
                Err(DownloadError::DownloadFailed)
            }
            Err(_gone) => Err(DownloadError::DownloadCancelled),

@@ -138,7 +138,7 @@ pub(super) async fn connection_manager_loop_step(
        Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
        Err(status) => {
            match status.code() {
-               Code::Unknown if status.message().contains("stream closed because of a broken pipe") => {
+               Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
                    // tonic's error handling doesn't provide a clear code for disconnections: we get
                    // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
                    info!("broker disconnected: {status}");

@@ -19,20 +19,21 @@
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <curl/curl.h>

#include "access/xact.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "libpq/crypt.h"
#include "miscadmin.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/acl.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include <curl/curl.h>
#include "utils/jsonb.h"
#include "libpq/crypt.h"

static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;

@@ -1,4 +1,3 @@
-
/*-------------------------------------------------------------------------
 *
 * extension_server.c
@@ -10,21 +9,11 @@
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
-#include "tcop/pquery.h"
-#include "tcop/utility.h"
-#include "access/xact.h"
-#include "utils/hsearch.h"
-#include "utils/memutils.h"
-#include "commands/defrem.h"
-#include "miscadmin.h"
-#include "utils/acl.h"
-#include "fmgr.h"
-#include "utils/guc.h"
-#include "port.h"
+#include "fmgr.h"

#include <curl/curl.h>

+#include "utils/guc.h"

static int extension_server_port = 0;

static download_extension_file_hook_type prev_download_extension_file_hook = NULL;

@@ -13,32 +13,30 @@
 *-------------------------------------------------------------------------
 */

+#include "postgres.h"
+
#include <sys/file.h>
#include <unistd.h>
#include <fcntl.h>

-#include "postgres.h"

#include "neon_pgversioncompat.h"

#include "access/parallel.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "pagestore_client.h"
#include "access/parallel.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
#include "storage/buf_internals.h"
#include "storage/latch.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "storage/fd.h"
#include "storage/pg_shmem.h"
#include "storage/buf_internals.h"
#include "pgstat.h"

/*
 * Local file cache is used to temporarily store relation pages in the local file system.
@@ -102,8 +100,6 @@ static shmem_request_hook_type prev_shmem_request_hook;

#define LFC_ENABLED() (lfc_ctl->limit != 0)

-void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
-
/*
 * Local file cache is optional and Neon can work without it.
 * In case of any errors with this cache, we should disable it but not throw an error.

@@ -14,28 +14,24 @@
 */
#include "postgres.h"

#include "pagestore_client.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "c.h"
#include "postmaster/interrupt.h"

#include "fmgr.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "libpq/libpq.h"

#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "storage/buf_internals.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "utils/guc.h"

#include "neon.h"
#include "walproposer.h"
#include "neon_utils.h"
#include "pagestore_client.h"
#include "walproposer.h"

#define PageStoreTrace DEBUG5

@@ -62,8 +58,8 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;

int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;

#define MAX_PAGESERVER_CONNSTRING_SIZE 256

@@ -83,8 +79,6 @@ static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];

bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;

static bool pageserver_flush(void);
static void pageserver_disconnect(void);

@@ -627,8 +621,6 @@ pg_init_libpagestore(void)
	smgr_hook = smgr_neon;
	smgr_init_hook = smgr_init_neon;
	dbsize_hook = neon_dbsize;
	old_redo_read_buffer_filter = redo_read_buffer_filter;
	redo_read_buffer_filter = neon_redo_read_buffer_filter;
}

	lfc_init();

@@ -27,13 +27,6 @@ extern void pg_init_walproposer(void);

extern void pg_init_extension_server(void);

/*
 * Returns true if we shouldn't do REDO on that block in record indicated by
 * block_id; false otherwise.
 */
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);

extern uint64 BackpressureThrottlingTime(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);


@@ -3,33 +3,8 @@

#include "postgres.h"

#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"

#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"

#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>

#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif

/*
 * Convert a character which represents a hexadecimal digit to an integer.

@@ -1,8 +1,6 @@
#ifndef __NEON_UTILS_H__
#define __NEON_UTILS_H__

#include "postgres.h"

bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);

@@ -13,19 +13,16 @@
#ifndef pageserver_h
#define pageserver_h

#include "postgres.h"
#include "neon_pgversioncompat.h"

#include "access/xlogdefs.h"
#include RELFILEINFO_HDR
#include "storage/block.h"
#include "storage/smgr.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "storage/block.h"
#include "storage/smgr.h"
#include "utils/memutils.h"

#include "pg_config.h"

typedef enum
{
	/* pagestore_client -> pagestore */
@@ -158,11 +155,8 @@ extern page_server_api *page_server;
extern char *page_server_connstring;
extern int flush_every_n_requests;
extern int readahead_buffer_size;
extern bool seqscan_prefetch_enabled;
extern int seqscan_prefetch_distance;
extern char *neon_timeline;
extern char *neon_tenant;
extern bool wal_redo;
extern int32 max_cluster_size;

extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);

@@ -47,25 +47,26 @@

#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xloginsert.h"
#include "access/xlog_internal.h"
#include "access/xlogdefs.h"
#include "access/xlogutils.h"
#include "catalog/pg_class.h"
#include "common/hashfn.h"
#include "executor/instrument.h"
#include "pagestore_client.h"
#include "postmaster/interrupt.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/interrupt.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
#include "storage/fsm_internals.h"
#include "storage/smgr.h"
#include "storage/md.h"
#include "pgstat.h"
#include "storage/smgr.h"

#include "pagestore_client.h"

#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif

@@ -106,6 +107,9 @@ typedef enum
static SMgrRelation unlogged_build_rel = NULL;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;

static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;

/*
 * Prefetch implementation:
 *
@@ -239,7 +243,7 @@ typedef struct PrefetchState
	PrefetchRequest prf_buffer[];	/* prefetch buffers */
} PrefetchState;

PrefetchState *MyPState;
static PrefetchState *MyPState;

#define GetPrfSlot(ring_index) ( \
	( \
@@ -257,7 +261,7 @@ PrefetchState *MyPState;
	) \
)

XLogRecPtr prefetch_lsn = 0;
static XLogRecPtr prefetch_lsn = 0;

static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
@@ -1371,6 +1375,9 @@ neon_init(void)
	MyPState->prf_hash = prfh_create(MyPState->hashctx,
									 readahead_buffer_size, NULL);

	old_redo_read_buffer_filter = redo_read_buffer_filter;
	redo_read_buffer_filter = neon_redo_read_buffer_filter;

#ifdef DEBUG_COMPARE_LOCAL
	mdinit();
#endif
@@ -2869,7 +2876,7 @@ get_fsm_physical_block(BlockNumber heapblk)
 * contents, where with REDO locking it would wait on block 1 and see
 * block 3 with post-REDO contents only.
 */
bool
static bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	end_recptr = record->EndRecPtr;

@@ -1,14 +1,12 @@
#ifndef __NEON_WALPROPOSER_H__
#define __NEON_WALPROPOSER_H__

#include "postgres.h"
#include "access/xlogdefs.h"
#include "port.h"
#include "access/xlog_internal.h"
#include "access/transam.h"
#include "access/xlogdefs.h"
#include "access/xlog_internal.h"
#include "nodes/replnodes.h"
#include "utils/uuid.h"
#include "replication/walreceiver.h"
#include "utils/uuid.h"

#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2

@@ -3,11 +3,13 @@
 * This is needed to avoid linking to a full postgres server installation. This file
 * is compiled as a part of the libwalproposer static library.
 */
#include "postgres.h"

#include <stdio.h>
#include "walproposer.h"
#include "utils/datetime.h"

#include "miscadmin.h"
#include "utils/datetime.h"
#include "walproposer.h"

void
ExceptionalCondition(const char *conditionName,

@@ -1482,6 +1482,21 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
#if PG_MAJORVERSION_NUM >= 16
	if (WalSndCtl != NULL)
		ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);

	/*
	 * Now that we prepared the condvar, check the flush ptr again -- it might
	 * have changed before we subscribed to the cv, in which case we missed
	 * the wakeup.
	 *
	 * Do that only when we're interested in new WAL: without sync-safekeepers
	 * and if the election has already passed.
	 */
	if (!wp->config->syncSafekeepers && wp->availableLsn != InvalidXLogRecPtr && GetFlushRecPtr(NULL) > wp->availableLsn)
	{
		ConditionVariableCancelSleep();
		ResetLatch(MyLatch);
		*events = WL_LATCH_SET;
		return 1;
	}
#endif

/*
@@ -1697,9 +1712,9 @@ walprop_pg_after_election(WalProposer *wp)
	f = fopen("restart.lsn", "rb");
	if (f != NULL && !wp->config->syncSafekeepers)
	{
		fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
		size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
		fclose(f);
		if (lrRestartLsn != InvalidXLogRecPtr)
		if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
		{
			elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));


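The hunk above applies the standard fix for lost wakeups: subscribe to the condition variable first, then re-check the condition before sleeping. The Rust sketch below shows the same idea with `std::sync::Condvar`, where the re-check happens under the mutex; it illustrates the pattern and is not a port of the walproposer code.

```rust
use std::sync::{Condvar, Mutex};

/// Illustration only: wait until a shared flush pointer reaches `target`.
/// Checking the value again after taking the lock (and inside the wait loop)
/// means a notification sent before we subscribed cannot be lost.
fn wait_for_flush(flushed: &Mutex<u64>, cv: &Condvar, target: u64) {
    let mut guard = flushed.lock().unwrap();
    while *guard < target {
        // Atomically releases the lock and sleeps; re-checks on every wakeup.
        guard = cv.wait(guard).unwrap();
    }
}
```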
@@ -87,6 +87,10 @@ impl AuthError {
    pub fn too_many_connections() -> Self {
        AuthErrorImpl::TooManyConnections.into()
    }

    pub fn is_auth_failed(&self) -> bool {
        matches!(self.0.as_ref(), AuthErrorImpl::AuthFailed(_))
    }
}

impl<E: Into<AuthErrorImpl>> From<E> for AuthError {

@@ -192,14 +192,46 @@ async fn auth_quirks(
    if !check_peer_addr_is_in_list(&info.inner.peer_addr, &allowed_ips) {
        return Err(auth::AuthError::ip_address_not_allowed());
    }
    let secret = api.get_role_secret(extra, &info).await?.unwrap_or_else(|| {
    let cached_secret = api.get_role_secret(extra, &info).await?;

    let secret = cached_secret.clone().unwrap_or_else(|| {
        // If we don't have an authentication secret, we mock one to
        // prevent malicious probing (possible due to missing protocol steps).
        // This mocked secret will never lead to successful authentication.
        info!("authentication info not found, mocking it");
        AuthSecret::Scram(scram::ServerSecret::mock(&info.inner.user, rand::random()))
    });
    match authenticate_with_secret(
        secret,
        info,
        client,
        unauthenticated_password,
        allow_cleartext,
        config,
        latency_timer,
    )
    .await
    {
        Ok(keys) => Ok(keys),
        Err(e) => {
            if e.is_auth_failed() {
                // The password could have been changed, so we invalidate the cache.
                cached_secret.invalidate();
            }
            Err(e)
        }
    }
}

async fn authenticate_with_secret(
    secret: AuthSecret,
    info: ComputeUserInfo,
    client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
    unauthenticated_password: Option<Vec<u8>>,
    allow_cleartext: bool,
    config: &'static AuthenticationConfig,
    latency_timer: &mut LatencyTimer,
) -> auth::Result<ComputeCredentials<ComputeCredentialKeys>> {
    if let Some(password) = unauthenticated_password {
        let auth_outcome = validate_password_and_exchange(&password, secret)?;
        let keys = match auth_outcome {

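The refactor above threads a cache handle through authentication so a failed handshake can evict a stale secret. A toy version of that invalidate-on-failure flow, with hypothetical `Cached` and `try_auth` stand-ins rather than proxy's real types:

```rust
/// Illustration only: minimal stand-ins for proxy's cached-secret flow.
struct Cached {
    secret: String,
}

impl Cached {
    /// In the real code this would also drop the entry from the backing cache.
    fn invalidate(self) {
        println!("cache entry dropped; next attempt refetches from the console");
    }
}

fn try_auth(secret: &str, password: &str) -> Result<(), &'static str> {
    if secret == password { Ok(()) } else { Err("auth failed") }
}

fn authenticate(cached: Cached, password: &str) -> Result<(), &'static str> {
    match try_auth(&cached.secret, password) {
        Ok(()) => Ok(()),
        Err(e) => {
            // The stored password may have changed since the secret was cached,
            // so a failed handshake invalidates the cached entry.
            cached.invalidate();
            Err(e)
        }
    }
}

fn main() {
    let cached = Cached { secret: "s3cret".into() };
    let _ = authenticate(cached, "wrong-password");
}
```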
@@ -254,6 +254,7 @@ pub type NodeInfoCache = TimedLru<Arc<str>, NodeInfo>;
pub type CachedNodeInfo = timed_lru::Cached<&'static NodeInfoCache>;
pub type AllowedIpsCache = TimedLru<SmolStr, Arc<Vec<String>>>;
pub type RoleSecretCache = TimedLru<(SmolStr, SmolStr), Option<AuthSecret>>;
pub type CachedRoleSecret = timed_lru::Cached<&'static RoleSecretCache>;

/// This will allocate per each call, but the http requests alone
/// already require a few allocations, so it should be fine.
@@ -264,7 +265,7 @@ pub trait Api {
    &self,
    extra: &ConsoleReqExtra,
    creds: &ComputeUserInfo,
) -> Result<Option<AuthSecret>, errors::GetAuthInfoError>;
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;

async fn get_allowed_ips(
    &self,

@@ -6,6 +6,7 @@ use super::{
    errors::{ApiError, GetAuthInfoError, WakeComputeError},
    AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::console::provider::CachedRoleSecret;
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use async_trait::async_trait;
use futures::TryFutureExt;
@@ -146,8 +147,10 @@ impl super::Api for Api {
    &self,
    _extra: &ConsoleReqExtra,
    creds: &ComputeUserInfo,
) -> Result<Option<AuthSecret>, GetAuthInfoError> {
    Ok(self.do_get_auth_info(creds).await?.secret)
) -> Result<CachedRoleSecret, GetAuthInfoError> {
    Ok(CachedRoleSecret::new_uncached(
        self.do_get_auth_info(creds).await?.secret,
    ))
}

async fn get_allowed_ips(

@@ -3,7 +3,8 @@
use super::{
    super::messages::{ConsoleError, GetRoleSecret, WakeCompute},
    errors::{ApiError, GetAuthInfoError, WakeComputeError},
    ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
    ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, CachedRoleSecret, ConsoleReqExtra,
    NodeInfo,
};
use crate::metrics::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::{auth::backend::ComputeUserInfo, compute, http, scram};
@@ -163,20 +164,21 @@ impl super::Api for Api {
    &self,
    extra: &ConsoleReqExtra,
    creds: &ComputeUserInfo,
) -> Result<Option<AuthSecret>, GetAuthInfoError> {
) -> Result<CachedRoleSecret, GetAuthInfoError> {
    let ep = creds.endpoint.clone();
    let user = creds.inner.user.clone();
    if let Some(role_secret) = self.caches.role_secret.get(&(ep.clone(), user.clone())) {
        return Ok(role_secret.clone());
        return Ok(role_secret);
    }
    let auth_info = self.do_get_auth_info(extra, creds).await?;
    self.caches
    let (_, secret) = self
        .caches
        .role_secret
        .insert((ep.clone(), user), auth_info.secret.clone());
    self.caches
        .allowed_ips
        .insert(ep, Arc::new(auth_info.allowed_ips));
    Ok(auth_info.secret)
    Ok(secret)
}

async fn get_allowed_ips(

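Across these three hunks, `get_role_secret` becomes a read-through cache keyed by `(endpoint, user)` that returns a cache-aware handle. A toy sketch of the read-through shape, using a plain `HashMap` rather than proxy's `TimedLru` (which additionally tracks entry age and supports invalidation):

```rust
use std::collections::HashMap;

/// Illustration only: look up the secret, falling back to a fetch on miss
/// and remembering the result (including "no such role", the `None` case).
struct SecretCache {
    entries: HashMap<(String, String), Option<String>>,
}

impl SecretCache {
    fn get_or_fetch(
        &mut self,
        key: (String, String),
        fetch: impl FnOnce() -> Option<String>,
    ) -> Option<String> {
        if let Some(hit) = self.entries.get(&key) {
            return hit.clone(); // cache hit: no console round-trip
        }
        let secret = fetch();
        self.entries.insert(key, secret.clone());
        secret
    }
}

fn main() {
    let mut cache = SecretCache { entries: HashMap::new() };
    let key = ("ep".to_string(), "user".to_string());
    assert_eq!(cache.get_or_fetch(key.clone(), || Some("s".into())), Some("s".into()));
    // The second call is served from the cache; the closure is not consulted.
    assert_eq!(cache.get_or_fetch(key, || unreachable!()), Some("s".into()));
}
```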
@@ -1,9 +1,12 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
@@ -40,7 +43,7 @@ impl TimelineAnalysis {

pub(crate) fn branch_cleanup_and_check_errors(
    id: &TenantShardTimelineId,
    s3_root: &RootTarget,
    tenant_objects: &mut TenantObjectListing,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<S3TimelineBlobData>,
@@ -72,8 +75,8 @@ pub(crate) fn branch_cleanup_and_check_errors(
    match s3_data.blob_data {
        BlobDataParseResult::Parsed {
            index_part,
            index_part_generation,
            mut s3_layers,
            index_part_generation: _index_part_generation,
            s3_layers: _s3_layers,
        } => {
            if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
                result.errors.push(format!(
@@ -111,65 +114,19 @@ pub(crate) fn branch_cleanup_and_check_errors(
                ))
            }

            let layer_map_key = (layer, metadata.generation);
            if !s3_layers.remove(&layer_map_key) {
            if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                // FIXME: this will emit false positives if an index was
                // uploaded concurrently with our scan. To make this check
                // correct, we need to try sending a HEAD request for the
                // layer we think is missing.
                result.errors.push(format!(
                    "index_part.json contains a layer {}{} that is not present in remote storage",
                    layer_map_key.0.file_name(),
                    layer_map_key.1.get_suffix()
                    "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
                    layer.file_name(),
                    metadata.generation.get_suffix(),
                    metadata.shard
                ))
            }
        }

        let orphan_layers: Vec<(LayerFileName, Generation)> = s3_layers
            .into_iter()
            .filter(|(_layer_name, gen)|
                // A layer is only considered orphaned if it has a generation below
                // the index. If the generation is >= the index, then the layer may
                // be an upload from a running pageserver, or even an upload from
                // a new generation that didn't upload an index yet.
                //
                // Even so, a layer that is not referenced by the index could just
                // be something enqueued for deletion, so while this check is valid
                // for indicating that a layer is garbage, it is not an indicator
                // of a problem.
                gen < &index_part_generation)
            .collect();

        if !orphan_layers.is_empty() {
            // An orphan layer is not an error: it's arguably not even a warning, but it is helpful to report
            // these as a hint that there is something worth cleaning up here.
            result.warnings.push(format!(
                "index_part.json does not contain layers from S3: {:?}",
                orphan_layers
                    .iter()
                    .map(|(layer_name, gen)| format!(
                        "{}{}",
                        layer_name.file_name(),
                        gen.get_suffix()
                    ))
                    .collect::<Vec<_>>(),
            ));
            result.garbage_keys.extend(orphan_layers.iter().map(
                |(layer_name, layer_gen)| {
                    let mut key = s3_root.timeline_root(id).prefix_in_bucket;
                    let delimiter = s3_root.delimiter();
                    if !key.ends_with(delimiter) {
                        key.push_str(delimiter);
                    }
                    key.push_str(&format!(
                        "{}{}",
                        &layer_name.file_name(),
                        layer_gen.get_suffix()
                    ));
                    key
                },
            ));
        }
        }
        BlobDataParseResult::Relic => {}
        BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
@@ -204,6 +161,83 @@ pub(crate) fn branch_cleanup_and_check_errors(
    result
}

#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
    shard_timelines:
        HashMap<(ShardIndex, TimelineId), HashMap<(LayerFileName, Generation), LayerRef>>,
}

impl TenantObjectListing {
    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
    /// list of layer keys for the tenant.
    pub(crate) fn push(
        &mut self,
        ttid: TenantShardTimelineId,
        layers: HashSet<(LayerFileName, Generation)>,
    ) {
        let shard_index = ShardIndex::new(
            ttid.tenant_shard_id.shard_number,
            ttid.tenant_shard_id.shard_count,
        );
        let replaced = self.shard_timelines.insert(
            (shard_index, ttid.timeline_id),
            layers
                .into_iter()
                .map(|l| (l, LayerRef::default()))
                .collect(),
        );

        assert!(
            replaced.is_none(),
            "Built from an S3 object listing, which should never repeat a key"
        );
    }

    /// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
    /// the layer's refcount will be incremented. Later, after calling this for all references in all
    /// indices in a tenant, orphan layers may be detected by their zero refcounts.
    ///
    /// Returns true if the layer exists.
    pub(crate) fn check_ref(
        &mut self,
        timeline_id: TimelineId,
        layer_file: &LayerFileName,
        metadata: &IndexLayerMetadata,
    ) -> bool {
        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
            return false;
        };

        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
            return false;
        };

        layer_ref.ref_count += 1;

        true
    }

    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerFileName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}

#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,

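`TenantObjectListing` is reference counting over an object listing: every listed layer starts at refcount zero, each index reference bumps it via `check_ref`, and whatever is still at zero afterwards is an orphan candidate. A toy version of the same bookkeeping, with layer keys reduced to plain strings (the real code keys by shard, timeline, layer name, and generation):

```rust
use std::collections::HashMap;

/// Illustration only: refcount-based orphan detection over a listing.
#[derive(Default)]
struct Listing {
    ref_counts: HashMap<String, usize>,
}

impl Listing {
    fn push(&mut self, layers: impl IntoIterator<Item = String>) {
        for l in layers {
            self.ref_counts.entry(l).or_insert(0);
        }
    }

    /// Returns true if the referenced layer exists in the listing.
    fn check_ref(&mut self, layer: &str) -> bool {
        match self.ref_counts.get_mut(layer) {
            Some(count) => { *count += 1; true }
            None => false,
        }
    }

    fn orphans(&self) -> Vec<&String> {
        self.ref_counts.iter().filter(|(_, c)| **c == 0).map(|(k, _)| k).collect()
    }
}

fn main() {
    let mut listing = Listing::default();
    listing.push(["a".into(), "b".into()]);
    assert!(listing.check_ref("a"));        // referenced by an index
    assert!(!listing.check_ref("missing")); // index points at a layer we never listed
    assert_eq!(listing.orphans(), vec![&"b".to_string()]); // listed but never referenced
}
```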
@@ -2,22 +2,25 @@ use std::collections::{HashMap, HashSet};

use crate::checks::{
    branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
    TimelineAnalysis,
    TenantObjectListing, TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use utils::id::TenantId;

#[derive(Serialize)]
pub struct MetadataSummary {
    count: usize,
    with_errors: HashSet<TenantShardTimelineId>,
    with_warnings: HashSet<TenantShardTimelineId>,
    with_garbage: HashSet<TenantShardTimelineId>,
    with_orphans: HashSet<TenantShardTimelineId>,
    indices_by_version: HashMap<usize, usize>,

    layer_count: MinMaxHisto,
@@ -87,7 +90,7 @@ impl MetadataSummary {
    count: 0,
    with_errors: HashSet::new(),
    with_warnings: HashSet::new(),
    with_garbage: HashSet::new(),
    with_orphans: HashSet::new(),
    indices_by_version: HashMap::new(),
    layer_count: MinMaxHisto::new(),
    timeline_size_bytes: MinMaxHisto::new(),
@@ -141,6 +144,10 @@ impl MetadataSummary {
        }
    }

    fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
        self.with_orphans.insert(*ttid);
    }

    /// Long-form output for printing at end of a scan
    pub fn summary_string(&self) -> String {
        let version_summary: String = itertools::join(
@@ -154,7 +161,7 @@ impl MetadataSummary {
    "Timelines: {0}
With errors: {1}
With warnings: {2}
With garbage: {3}
With orphan layers: {3}
Index versions: {version_summary}
Timeline size bytes: {4}
Layer size bytes: {5}
@@ -163,7 +170,7 @@ Timeline layer count: {6}
    self.count,
    self.with_errors.len(),
    self.with_warnings.len(),
    self.with_garbage.len(),
    self.with_orphans.len(),
    self.timeline_size_bytes.oneline(),
    self.layer_size_bytes.oneline(),
    self.layer_count.oneline(),
@@ -191,7 +198,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada

// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();

// Generate a stream of S3TimelineBlobData
@@ -204,17 +211,118 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
    Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let timelines = timelines.try_buffered(CONCURRENCY);

// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one another's keys if a shard split has happened.

let mut tenant_id = None;
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();

fn analyze_tenant(
    tenant_id: TenantId,
    summary: &mut MetadataSummary,
    mut tenant_objects: TenantObjectListing,
    timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
) {
    let mut timeline_generations = HashMap::new();
    for (ttid, data) in timelines {
        // Stash the generation of each timeline, for later use identifying orphan layers
        if let BlobDataParseResult::Parsed {
            index_part: _index_part,
            index_part_generation,
            s3_layers: _s3_layers,
        } = &data.blob_data
        {
            timeline_generations.insert(ttid, *index_part_generation);
        }

        // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
        // reference counts for layers across the tenant.
        let analysis =
            branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
        summary.update_analysis(&ttid, &analysis);
    }

    // Identifying orphan layers must be done on a tenant-wide basis, because individual
    // shards' layers may be referenced by other shards.
    //
    // Orphan layers are not a corruption, and not an indication of a problem. They are just
    // consuming some space in remote storage, and may be cleaned up at leisure.
    for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
        let ttid = TenantShardTimelineId {
            tenant_shard_id: TenantShardId {
                tenant_id,
                shard_count: shard_index.shard_count,
                shard_number: shard_index.shard_number,
            },
            timeline_id,
        };

        if let Some(timeline_generation) = timeline_generations.get(&ttid) {
            if &generation >= timeline_generation {
                // Candidate orphan layer is in the current or future generation relative
                // to the index we read for this timeline shard, so its absence from the index
                // doesn't make it an orphan: more likely, it is a case where the layer was
                // uploaded, but the index referencing the layer wasn't written yet.
                continue;
            }
        }

        let orphan_path = remote_layer_path(
            &tenant_id,
            &timeline_id,
            shard_index,
            &layer_file,
            generation,
        );

        tracing::info!("Orphan layer detected: {orphan_path}");

        summary.notify_timeline_orphan(&ttid);
    }
}

// Iterate through all the timeline results. These are in key-order, so
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut summary = MetadataSummary::new();
pin_mut!(timelines);
while let Some(i) = timelines.next().await {
    let (ttid, data) = i?;
    summary.update_data(&data);

    let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
    match tenant_id {
        None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
        Some(prev_tenant_id) => {
            if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
                let tenant_objects = std::mem::take(&mut tenant_objects);
                let timelines = std::mem::take(&mut tenant_timeline_results);
                analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
                tenant_id = Some(ttid.tenant_shard_id.tenant_id);
            }
        }
    }

    summary.update_analysis(&ttid, &analysis);
    if let BlobDataParseResult::Parsed {
        index_part: _index_part,
        index_part_generation: _index_part_generation,
        s3_layers,
    } = &data.blob_data
    {
        tenant_objects.push(ttid, s3_layers.clone());
    }
    tenant_timeline_results.push((ttid, data));
}

if !tenant_timeline_results.is_empty() {
    analyze_tenant(
        tenant_id.expect("Must be set if results are present"),
        &mut summary,
        tenant_objects,
        tenant_timeline_results,
    );
}

Ok(summary)

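The scan loop above exploits the fact that the timeline stream is in key order, so all shards of a tenant arrive adjacently: it accumulates results and flushes them through `analyze_tenant` when the tenant ID changes, with a final flush after the loop. The same grouping pattern in miniature, over a plain sorted iterator:

```rust
/// Illustration only: flush a batch each time the key changes in sorted input.
fn group_adjacent(items: Vec<(u32, &str)>) {
    let mut current_key: Option<u32> = None;
    let mut batch: Vec<&str> = Vec::new();

    for (key, value) in items {
        if current_key.map_or(false, |k| k != key) {
            // Key changed: analyze everything accumulated for the previous key.
            println!("tenant {:?}: {:?}", current_key.unwrap(), std::mem::take(&mut batch));
        }
        current_key = Some(key);
        batch.push(value);
    }
    // Final flush for the last key, mirroring the post-loop `analyze_tenant` call.
    if !batch.is_empty() {
        println!("tenant {:?}: {:?}", current_key.unwrap(), batch);
    }
}

fn main() {
    group_adjacent(vec![(1, "a"), (1, "b"), (2, "c")]);
}
```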
@@ -1,73 +0,0 @@
# Usage from top of repo:
# poetry run python3 ./scripts/ps_duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
import argparse
import shutil
import subprocess
import time
from pathlib import Path

import sys

sys.path.append("test_runner")

from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId

parser = argparse.ArgumentParser(description="Duplicate tenant script.")
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
parser.add_argument("--ncopies", type=int, help="Number of copies")
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")

args = parser.parse_args()

initial_tenant = args.initial_tenant
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
ncopies = args.ncopies
numthreads = args.numthreads

new_tenant = TenantId.generate()
print(f"New tenant: {new_tenant}")

client = PageserverHttpClient(args.port, lambda: None)

src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])

assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"

src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"

dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)

for tl in src_timelines_dir.iterdir():
    src_tl_dir = src_timelines_dir / tl.name
    assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
    dst_tl_dir = dst_timelines_dir / tl.name
    dst_tl_dir.mkdir(parents=False, exist_ok=False)
    for file in tl.iterdir():
        shutil.copy2(file, dst_tl_dir)
        if "__" in file.name:
            cmd = [
                "./target/debug/pagectl",  # TODO: abstract this like the other binaries
                "layer",
                "rewrite-summary",
                str(dst_tl_dir / file.name),
                "--new-tenant-id",
                str(new_tenant),
            ]
            subprocess.run(cmd, check=True)

client.tenant_attach(new_tenant, generation=src_tenant_gen)

while True:
    status = client.tenant_status(new_tenant)
    if status["state"]["slug"] == "Active":
        break
    print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
    time.sleep(1)

print("Tenant is active: " + str(new_tenant))
2 scripts/sk_collect_dumps/.gitignore vendored
@@ -1,2 +1,4 @@
result
*.json
hosts
poetry.lock

11 scripts/sk_collect_dumps/ansible.cfg Normal file
@@ -0,0 +1,11 @@
[defaults]
host_key_checking = False
inventory=./hosts
remote_tmp=/tmp
remote_user=developer
callbacks_enabled = profile_tasks

[ssh_connection]
scp_if_ssh = True
ssh_args = -F ./ssh.cfg
pipelining = True
16 scripts/sk_collect_dumps/pyproject.toml Normal file
@@ -0,0 +1,16 @@
[tool.poetry]
name = "sk-collect-dumps"
version = "0.1.0"
description = ""
authors = ["Arseny Sher <sher-ars@yandex.ru>"]
readme = "README.md"
packages = [{include = "sk_collect_dumps"}]

[tool.poetry.dependencies]
python = "^3.11"
ansible = "^9.1.0"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
@@ -1,25 +1,43 @@
# Collect /v1/debug_dump from all safekeeper nodes

1. Run ansible playbooks to collect .json dumps from all safekeepers and store them in the `./result` directory.
2. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to the `prod_feb30` table in the specified postgres database.

## How to use ansible (staging)

1. Issue an admin token (add/remove .stage from the url for staging/prod and set the proper API key):
```
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
# staging:
AUTH_TOKEN=$(curl https://console.stage.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_STAGING_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# prod:
AUTH_TOKEN=$(curl https://console.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_PROD_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# check
echo $AUTH_TOKEN
```
2. Run ansible playbooks to collect .json dumps from all safekeepers and store them in the `./result` directory.

AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.eu-west-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
There are two ways to do that, with ssm or tsh. ssm:
```
# in the aws repo, cd .github/ansible and run e.g. (adjusting profile and region in vars and limit):
AWS_DEFAULT_PROFILE=dev ansible-playbook -i inventory_aws_ec2.yaml -i staging.us-east-2.vars.yaml -e @ssm_config -l 'safekeeper:&us_east_2' -e "auth_token=${AUTH_TOKEN}" ~/neon/neon/scripts/sk_collect_dumps/remote.yaml
```
It will put the results into the .results directory *near the playbook*.

tsh:

Update the inventory, if needed, selecting .build/.tech and optionally a region:
```
rm -f hosts && echo '[safekeeper]' >> hosts
# staging:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.build" | grep us-east-2 >> hosts
# prod:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.tech" | grep us-east-2 >> hosts
```

## How to use ansible (prod)

Test the ansible connection:
```
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-west-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml

AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml

AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.eu-central-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml

AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.ap-southeast-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
ansible all -m ping -v
```

Download the dumps:
```
mkdir -p result && rm -f result/*
ansible-playbook -e "auth_token=${AUTH_TOKEN}" remote.yaml
```

3. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to the `prod_feb30` table in the specified postgres database.

@@ -1,18 +1,37 @@
- name: Fetch state dumps from safekeepers
  hosts: safekeepers
  hosts: safekeeper
  gather_facts: False
  remote_user: "{{ remote_user }}"

  tasks:
    - name: Download file
    - name: Dump file
      get_url:
        url: "http://{{ inventory_hostname }}:7676/v1/debug_dump?dump_all=true&dump_disk_content=false"
        dest: "/tmp/{{ inventory_hostname }}.json"
        dest: "/tmp/{{ inventory_hostname }}-dump.json"
        headers:
          Authorization: "Bearer {{ auth_token }}"

    - name: Fetch file from remote hosts
    - name: install rsync
      ansible.builtin.apt:
        name: rsync
        update_cache: yes
      become: yes
      ignore_errors: true  # it may already be installed, and we don't always have sudo

    - name: Fetch file from remote hosts (works only with ssm)
      fetch:
        src: "/tmp/{{ inventory_hostname }}.json"
        dest: "./result/{{ inventory_hostname }}.json"
        src: "/tmp/{{ inventory_hostname }}-dump.json"
        dest: "./result/{{ inventory_hostname }}-dump.json"
        flat: yes
        fail_on_missing: no
      when: ansible_connection == "aws_ssm"

    # xxx not sure how to make ansible 'synchronize' work with tsh
    - name: Fetch file from remote hosts
      shell: rsync -e 'tsh ssh' -azvP "developer@{{ inventory_hostname }}:/tmp/{{ inventory_hostname }}-dump.json" "./result/{{ inventory_hostname }}-dump.json"
      delegate_to: localhost
      when: ansible_connection != "aws_ssm"

    - name: remove remote dumps
      ansible.builtin.file:
        path: "/tmp/{{ inventory_hostname }}-dump.json"
        state: absent

13 scripts/sk_collect_dumps/ssh.cfg Normal file
@@ -0,0 +1,13 @@
# Begin generated Teleport configuration for teleport.aws.neon.tech by tsh

# Common flags for all teleport.aws.neon.tech hosts
Host *
    HostKeyAlgorithms rsa-sha2-512-cert-v01@openssh.com,rsa-sha2-256-cert-v01@openssh.com,ssh-rsa-cert-v01@openssh.com

# Flags for all teleport.aws.neon.tech hosts except the proxy
Host * !teleport.aws.neon.tech
    Port 3022
    ProxyCommand "/usr/local/bin/tsh" proxy ssh --cluster=teleport.aws.neon.tech --proxy=teleport.aws.neon.tech:443 %r@%h:%p
    User developer

# End generated Teleport configuration
@@ -31,22 +31,22 @@ SELECT
    (data->>'tenant_id') AS tenant_id,
    (data->>'timeline_id') AS timeline_id,
    (data->'memory'->>'active')::bool AS active,
    (data->'memory'->>'flush_lsn')::bigint AS flush_lsn,
    (data->'memory'->'mem_state'->>'backup_lsn')::bigint AS backup_lsn,
    (data->'memory'->'mem_state'->>'commit_lsn')::bigint AS commit_lsn,
    (data->'memory'->'mem_state'->>'peer_horizon_lsn')::bigint AS peer_horizon_lsn,
    (data->'memory'->'mem_state'->>'remote_consistent_lsn')::bigint AS remote_consistent_lsn,
    (data->'memory'->>'write_lsn')::bigint AS write_lsn,
    (data->'memory'->>'flush_lsn')::pg_lsn AS flush_lsn,
    (data->'memory'->'mem_state'->>'backup_lsn')::pg_lsn AS backup_lsn,
    (data->'memory'->'mem_state'->>'commit_lsn')::pg_lsn AS commit_lsn,
    (data->'memory'->'mem_state'->>'peer_horizon_lsn')::pg_lsn AS peer_horizon_lsn,
    (data->'memory'->'mem_state'->>'remote_consistent_lsn')::pg_lsn AS remote_consistent_lsn,
    (data->'memory'->>'write_lsn')::pg_lsn AS write_lsn,
    (data->'memory'->>'num_computes')::bigint AS num_computes,
    (data->'memory'->>'epoch_start_lsn')::bigint AS epoch_start_lsn,
    (data->'memory'->>'epoch_start_lsn')::pg_lsn AS epoch_start_lsn,
    (data->'memory'->>'last_removed_segno')::bigint AS last_removed_segno,
    (data->'memory'->>'is_cancelled')::bool AS is_cancelled,
    (data->'control_file'->>'backup_lsn')::bigint AS disk_backup_lsn,
    (data->'control_file'->>'commit_lsn')::bigint AS disk_commit_lsn,
    (data->'control_file'->>'backup_lsn')::pg_lsn AS disk_backup_lsn,
    (data->'control_file'->>'commit_lsn')::pg_lsn AS disk_commit_lsn,
    (data->'control_file'->'acceptor_state'->>'term')::bigint AS disk_term,
    (data->'control_file'->>'local_start_lsn')::bigint AS local_start_lsn,
    (data->'control_file'->>'peer_horizon_lsn')::bigint AS disk_peer_horizon_lsn,
    (data->'control_file'->>'timeline_start_lsn')::bigint AS timeline_start_lsn,
    (data->'control_file'->>'remote_consistent_lsn')::bigint AS disk_remote_consistent_lsn
    (data->'control_file'->>'local_start_lsn')::pg_lsn AS local_start_lsn,
    (data->'control_file'->>'peer_horizon_lsn')::pg_lsn AS disk_peer_horizon_lsn,
    (data->'control_file'->>'timeline_start_lsn')::pg_lsn AS timeline_start_lsn,
    (data->'control_file'->>'remote_consistent_lsn')::pg_lsn AS disk_remote_consistent_lsn
FROM tmp_json
EOF

10 scripts/sk_migrate/restart_ep.sh Normal file
@@ -0,0 +1,10 @@
#!/bin/bash

# export NEON_API_KEY=

while IFS= read -r ENDPOINT
do
    echo "$ENDPOINT"
    # curl -X POST -H "Authorization: Bearer $NEON_PROD_KEY" -H "Accept: application/json" -H "Content-Type: application/json" https://console.neon.tech/regions/console/api/v1/admin/endpoints/$ENDPOINT/restart
    curl -X POST -H "Authorization: Bearer $NEON_API_KEY" -H "Accept: application/json" -H "Content-Type: application/json" https://console.neon.tech/regions/aws-us-east-2/api/v1/admin/endpoints/$ENDPOINT/restart
done < endpoints_cplane.txt
137 scripts/sk_migrate/sk_migrate.py Normal file
@@ -0,0 +1,137 @@
import argparse
import sys
import psycopg2
import psycopg2.extras
import os
import requests

def migrate_project(conn, from_sk: dict[str, any], to_sk: dict[str, any], project_id: str, dry_run=True):
    print("###############################################################")

    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM projects WHERE id = %s", (project_id,))
        project = cur.fetchone()

    if project is None:
        print("Project with id {} does not exist".format(project_id))
        return

    assert project['deleted'] == False, "Project with id {} is deleted".format(project_id)

    with conn.cursor() as cur:
        cur.execute("SELECT safekeeper_id FROM projects_safekeepers WHERE project_id = %s", (project_id, ))
        sk_ids = list(map(lambda x: x[0], cur.fetchall()))
        assert from_sk['id'] in sk_ids
        assert to_sk['id'] not in sk_ids

    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM branches WHERE project_id = %s AND deleted = 'f'", (project_id, ))
        branches = cur.fetchall()

    for branch in branches:
        if branch['deleted'] != False:
            continue

        tenant_id = project['tenant_id']
        timeline_id = branch['timeline_id']
        print("tenant_id: {}, timeline_id: {}".format(tenant_id, timeline_id))
        print(f"Migrating from {from_sk['host']} to {to_sk['host']}, project={project_id}, branch={branch['id']}, deleted={branch['deleted']}")

        print(list(sk_ids))

        sk_hosts = list(map(
            lambda x: f"http://{safekeepers[x]['host']}:{safekeepers[x]['http_port']}",
            filter(lambda x: x != from_sk['id'], sk_ids)
        ))

        # make HTTP request to /pull_timeline
        # url = f"http://{to_sk['host']}:{to_sk['http_port']}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
        url = f"http://{to_sk['host']}:{to_sk['http_port']}/v1/pull_timeline"
        body = {
            "tenant_id": str(tenant_id),
            "timeline_id": str(timeline_id),
            "http_hosts": sk_hosts,
        }
        print(body)

        print("Making HTTP request to {}".format(url), flush=True)
        if not dry_run:
            response = requests.post(url, json=body)
            # response = requests.get(url)

            if response.status_code != 200 and "error decoding response body: missing field `tenant_id` at line 1 column 104" in response.text:
                print(f"WARN: Skipping branch {branch['id']} because it's empty on all safekeepers")
                continue

            if response.status_code != 200 and f"Timeline {timeline_id} already exists" in response.text:
                print(f"WARN: Skipping timeline {timeline_id} because it already exists (was migrated earlier)")
                continue

            if response.status_code != 200:
                print("ERROR: {}".format(response.text))
                return
            print(response.text)

        print(f"Updating safekeeper {from_sk['id']} -> {to_sk['id']} for project={project_id} in the database")
        if not dry_run:
            with conn.cursor() as cur:
                cur.execute("UPDATE projects_safekeepers SET safekeeper_id = %s WHERE project_id = %s AND safekeeper_id = %s RETURNING *", (to_sk['id'], project_id, from_sk['id']))
                print(cur.fetchone())
            conn.commit()

def find_projects(sk_from_id: int):
    with conn.cursor() as cur:
        cur.execute("SELECT p.id FROM projects p, projects_safekeepers ps WHERE ps.project_id = p.id AND NOT p.deleted AND ps.safekeeper_id = %s", (sk_from_id, ))
        project_ids = list(map(lambda x: x[0], cur.fetchall()))
        return project_ids


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='migrate sk')
    parser.add_argument("-d", help="database URL", type=str, required=True)
    parser.add_argument("--from-sk", help="from sk id as in the cplane db", type=int, required=True)
    parser.add_argument("--to-sk", help="to sk id as in the cplane db", type=int, required=True)
    parser.add_argument("--not-dry-run", help="actually run the migration instead of a dry run", action='store_true')
    parser.add_argument("--project-id", help="project to migrate", type=str, default=None)
    args = parser.parse_args()

    # Connect to the postgresql database
    conn = psycopg2.connect(args.d)

    safekeepers = dict()

    # Fetch all rows from the "safekeepers" table and index them by id
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM safekeepers")
    rows = cur.fetchall()
    cur.close()

    for row in rows:
        safekeepers[row['id']] = row

    # print(safekeepers)

    assert args.from_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.from_sk)
    from_sk_hostname = safekeepers[args.from_sk]['host']
    assert safekeepers[args.from_sk]['active'] == False, "Safekeeper with id {} should be inactive".format(args.from_sk)

    assert args.to_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.to_sk)
    to_sk_hostname = safekeepers[args.to_sk]['host']
    assert safekeepers[args.to_sk]['active'] == True, "Safekeeper with id {} should be active".format(args.to_sk)

    print(f"migrating from id {args.from_sk} {from_sk_hostname} to {args.to_sk} {to_sk_hostname}")

    if args.project_id is not None:
        project_ids = [args.project_id]
    else:
        project_ids = find_projects(args.from_sk)
        print(project_ids)

    for project_id in project_ids:
        # Pass the dry-run flag through; as originally written it was dropped,
        # so --not-dry-run had no effect.
        migrate_project(conn, safekeepers[args.from_sk], safekeepers[args.to_sk], project_id, dry_run=not args.not_dry_run)
@@ -1,41 +0,0 @@
#!/usr/bin/env bash

set -euo pipefail

if [ "$(cat /sys/class/block/nvme1n1/device/model)" != "Amazon EC2 NVMe Instance Storage " ]; then
    echo "nvme1n1 is not Amazon EC2 NVMe Instance Storage: '$(cat /sys/class/block/nvme1n1/device/model)'"
    exit 1
fi

rmdir bench_repo_dir || true

sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1

sudo mount /dev/nvme1n1 /mnt
sudo chown -R "$(id -u)":"$(id -g)" /mnt

mkdir /mnt/bench_repo_dir
mkdir bench_repo_dir
sudo mount --bind /mnt/bench_repo_dir bench_repo_dir

mkdir /mnt/test_output

mkdir /mnt/many_tenants

echo run the following commands

cat <<EOF
# test suite run
export TEST_OUTPUT="/mnt/test_output"
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_pageserver.py

# for interactive use
export NEON_REPO_DIR="$(readlink -f ./bench_repo_dir)/repo"
cargo build_testing --release
./target/release/neon_local init
# ... create tenant, seed it using pgbench
# then duplicate the tenant using
# poetry run python3 ./test_runner/duplicate_tenant.py TENANT_ID 200 8
EOF


@@ -3,9 +3,12 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use clap::Parser;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};

use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::{
    FilterTenantTimelineId, MessageType, SubscribeByFilterRequest,
    TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, TypedMessage,
};

use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT};
use tokio::time;
@@ -91,15 +94,23 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
    None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
};

let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
let ttid = ProtoTenantTimelineId {
    tenant_id: vec![0xFF; 16],
    timeline_id: tli_from_u64(i),
});
let request = SubscribeSafekeeperInfoRequest {
    subscription_key: Some(key),
};
let mut stream = client
    .subscribe_safekeeper_info(request)

let request = SubscribeByFilterRequest {
    types: vec![TypeSubscription {
        r#type: MessageType::SafekeeperTimelineInfo.into(),
    }],
    tenant_timeline_id: Some(FilterTenantTimelineId {
        enabled: true,
        tenant_timeline_id: Some(ttid),
    }),
};

let mut stream: tonic::Streaming<TypedMessage> = client
    .subscribe_by_filter(request)
    .await
    .unwrap()
    .into_inner();

@@ -10,6 +10,12 @@ service BrokerService {

    // Publish safekeeper updates.
    rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};

    // Subscribe to all messages, limited by a filter.
    rpc SubscribeByFilter(SubscribeByFilterRequest) returns (stream TypedMessage) {};

    // Publish one message.
    rpc PublishOne(TypedMessage) returns (google.protobuf.Empty) {};
}

message SubscribeSafekeeperInfoRequest {
@@ -48,3 +54,55 @@ message TenantTimelineId {
    bytes tenant_id = 1;
    bytes timeline_id = 2;
}

message FilterTenantTimelineId {
    // If true, only messages related to `tenant_timeline_id` will be emitted.
    // Otherwise, messages for all timelines will be emitted.
    bool enabled = 1;
    TenantTimelineId tenant_timeline_id = 2;
}

message TypeSubscription {
    MessageType type = 1;
}

message SubscribeByFilterRequest {
    // Subscription will emit messages only of the specified types. You need to specify
    // at least one type to receive any messages.
    repeated TypeSubscription types = 1;

    // If set and enabled, subscription will emit messages only for the specified tenant/timeline.
    optional FilterTenantTimelineId tenant_timeline_id = 2;
}

enum MessageType {
    UNKNOWN = 0;
    SAFEKEEPER_TIMELINE_INFO = 2;
    SAFEKEEPER_DISCOVERY_REQUEST = 3;
    SAFEKEEPER_DISCOVERY_RESPONSE = 4;
}

// A message with a type.
message TypedMessage {
    MessageType type = 1;

    optional SafekeeperTimelineInfo safekeeper_timeline_info = 2;
    optional SafekeeperDiscoveryRequest safekeeper_discovery_request = 3;
    optional SafekeeperDiscoveryResponse safekeeper_discovery_response = 4;
}

message SafekeeperDiscoveryRequest {
    TenantTimelineId tenant_timeline_id = 1;
}

// Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
message SafekeeperDiscoveryResponse {
    uint64 safekeeper_id = 1;
    TenantTimelineId tenant_timeline_id = 2;
    // WAL available to download.
    uint64 commit_lsn = 3;
    // A connection string to use for WAL downloading.
    string safekeeper_connstr = 4;
    // Availability zone of a safekeeper.
    optional string availability_zone = 5;
}

@@ -35,10 +35,16 @@ use tracing::*;
use utils::signals::ShutdownSignals;

use metrics::{Encoder, TextEncoder};
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use storage_broker::metrics::{
    BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
    NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use storage_broker::proto::{
    FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
    SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{
    parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
};
@@ -73,8 +79,103 @@ struct Args {
    log_format: String,
}
type PubId = u64; // id of publisher for registering in maps
type SubId = u64; // id of subscriber for registering in maps
/// Id of publisher for registering in maps
type PubId = u64;

/// Id of subscriber for registering in maps
type SubId = u64;

/// Single enum type for all messages.
#[derive(Clone, Debug, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum Message {
    SafekeeperTimelineInfo(SafekeeperTimelineInfo),
    SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
    SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
}

impl Message {
    /// Convert proto message to internal message.
    pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
        match proto_msg.r#type() {
            MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
                proto_msg.safekeeper_timeline_info.ok_or_else(|| {
                    Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
                })?,
            )),
            MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
                proto_msg.safekeeper_discovery_request.ok_or_else(|| {
                    Status::new(
                        Code::InvalidArgument,
                        "missing safekeeper_discovery_request",
                    )
                })?,
            )),
            MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
                proto_msg.safekeeper_discovery_response.ok_or_else(|| {
                    Status::new(
                        Code::InvalidArgument,
                        "missing safekeeper_discovery_response",
                    )
                })?,
            )),
            MessageType::Unknown => Err(Status::new(
                Code::InvalidArgument,
                format!("invalid message type: {:?}", proto_msg.r#type),
            )),
        }
    }

    /// Get the tenant_timeline_id from the message.
    pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
        match self {
            Message::SafekeeperTimelineInfo(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
            Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
            Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
        }
    }

    /// Convert internal message to the protobuf struct.
    pub fn as_typed_message(&self) -> TypedMessage {
        let mut res = TypedMessage {
            r#type: self.message_type() as i32,
            ..Default::default()
        };
        match self {
            Message::SafekeeperTimelineInfo(msg) => {
                res.safekeeper_timeline_info = Some(msg.clone())
            }
            Message::SafekeeperDiscoveryRequest(msg) => {
                res.safekeeper_discovery_request = Some(msg.clone())
            }
            Message::SafekeeperDiscoveryResponse(msg) => {
                res.safekeeper_discovery_response = Some(msg.clone())
            }
        }
        res
    }

    /// Get the message type.
    pub fn message_type(&self) -> MessageType {
        match self {
            Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
            Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
            Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
        }
    }
}
#[derive(Copy, Clone, Debug)]
enum SubscriptionKey {
@@ -83,7 +184,7 @@ enum SubscriptionKey {
}

impl SubscriptionKey {
    // Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
    /// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
    pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
        match key {
            ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
@@ -92,14 +193,29 @@ impl SubscriptionKey {
        }
    }
}

    /// Parse from FilterTenantTimelineId
    pub fn from_proto_filter_tenant_timeline_id(
        f: &FilterTenantTimelineId,
    ) -> Result<Self, Status> {
        if !f.enabled {
            return Ok(SubscriptionKey::All);
        }

        let ttid =
            parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
                Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
            })?)?;
        Ok(SubscriptionKey::Timeline(ttid))
    }
}

// Channel to timeline subscribers.
/// Channel to timeline subscribers.
struct ChanToTimelineSub {
    chan: broadcast::Sender<SafekeeperTimelineInfo>,
    // Tracked separately to know when to delete the shmem entry. receiver_count()
    // is awkward for that, as unregistering and dropping the receiver side
    // happen at different moments.
    chan: broadcast::Sender<Message>,
    /// Tracked separately to know when to delete the shmem entry. receiver_count()
    /// is awkward for that, as unregistering and dropping the receiver side
    /// happen at different moments.
    num_subscribers: u64,
}
@@ -110,7 +226,7 @@ struct SharedState {
    num_subs_to_timelines: i64,
    chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
    num_subs_to_all: i64,
    chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
    chan_to_all_subs: broadcast::Sender<Message>,
}

impl SharedState {
@@ -146,7 +262,7 @@ impl SharedState {
        &mut self,
        sub_key: SubscriptionKey,
        timeline_chan_size: usize,
    ) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
    ) -> (SubId, broadcast::Receiver<Message>) {
        let sub_id = self.next_sub_id;
        self.next_sub_id += 1;
        let sub_rx = match sub_key {
@@ -262,6 +378,29 @@ impl Registry {
            subscriber.id, subscriber.key, subscriber.remote_addr
        );
    }

    /// Send msg to relevant subscribers.
    pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
        PROCESSED_MESSAGES_TOTAL.inc();

        // send message to subscribers for everything
        let shared_state = self.shared_state.read();
        // Err means there are no subscribers, which is fine.
        shared_state.chan_to_all_subs.send(msg.clone()).ok();

        // send message to per-timeline subscribers, if there is a ttid
        let ttid = msg.tenant_timeline_id()?;
        if let Some(ttid) = ttid {
            if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
                // Err can't happen here, as tx is destroyed only when the last
                // subscriber is removed from the map along with it.
                subs.chan
                    .send(msg.clone())
                    .expect("rx is still in the map with zero subscribers");
            }
        }
        Ok(())
    }
}
// Private subscriber state.
@@ -269,7 +408,7 @@ struct Subscriber {
    id: SubId,
    key: SubscriptionKey,
    // Subscriber receives messages from publishers here.
    sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
    sub_rx: broadcast::Receiver<Message>,
    // to unregister itself from shared state in Drop
    registry: Registry,
    // for logging
@@ -291,26 +430,9 @@ struct Publisher {
}

impl Publisher {
    // Send msg to relevant subscribers.
    pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
        // send message to subscribers for everything
        let shared_state = self.registry.shared_state.read();
        // Err means there are no subscribers, which is fine.
        shared_state.chan_to_all_subs.send(msg.clone()).ok();

        // send message to per-timeline subscribers
        let ttid =
            parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
                Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
            })?)?;
        if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
            // Err can't happen here, as tx is destroyed only when the last
            // subscriber is removed from the map along with it.
            subs.chan
                .send(msg.clone())
                .expect("rx is still in the map with zero subscribers");
        }
        Ok(())
    /// Send msg to relevant subscribers.
    pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
        self.registry.send_msg(msg)
    }
}
@@ -339,7 +461,7 @@ impl BrokerService for Broker {

        loop {
            match stream.next().await {
                Some(Ok(msg)) => publisher.send_msg(&msg)?,
                Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
                Some(Err(e)) => return Err(e), // grpc error from the stream
                None => break, // closed stream
            }
@@ -371,8 +493,15 @@ impl BrokerService for Broker {
            let mut missed_msgs: u64 = 0;
            loop {
                match subscriber.sub_rx.recv().await {
                    Ok(info) => yield info,
                    Ok(info) => {
                        match info {
                            Message::SafekeeperTimelineInfo(info) => yield info,
                            _ => {},
                        }
                        BROADCASTED_MESSAGES_TOTAL.inc();
                    },
                    Err(RecvError::Lagged(skipped_msg)) => {
                        BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
                        missed_msgs += skipped_msg;
                        if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
                            warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
@@ -392,6 +521,78 @@ impl BrokerService for Broker {
            Box::pin(output) as Self::SubscribeSafekeeperInfoStream
        ))
    }

    type SubscribeByFilterStream =
        Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;

    /// Subscribe to all messages, limited by a filter.
    async fn subscribe_by_filter(
        &self,
        request: Request<SubscribeByFilterRequest>,
    ) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
        let remote_addr = request
            .remote_addr()
            .expect("TCPConnectInfo inserted by handler");
        let proto_filter = request.into_inner();
        let ttid_filter = proto_filter
            .tenant_timeline_id
            .as_ref()
            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;

        let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
        let types_set = proto_filter
            .types
            .iter()
            .map(|t| t.r#type)
            .collect::<std::collections::HashSet<_>>();

        let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);

        // transform rx into stream with item = Result, as the method result demands
        let output = async_stream::try_stream! {
            let mut warn_interval = time::interval(Duration::from_millis(1000));
            let mut missed_msgs: u64 = 0;
            loop {
                match subscriber.sub_rx.recv().await {
                    Ok(msg) => {
                        let msg_type = msg.message_type() as i32;
                        if types_set.contains(&msg_type) {
                            yield msg.as_typed_message();
                            BROADCASTED_MESSAGES_TOTAL.inc();
                        }
                    },
                    Err(RecvError::Lagged(skipped_msg)) => {
                        BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
                        missed_msgs += skipped_msg;
                        if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
                            warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
                                subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
                            missed_msgs = 0;
                        }
                    }
                    Err(RecvError::Closed) => {
                        // can't happen, we never drop the channel while there is a subscriber
                        Err(Status::new(Code::Internal, "channel unexpectedly closed"))?;
                    }
                }
            }
        };

        Ok(Response::new(
            Box::pin(output) as Self::SubscribeByFilterStream
        ))
    }

    /// Publish one message.
    async fn publish_one(
        &self,
        request: Request<TypedMessage>,
    ) -> std::result::Result<Response<()>, Status> {
        let msg = Message::from(request.into_inner())?;
        PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
        self.registry.send_msg(&msg)?;
        Ok(Response::new(()))
    }
}
// We serve only metrics and healthcheck through http1.
@@ -515,8 +716,8 @@ mod tests {
    use tokio::sync::broadcast::error::TryRecvError;
    use utils::id::{TenantId, TimelineId};

    fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
        SafekeeperTimelineInfo {
    fn msg(timeline_id: Vec<u8>) -> Message {
        Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
            safekeeper_id: 1,
            tenant_timeline_id: Some(ProtoTenantTimelineId {
                tenant_id: vec![0x00; 16],
@@ -533,7 +734,7 @@ mod tests {
            http_connstr: "neon-1-sk-1.local:7677".to_owned(),
            local_start_lsn: 0,
            availability_zone: None,
        }
        })
    }

    fn tli_from_u64(i: u64) -> Vec<u8> {
@@ -1,6 +1,6 @@
//! Broker metrics.

use metrics::{register_int_gauge, IntGauge};
use metrics::{register_int_counter, register_int_gauge, IntCounter, IntGauge};
use once_cell::sync::Lazy;

pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
@@ -23,3 +23,35 @@ pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
    )
    .expect("Failed to register metric")
});

pub static PROCESSED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_processed_messages_total",
        "Number of messages received by storage broker, before routing and broadcasting"
    )
    .expect("Failed to register metric")
});

pub static BROADCASTED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_broadcasted_messages_total",
        "Number of messages broadcasted (sent over network) to subscribers"
    )
    .expect("Failed to register metric")
});

pub static BROADCAST_DROPPED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_broadcast_dropped_messages_total",
        "Number of messages dropped due to channel capacity overflow"
    )
    .expect("Failed to register metric")
});

pub static PUBLISHED_ONEOFF_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_published_oneoff_messages_total",
        "Number of one-off messages sent via PublishOne method"
    )
    .expect("Failed to register metric")
});
@@ -10,7 +10,7 @@ from datetime import datetime
from pathlib import Path

# Type-related stuff
from typing import Any, Callable, ClassVar, Dict, Iterator, Optional
from typing import Callable, ClassVar, Dict, Iterator, Optional

import pytest
from _pytest.config import Config
@@ -20,7 +20,6 @@ from _pytest.terminal import TerminalReporter
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.types import TenantId, TimelineId
from fixtures.utils import humantime_to_ms

"""
This file contains fixtures for micro-benchmarks.
@@ -410,34 +409,6 @@ class NeonBenchmarker:
            report=MetricReport.LOWER_IS_BETTER,
        )

    def record_pagebench_results(self, name: str, results: Dict[str, Any]):
        total = results["total"]

        metric = "request_count"
        self.record(
            f"{name}.{metric}",
            total[metric],
            "",
            report=MetricReport.HIGHER_IS_BETTER,
        )

        metric = "latency_mean"
        self.record(
            f"{name}.{metric}",
            humantime_to_ms(total[metric]),
            "ms",
            report=MetricReport.LOWER_IS_BETTER,
        )

        metric = "latency_percentiles"
        for k, v in total[metric].items():
            self.record(
                f"{name}.{metric}.{k}",
                humantime_to_ms(v),
                "ms",
                report=MetricReport.LOWER_IS_BETTER,
            )
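For orientation, a minimal sketch of the payload shape that record_pagebench_results() consumes; the keys mirror the accesses in the method above, while the literal numbers and percentile names are hypothetical placeholders rather than real pagebench output.

# Hypothetical pagebench results: a "total" section with a request count,
# a humantime-formatted mean latency, and humantime-formatted percentiles.
results = {
    "total": {
        "request_count": 10000,
        "latency_mean": "1ms 250us",
        "latency_percentiles": {"p50": "1ms", "p99": "4ms 500us"},
    }
}
zenbenchmark.record_pagebench_results("get-page-latest-lsn", results)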
@pytest.fixture(scope="function")
def zenbenchmark(record_property: Callable[[str, object], None]) -> Iterator[NeonBenchmarker]:

@@ -457,7 +457,6 @@ class NeonEnvBuilder:
        self.preserve_database_files = preserve_database_files
        self.initial_tenant = initial_tenant or TenantId.generate()
        self.initial_timeline = initial_timeline or TimelineId.generate()
        self.enable_generations = True
        self.scrub_on_exit = False
        self.test_output_dir = test_output_dir

@@ -677,8 +676,7 @@ class NeonEnvBuilder:

            pageserver.stop(immediate=True)

        if self.env.attachment_service is not None:
            self.env.attachment_service.stop(immediate=True)
        self.env.attachment_service.stop(immediate=True)

        cleanup_error = None

@@ -772,10 +770,9 @@ class NeonEnv:
        self.initial_tenant = config.initial_tenant
        self.initial_timeline = config.initial_timeline

        self.control_plane_api: Optional[str] = None
        self.attachment_service: Optional[NeonAttachmentService] = None
        if config.enable_generations:
            self.enable_generations()
        attachment_service_port = self.port_distributor.get_port()
        self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
        self.attachment_service: NeonAttachmentService = NeonAttachmentService(self)

        # Create a config file corresponding to the options
        cfg: Dict[str, Any] = {
@@ -844,24 +841,11 @@ class NeonEnv:
        log.info(f"Config: {cfg}")
        self.neon_cli.init(cfg)

    def enable_generations(self, start=False):
        if not start:
            # TODO: assert that we haven't `self.start()`ed yet
            pass
        assert self.control_plane_api is None
        assert self.attachment_service is None
        attachment_service_port = self.port_distributor.get_port()
        self.control_plane_api = f"http://127.0.0.1:{attachment_service_port}"
        self.attachment_service = NeonAttachmentService(self)
        if start:
            self.attachment_service.start()

    def start(self):
        # Start up broker, pageserver and all safekeepers
        self.broker.try_start()

        if self.attachment_service is not None:
            self.attachment_service.start()
        self.attachment_service.start()

        for pageserver in self.pageservers:
            pageserver.start()
@@ -1589,16 +1573,6 @@ class Pagectl(AbstractNeonCli):
        parsed = json.loads(res.stdout)
        return IndexPartDump.from_json(parsed)

# class GetpageBenchLibpq(AbstractNeonCli):
#     """
#     A typed wrapper around the `getpage_bench_libpq` CLI.
#     """
#
#     COMMAND = "getpage_bench_libpq"
#
#     def run(self):
#         pass


class NeonAttachmentService:
    def __init__(self, env: NeonEnv):
@@ -1853,20 +1827,19 @@ class NeonPageserver(PgProtocol):
        """
        client = self.http_client()
        return client.tenant_attach(
            tenant_id, config, config_null, generation=self.maybe_get_generation(tenant_id)
            tenant_id,
            config,
            config_null,
            generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id),
        )

    def tenant_detach(self, tenant_id: TenantId):
        if self.env.attachment_service is not None:
            self.env.attachment_service.attach_hook_drop(tenant_id)
        self.env.attachment_service.attach_hook_drop(tenant_id)

        client = self.http_client()
        return client.tenant_detach(tenant_id)

    def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
        # This API is only for use when generations are enabled
        assert self.env.attachment_service is not None

        if config["mode"].startswith("Attached") and "generation" not in config:
            config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)

@@ -1892,26 +1865,15 @@ class NeonPageserver(PgProtocol):
        generation: Optional[int] = None,
    ) -> TenantId:
        if generation is None:
            generation = self.maybe_get_generation(tenant_id)
            generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
        client = self.http_client(auth_token=auth_token)
        return client.tenant_create(tenant_id, conf, generation=generation)

    def tenant_load(self, tenant_id: TenantId):
        client = self.http_client()
        return client.tenant_load(tenant_id, generation=self.maybe_get_generation(tenant_id))

    def maybe_get_generation(self, tenant_id: TenantId):
        """
        For tests that would like to use an HTTP client directly instead of using
        the `tenant_attach` and `tenant_create` helpers here: issue a generation
        number for a tenant.

        Returns None if the attachment service is not enabled (legacy mode)
        """
        if self.env.attachment_service is not None:
            return self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
        else:
            return None
        return client.tenant_load(
            tenant_id, generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
        )


def append_pageserver_param_overrides(

@@ -510,13 +510,21 @@ class PageserverHttpClient(requests.Session):
        assert res_json is None

    def timeline_get_lsn_by_timestamp(
        self, tenant_id: TenantId, timeline_id: TimelineId, timestamp, version: int
        self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        timestamp,
        version: Optional[int] = None,
    ):
        log.info(
            f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
        )
        if version is None:
            version_str = ""
        else:
            version_str = f"&version={version}"
        res = self.get(
            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}&version={version}",
            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}{version_str}",
        )
        self.verbose_error(res)
        res_json = res.json()
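As a usage sketch (the timestamp value here is made up; the call shapes mirror the updated tests later in this diff), the version argument is now optional, so callers can omit it and let the endpoint use its current response format:

# Hedged sketch: omitting `version` leaves the version parameter out of the
# query string entirely; the v2 response is a JSON object with "kind" and
# "lsn" fields, as the updated tests below assume.
result = client.timeline_get_lsn_by_timestamp(
    tenant_id, timeline_id, "2023-01-01T00:00:00Z"
)
if result["kind"] not in ["past", "nodata"]:
    lsn = result["lsn"]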
@@ -125,3 +125,51 @@ class TenantId(Id):
class TimelineId(Id):
    def __repr__(self) -> str:
        return f'TimelineId("{self.id.hex()}")'


# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")


class TenantShardId:
    def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int):
        self.tenant_id = tenant_id
        self.shard_number = shard_number
        self.shard_count = shard_count
        assert self.shard_number < self.shard_count or self.shard_count == 0

    @classmethod
    def parse(cls: Type[TTenantShardId], input) -> TTenantShardId:
        if len(input) == 32:
            return cls(
                tenant_id=TenantId(input),
                shard_number=0,
                shard_count=0,
            )
        elif len(input) == 37:
            return cls(
                tenant_id=TenantId(input[0:32]),
                shard_number=int(input[33:35], 16),
                shard_count=int(input[35:37], 16),
            )
        else:
            raise ValueError(f"Invalid TenantShardId '{input}'")

    def __str__(self):
        return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"

    def _tuple(self) -> tuple[TenantId, int, int]:
        return (self.tenant_id, self.shard_number, self.shard_count)

    def __lt__(self, other) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._tuple() < other._tuple()

    def __eq__(self, other) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._tuple() == other._tuple()

    def __hash__(self) -> int:
        return hash(self._tuple())
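A short illustrative sketch of the two string forms parse() accepts, using made-up hex ids and assuming TenantId renders as its 32-character hex string. The unsharded 32-character form maps to shard 0 of 0, while the 37-character form carries the shard number and shard count as two hex bytes after the dash.

# Hypothetical ids, for illustration only.
unsharded = TenantShardId.parse("3fa85f6457174562b3fc2c963f66afa6")
assert (unsharded.shard_number, unsharded.shard_count) == (0, 0)

sharded = TenantShardId.parse("3fa85f6457174562b3fc2c963f66afa6-0104")
assert (sharded.shard_number, sharded.shard_count) == (1, 4)  # shard 1 of 4
assert str(sharded) == "3fa85f6457174562b3fc2c963f66afa6-0104"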
@@ -397,36 +397,3 @@ def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
    }
    """
    pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])


def humantime_to_ms(humantime: str) -> float:
    """
    Converts Rust humantime's output string to milliseconds.

    humantime_to_ms("1h 1ms 406us") -> 3600001.406
    """

    unit_multiplier_map = {
        "ns": 1e-6,
        "us": 1e-3,
        "ms": 1,
        "s": 1e3,
        "m": 1e3 * 60,
        "h": 1e3 * 60 * 60,
    }
    matcher = re.compile(rf"^(\d+)({'|'.join(unit_multiplier_map.keys())})$")
    total_ms = 0.0

    if humantime == "0":
        return total_ms

    for item in humantime.split():
        if (match := matcher.search(item)) is not None:
            n, unit = match.groups()
            total_ms += int(n) * unit_multiplier_map[unit]
        else:
            raise ValueError(
                f"can't parse '{item}' (from string '{humantime}'), known units are {', '.join(unit_multiplier_map.keys())}."
            )

    return round(total_ms, 3)
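A few worked conversions based on the unit table above (1h = 3,600,000 ms; 406us = 0.406 ms), extending the docstring's example:

assert humantime_to_ms("1h 1ms 406us") == 3600001.406
assert humantime_to_ms("1s 500ms") == 1500.0
assert humantime_to_ms("0") == 0.0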
@@ -61,7 +61,6 @@ def measure_recovery_time(env: NeonCompare):
    # of view, but the same as far as the safekeeper/WAL is concerned. To work around that,
    # we will explicitly create the tenant in the same generation that it was previously
    # attached in.
    assert env.env.attachment_service is not None
    attach_status = env.env.attachment_service.inspect(tenant_id=env.tenant)
    assert attach_status is not None
    (attach_gen, _) = attach_status
@@ -1,177 +0,0 @@
import json
import os
import shutil
import subprocess
import time
from pathlib import Path
from typing import List, Tuple

import pytest
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, last_flush_lsn_upload
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId, TimelineId


@pytest.fixture(scope="function")
def snapshotting_env(
    neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path
) -> Tuple[NeonEnv, TimelineId, List[TenantId]]:
    """
    The fixture prepares the environment or restores it from a snapshot.

    The logic is the following:
    - if the snapshot directory exists, the snapshot is restored from it
    - if there is no snapshot, the environment is initialized from scratch and stored in a snapshot
    - if the fixture is executed on CI (it has CI=true in the environment), the snapshot is not saved
    """

    snapshot_dir = test_output_dir.parent / f"snapshot-{test_output_dir.name}"
    save_snapshot = os.getenv("CI", "false") != "true"

    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    # create our template tenant
    tenant_config_mgmt_api = {
        "gc_period": "0s",
        "checkpoint_timeout": "10 years",
        "compaction_period": "20 s",
        "compaction_threshold": 10,
        "compaction_target_size": 134217728,
        "checkpoint_distance": 268435456,
        "image_creation_threshold": 3,
    }

    if snapshot_dir.exists():
        env = neon_env_builder.from_repo_dir(snapshot_dir)
        ps_http = env.pageserver.http_client()
        tenants = list({TenantId(t.name) for t in (snapshot_dir.glob("pageserver_*/tenants/*"))})
        template_timeline = env.initial_timeline

        env.broker.try_start()

        assert env.attachment_service is not None
        env.attachment_service.start()

        # Wait for the attachment service to start
        time.sleep(5)

        for tenant in tenants:
            env.attachment_service.attach_hook_issue(tenant, 1)

        env.pageserver.start()
    else:
        env = neon_env_builder.init_start()

        remote_storage = env.pageserver_remote_storage
        assert isinstance(remote_storage, LocalFsStorage)

        ps_http = env.pageserver.http_client()
        # clean up the useless default tenant
        ps_http.tenant_delete(env.initial_tenant)

        tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}

        template_tenant, template_timeline = env.neon_cli.create_tenant(
            conf=tenant_config_cli, set_default=True
        )
        template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
        with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
            pg_bin.run_capture(["pgbench", "-i", "-s50", ep.connstr()])
            last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
        ps_http.tenant_detach(template_tenant)

        # stop PS just for good measure
        env.pageserver.stop()

        # duplicate the tenant in remote storage
        src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
        assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
        tenants = [template_tenant]
        for i in range(0, 200):
            new_tenant = TenantId.generate()
            tenants.append(new_tenant)
            log.info("Duplicating tenant #%s: %s", i, new_tenant)

            dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
            dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
            dst_timelines_dir.mkdir(parents=False, exist_ok=False)

            for tl in src_timelines_dir.iterdir():
                src_tl_dir = src_timelines_dir / tl.name
                assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
                dst_tl_dir = dst_timelines_dir / tl.name
                dst_tl_dir.mkdir(parents=False, exist_ok=False)
                for file in tl.iterdir():
                    shutil.copy2(file, dst_tl_dir)
                    if "__" in file.name:
                        cmd: List[str] = [
                            str(
                                env.neon_binpath / "pagectl"
                            ),  # TODO: abstract this like the other binaries
                            "layer",
                            "rewrite-summary",
                            str(dst_tl_dir / file.name),
                            "--new-tenant-id",
                            str(new_tenant),
                        ]
                        subprocess.run(cmd, check=True)
                    else:
                        # index_part etc need no patching
                        pass

        env.pageserver.start()
        assert ps_http.tenant_list() == []
        for tenant in tenants:
            ps_http.tenant_attach(
                tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen + 1
            )

    if save_snapshot and not snapshot_dir.exists():
        shutil.copytree(env.repo_dir, snapshot_dir)

    for tenant in tenants:
        wait_until_tenant_active(ps_http, tenant)

    # ensure all layers are resident for predictable performance
    # TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
    for tenant in tenants:
        ps_http.download_all_layers(tenant, template_timeline)

    return env, template_timeline, tenants


def test_getpage_throughput(
    snapshotting_env: Tuple[NeonEnv, TimelineId, List[TenantId]],
    zenbenchmark: NeonBenchmarker,
    pg_bin: PgBin,
):
    env, template_timeline, tenants = snapshotting_env
    ps_http = env.pageserver.http_client()

    # run the benchmark with one client per timeline, each doing 10k requests to random keys.
    cmd = [
        str(env.neon_binpath / "pagebench"),
        "get-page-latest-lsn",
        "--mgmt-api-endpoint",
        ps_http.base_url,
        "--page-service-connstring",
        env.pageserver.connstr(password=None),
        "--runtime",
        "10s",
        *[f"{tenant}/{template_timeline}" for tenant in tenants],
    ]
    log.info(f"command: {' '.join(cmd)}")
    basepath = pg_bin.run_capture(cmd, with_command_header=False)
    results_path = Path(basepath + ".stdout")
    log.info(f"Benchmark results at: {results_path}")

    with open(results_path, "r") as f:
        results = json.load(f)

    log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")

    zenbenchmark.record_pagebench_results("get-page-latest-lsn", results)
@@ -17,6 +17,27 @@ class LabelledQuery:
    query: str


# This must run before all tests in this module:
# create extension pg_stat_statements if it does not exist
# and TEST_OLAP_COLLECT_PG_STAT_STATEMENTS is set to true (default false).
# Theoretically this could be in a module or session scope fixture,
# however the code depends on other fixtures that have function scope.
@pytest.mark.skipif(
    os.getenv("TEST_OLAP_COLLECT_PG_STAT_STATEMENTS", "false").lower() == "false",
    reason="Skipping - Creating extension pg_stat_statements",
)
@pytest.mark.remote_cluster
def test_clickbench_create_pg_stat_statements(remote_compare: RemoteCompare):
    log.info("Creating extension pg_stat_statements")
    query = LabelledQuery(
        "Q_CREATE_EXTENSION", r"CREATE EXTENSION IF NOT EXISTS pg_stat_statements;"
    )
    run_psql(remote_compare, query, times=1, explain=False)
    log.info("Reset pg_stat_statements")
    query = LabelledQuery("Q_RESET", r"SELECT pg_stat_statements_reset();")
    run_psql(remote_compare, query, times=1, explain=False)


# A list of queries to run.
# Please do not alter the label for the query, as it is used to identify it.
# Labels for ClickBench queries match the labels in ClickBench reports.
@@ -78,6 +99,8 @@ QUERIES: Tuple[LabelledQuery, ...] = (
    # fmt: on
)

EXPLAIN_STRING: str = "EXPLAIN (ANALYZE, VERBOSE, BUFFERS, COSTS, SETTINGS, FORMAT JSON)"


def get_scale() -> List[str]:
    # We parametrize each tpc-h and clickbench test with scale
@@ -88,7 +111,10 @@ def get_scale() -> List[str]:
    return [scale]


def run_psql(env: RemoteCompare, labelled_query: LabelledQuery, times: int) -> None:
# run the query `times` times, plus once with EXPLAIN VERBOSE if explain is requested
def run_psql(
    env: RemoteCompare, labelled_query: LabelledQuery, times: int, explain: bool = False
) -> None:
    # prepare connstr:
    # - cut out password from connstr to pass it via env
    # - add options to connstr
@@ -108,6 +134,13 @@ def run_psql(env: RemoteCompare, labelled_query: LabelledQuery, times: int) -> N
        log.info(f"Run {run}/{times}")
        with env.zenbenchmark.record_duration(f"{label}/{run}"):
            env.pg_bin.run_capture(["psql", connstr, "-c", query], env=environ)
    if explain:
        log.info(f"Explaining query {label}")
        run += 1
        with env.zenbenchmark.record_duration(f"{label}/EXPLAIN"):
            env.pg_bin.run_capture(
                ["psql", connstr, "-c", f"{EXPLAIN_STRING} {query}"], env=environ
            )


@pytest.mark.parametrize("scale", get_scale())
@@ -120,8 +153,9 @@ def test_clickbench(query: LabelledQuery, remote_compare: RemoteCompare, scale:
    Based on https://github.com/ClickHouse/ClickBench/tree/c00135ca5b6a0d86fedcdbf998fdaa8ed85c1c3b/aurora-postgresql
    The DB is prepared manually in advance.
    """
    explain: bool = os.getenv("TEST_OLAP_COLLECT_EXPLAIN", "false").lower() == "true"

    run_psql(remote_compare, query, times=3)
    run_psql(remote_compare, query, times=3, explain=explain)


def tpch_queuies() -> Tuple[ParameterSet, ...]:
@@ -195,3 +229,16 @@ def test_user_examples(remote_compare: RemoteCompare):
        """,
    )
    run_psql(remote_compare, query, times=3)


# This must run after all tests in this module:
# collect pg_stat_statements after running the tests if TEST_OLAP_COLLECT_PG_STAT_STATEMENTS is set to true (default false).
@pytest.mark.skipif(
    os.getenv("TEST_OLAP_COLLECT_PG_STAT_STATEMENTS", "false").lower() == "false",
    reason="Skipping - Collecting pg_stat_statements",
)
@pytest.mark.remote_cluster
def test_clickbench_collect_pg_stat_statements(remote_compare: RemoteCompare):
    log.info("Collecting pg_stat_statements")
    query = LabelledQuery("Q_COLLECT_PG_STAT_STATEMENTS", r"SELECT * from pg_stat_statements;")
    run_psql(remote_compare, query, times=1, explain=False)
@@ -136,10 +136,7 @@ def test_no_config(positive_env: NeonEnv, content_type: Optional[str]):
    ps_http.tenant_detach(tenant_id)
    assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]

    body = {}
    gen = env.pageserver.maybe_get_generation(tenant_id)
    if gen is not None:
        body["generation"] = gen
    body = {"generation": env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)}

    ps_http.post(
        f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",

@@ -87,7 +87,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
    #
    # Since we're dual-attached, we need to tip off the attachment service to treat the one we're
    # about to start as the attached pageserver
    assert env.attachment_service is not None
    env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
    env.pageservers[0].start()
    env.pageservers[1].stop()

@@ -157,7 +157,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
    time.sleep(1.1)  # so that we can use change in pre_stat.st_mtime to detect overwrites

    def get_generation_number():
        assert env.attachment_service is not None
        attachment = env.attachment_service.inspect(tenant_id)
        assert attachment is not None
        return attachment[0]
@@ -8,71 +8,6 @@ from fixtures.types import Lsn
from fixtures.utils import query_scalar


#
# Test pageserver get_lsn_by_timestamp API
#
def test_lsn_mapping_old(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()

    new_timeline_id = env.neon_cli.create_branch("test_lsn_mapping")
    endpoint_main = env.endpoints.create_start("test_lsn_mapping")
    log.info("postgres is running on 'test_lsn_mapping' branch")

    cur = endpoint_main.connect().cursor()
    # Create table, and insert rows, each in a separate transaction.
    # Disable synchronous_commit to make this initialization go faster.
    #
    # Each row contains the current insert LSN and the current timestamp at
    # the moment the row was inserted.
    cur.execute("SET synchronous_commit=off")
    cur.execute("CREATE TABLE foo (x integer)")
    tbl = []
    for i in range(1000):
        cur.execute("INSERT INTO foo VALUES(%s)", (i,))
        # Get the timestamp at UTC
        after_timestamp = query_scalar(cur, "SELECT clock_timestamp()").replace(tzinfo=None)
        tbl.append([i, after_timestamp])

    # Execute one more transaction with synchronous_commit enabled, to flush
    # all the previous transactions
    cur.execute("SET synchronous_commit=on")
    cur.execute("INSERT INTO foo VALUES (-1)")

    # Wait until WAL is received by pageserver
    wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)

    with env.pageserver.http_client() as client:
        # Check edge cases: timestamp in the future
        probe_timestamp = tbl[-1][1] + timedelta(hours=1)
        result = client.timeline_get_lsn_by_timestamp(
            env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 1
        )
        assert result == "future"

        # timestamp too far in the past
        probe_timestamp = tbl[0][1] - timedelta(hours=10)
        result = client.timeline_get_lsn_by_timestamp(
            env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 1
        )
        assert result == "past"

        # Probe a bunch of timestamps in the valid range
        for i in range(1, len(tbl), 100):
            probe_timestamp = tbl[i][1]
            lsn = client.timeline_get_lsn_by_timestamp(
                env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 1
            )
            # Call get_lsn_by_timestamp to get the LSN
            # Launch a new read-only node at that LSN, and check that only the rows
            # that were supposed to be committed at that point in time are visible.
            endpoint_here = env.endpoints.create_start(
                branch_name="test_lsn_mapping", endpoint_id="ep-lsn_mapping_read", lsn=lsn
            )
            assert endpoint_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i

            endpoint_here.stop_and_destroy()


#
# Test pageserver get_lsn_by_timestamp API
#
@@ -130,7 +65,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
    # Timestamp is in the future
    probe_timestamp = tbl[-1][1] + timedelta(hours=1)
    result = client.timeline_get_lsn_by_timestamp(
        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
    )
    assert result["kind"] == "future"
    # make sure that we return a well advanced lsn here
@@ -139,7 +74,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
    # Timestamp is in the unreachable past
    probe_timestamp = tbl[0][1] - timedelta(hours=10)
    result = client.timeline_get_lsn_by_timestamp(
        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
    )
    assert result["kind"] == "past"
    # make sure that we return the minimum lsn here at the start of the range
@@ -149,7 +84,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
    for i in range(1, len(tbl), 100):
        probe_timestamp = tbl[i][1]
        result = client.timeline_get_lsn_by_timestamp(
            tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
            tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
        )
        assert result["kind"] not in ["past", "nodata"]
        lsn = result["lsn"]
@@ -72,7 +72,9 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):

    # create new tenant and check it is also there
    tenant_id = TenantId.generate()
    client.tenant_create(tenant_id, generation=env.pageserver.maybe_get_generation(tenant_id))
    client.tenant_create(
        tenant_id, generation=env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
    )
    assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}

    timelines = client.timeline_list(tenant_id)

@@ -187,7 +187,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
    - After upgrade, the bucket should contain a mixture.
    - In both cases, postgres I/O should work.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -196,7 +195,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
    env.broker.try_start()
    for sk in env.safekeepers:
        sk.start()
    assert env.attachment_service is not None
    env.attachment_service.start()

    env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
@@ -262,12 +260,10 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):


def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
    assert env.attachment_service is not None

    some_other_pageserver = 1234
    ps_http = env.pageserver.http_client()
@@ -341,7 +337,6 @@ def test_deletion_queue_recovery(
    :param validate_before: whether to wait for deletions to be validated before restart. This
    makes them eligible to be executed after restart, if the same node keeps the attachment.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -405,7 +400,6 @@ def test_deletion_queue_recovery(

    if keep_attachment == KeepAttachment.LOSE:
        some_other_pageserver = 101010
        assert env.attachment_service is not None
        env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)

    env.pageserver.start()
@@ -453,7 +447,6 @@ def test_deletion_queue_recovery(


def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -473,7 +466,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    )

    # Simulate a major incident: the control plane goes offline
    assert env.attachment_service is not None
    env.attachment_service.stop()

    # Remember how many validations had happened before the control plane went offline
@@ -545,7 +537,6 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
    and must be constructed using the proper generation for the layer, which may not be the same generation
    that the tenant is running in.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -575,7 +566,6 @@ def test_multi_attach(
    neon_env_builder: NeonEnvBuilder,
    pg_bin: PgBin,
):
    neon_env_builder.enable_generations = True
    neon_env_builder.num_pageservers = 3
    neon_env_builder.enable_pageserver_remote_storage(
        remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -9,9 +9,7 @@ from fixtures.utils import wait_until

# Test restarting page server, while safekeeper and compute node keep
# running.
@pytest.mark.parametrize("generations", [True, False])
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool):
    neon_env_builder.enable_generations = generations
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
    neon_env_builder.enable_scrub_on_exit()


@@ -57,13 +57,11 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
    states are valid, so that we may test it in this way: the API should always
    work as long as the tenant exists.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.num_pageservers = 3
    neon_env_builder.enable_pageserver_remote_storage(
        remote_storage_kind=RemoteStorageKind.MOCK_S3,
    )
    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
    assert env.attachment_service is not None

    pageservers = env.pageservers
    list([p.http_client() for p in pageservers])
@@ -210,13 +208,11 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
    """
    Test the sequence of location states that are used in a live migration.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.num_pageservers = 2
    neon_env_builder.enable_pageserver_remote_storage(
        remote_storage_kind=RemoteStorageKind.MOCK_S3,
    )
    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
    assert env.attachment_service is not None

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

@@ -60,8 +60,6 @@ def test_remote_storage_backup_and_restore(

    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)

    neon_env_builder.enable_generations = generations

    # Exercise retry code path by making all uploads and downloads fail for the
    # first time. The retries print INFO-messages to the log; we will check
    # that they are present after the test.

@@ -263,15 +263,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
        ps_http, env.initial_tenant, timeline_id, iterations=iterations
    )

    if failpoint == "timeline-delete-after-index-delete":
        m = ps_http.get_metrics()
        assert (
            m.query_one(
                "remote_storage_s3_request_seconds_count",
                filter={"request_type": "get_object", "result": "ok"},
            ).value
            == 1  # index part for initial timeline
        )
    elif check is Check.RETRY_WITHOUT_RESTART:
        # this should succeed
        # this also checks that delete can be retried even when timeline is in Broken state
@@ -56,7 +56,7 @@ regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] }
ring = { version = "0.16", features = ["std"] }
ring = { version = "0.16" }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
@@ -75,8 +75,8 @@ tracing-core = { version = "0.1" }
tungstenite = { version = "0.20" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4"] }
zstd = { version = "0.12" }
zstd-safe = { version = "6", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }

[build-dependencies]