mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
Compare commits
33 Commits
skyzh/feat
...
skyzh/fake
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f87e11cdc | ||
|
|
7c2b2325f1 | ||
|
|
7d9f677a22 | ||
|
|
c450d3224d | ||
|
|
0beaf10ccb | ||
|
|
6f629abfe5 | ||
|
|
1da54b7e01 | ||
|
|
443b052eec | ||
|
|
76a044d1fa | ||
|
|
77c9154564 | ||
|
|
c5115518e9 | ||
|
|
931f8c4300 | ||
|
|
0f7c2cc382 | ||
|
|
983d56502b | ||
|
|
bcef542d5b | ||
|
|
e31455d936 | ||
|
|
a4ea7d6194 | ||
|
|
19bea5fd0c | ||
|
|
5be94e28c4 | ||
|
|
63a106021a | ||
|
|
9a6ace9bde | ||
|
|
8c77ccfc01 | ||
|
|
cbd2fc2395 | ||
|
|
028a191040 | ||
|
|
8cce27bedb | ||
|
|
90b706cd96 | ||
|
|
057ce115de | ||
|
|
e85607eed8 | ||
|
|
437071888e | ||
|
|
148b3701cf | ||
|
|
daebe50e19 | ||
|
|
e0ee6fbeff | ||
|
|
307fa2ceb7 |
1
.github/actionlint.yml
vendored
1
.github/actionlint.yml
vendored
@@ -6,6 +6,7 @@ self-hosted-runner:
|
||||
- small
|
||||
- small-metal
|
||||
- small-arm64
|
||||
- unit-perf
|
||||
- us-east-2
|
||||
config-variables:
|
||||
- AWS_ECR_REGION
|
||||
|
||||
@@ -70,6 +70,7 @@ runs:
|
||||
|
||||
- name: Install Allure
|
||||
shell: bash -euxo pipefail {0}
|
||||
working-directory: /tmp
|
||||
run: |
|
||||
if ! which allure; then
|
||||
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip
|
||||
|
||||
11
.github/workflows/_create-release-pr.yml
vendored
11
.github/workflows/_create-release-pr.yml
vendored
@@ -53,10 +53,13 @@ jobs:
|
||||
|| inputs.component-name == 'Compute' && 'release-compute'
|
||||
}}
|
||||
run: |
|
||||
today=$(date +'%Y-%m-%d')
|
||||
echo "title=${COMPONENT_NAME} release ${today}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "rc-branch=rc/${RELEASE_BRANCH}/${today}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "release-branch=${RELEASE_BRANCH}" | tee -a ${GITHUB_OUTPUT}
|
||||
now_date=$(date -u +'%Y-%m-%d')
|
||||
now_time=$(date -u +'%H-%M-%Z')
|
||||
{
|
||||
echo "title=${COMPONENT_NAME} release ${now_date}"
|
||||
echo "rc-branch=rc/${RELEASE_BRANCH}/${now_date}_${now_time}"
|
||||
echo "release-branch=${RELEASE_BRANCH}"
|
||||
} | tee -a ${GITHUB_OUTPUT}
|
||||
|
||||
- name: Configure git
|
||||
run: |
|
||||
|
||||
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -284,7 +284,7 @@ jobs:
|
||||
statuses: write
|
||||
contents: write
|
||||
pull-requests: write
|
||||
runs-on: [ self-hosted, small-metal ]
|
||||
runs-on: [ self-hosted, unit-perf ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
credentials:
|
||||
@@ -1271,7 +1271,7 @@ jobs:
|
||||
exit 1
|
||||
|
||||
deploy:
|
||||
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
|
||||
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, trigger-custom-extensions-build-and-wait ]
|
||||
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-neon-image-prod` and `push-compute-image-prod`
|
||||
if: ${{ contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind) && !failure() && !cancelled() }}
|
||||
permissions:
|
||||
|
||||
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -5495,6 +5495,16 @@ version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"
|
||||
|
||||
[[package]]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"rand 0.8.5",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "remote_storage"
|
||||
version = "0.1.0"
|
||||
@@ -5510,6 +5520,7 @@ dependencies = [
|
||||
"azure_identity",
|
||||
"azure_storage",
|
||||
"azure_storage_blobs",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
@@ -5520,6 +5531,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"hyper 1.4.1",
|
||||
"itertools 0.10.5",
|
||||
"md5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -30,6 +30,7 @@ members = [
|
||||
"libs/tenant_size_model",
|
||||
"libs/metrics",
|
||||
"libs/postgres_connection",
|
||||
"libs/remote_keys",
|
||||
"libs/remote_storage",
|
||||
"libs/tracing-utils",
|
||||
"libs/postgres_ffi/wal_craft",
|
||||
|
||||
@@ -139,7 +139,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// For historical reasons, the main thread that processes the spec and launches postgres
|
||||
// For historical reasons, the main thread that processes the config and launches postgres
|
||||
// is synchronous, but we always have this tokio runtime available and we "enter" it so
|
||||
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
|
||||
// from all parts of compute_ctl.
|
||||
@@ -155,7 +155,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
|
||||
|
||||
let cli_spec = get_config(&cli)?;
|
||||
let config = get_config(&cli)?;
|
||||
|
||||
let compute_node = ComputeNode::new(
|
||||
ComputeNodeParams {
|
||||
@@ -176,8 +176,7 @@ fn main() -> Result<()> {
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor_addr: cli.vm_monitor_addr,
|
||||
},
|
||||
cli_spec.spec,
|
||||
cli_spec.compute_ctl_config,
|
||||
config,
|
||||
)?;
|
||||
|
||||
let exit_code = compute_node.run()?;
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::{env, fs};
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{
|
||||
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
|
||||
};
|
||||
@@ -303,11 +303,7 @@ struct StartVmMonitorResult {
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn new(
|
||||
params: ComputeNodeParams,
|
||||
cli_spec: Option<ComputeSpec>,
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
) -> Result<Self> {
|
||||
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
let conn_conf = postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
@@ -315,8 +311,8 @@ impl ComputeNode {
|
||||
.context("cannot build tokio postgres config from connstr")?;
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
if let Some(cli_spec) = cli_spec {
|
||||
let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
if let Some(spec) = config.spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
}
|
||||
|
||||
@@ -327,7 +323,7 @@ impl ComputeNode {
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_download_progress: RwLock::new(HashMap::new()),
|
||||
compute_ctl_config,
|
||||
compute_ctl_config: config.compute_ctl_config,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use futures::future::BoxFuture;
|
||||
use http::{Request, Response, StatusCode};
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
|
||||
use tower_http::auth::AsyncAuthorizeRequest;
|
||||
use tracing::warn;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::http::{JsonResponse, extract::RequestId};
|
||||
|
||||
@@ -92,7 +92,7 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
if data.claims.compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid claims in authorization token",
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
}
|
||||
|
||||
@@ -112,12 +112,14 @@ impl Authorize {
|
||||
token: &str,
|
||||
validation: &Validation,
|
||||
) -> Result<TokenData<ComputeClaims>> {
|
||||
debug!("verifying token {}", token);
|
||||
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to construct decoding key from {}: {}",
|
||||
"failed to construct decoding key from {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
@@ -130,7 +132,7 @@ impl Authorize {
|
||||
Ok(data) => return Ok(data),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to decode authorization token using {}: {}",
|
||||
"failed to decode authorization token using {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
@@ -140,6 +142,6 @@ impl Authorize {
|
||||
}
|
||||
}
|
||||
|
||||
Err(anyhow!("Failed to verify authorization token"))
|
||||
Err(anyhow!("failed to verify authorization token"))
|
||||
}
|
||||
}
|
||||
|
||||
8
docker-compose/ext-src/pg_jsonschema-src/Makefile
Normal file
8
docker-compose/ext-src/pg_jsonschema-src/Makefile
Normal file
@@ -0,0 +1,8 @@
|
||||
EXTENSION = pg_jsonschema
|
||||
DATA = pg_jsonschema--1.0.sql
|
||||
REGRESS = jsonschema_valid_api jsonschema_edge_cases
|
||||
REGRESS_OPTS = --load-extension=pg_jsonschema
|
||||
|
||||
PG_CONFIG ?= pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
@@ -0,0 +1,87 @@
|
||||
-- Schema with enums, nulls, extra properties disallowed
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json);
|
||||
jsonschema_is_valid
|
||||
---------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- Valid enum and null email
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": null}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
------------------------------
|
||||
{}
|
||||
(1 row)
|
||||
|
||||
-- Invalid enum value
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "disabled", "email": null}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
----------------------------------------------------------------------
|
||||
{"\"disabled\" is not one of [\"active\",\"inactive\",\"pending\"]"}
|
||||
(1 row)
|
||||
|
||||
-- Invalid email format (assuming format is validated)
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": "not-an-email"}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
-----------------------------------------
|
||||
{"\"not-an-email\" is not a \"email\""}
|
||||
(1 row)
|
||||
|
||||
-- Extra property not allowed
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "extra": "should not be here"}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
--------------------------------------------------------------------
|
||||
{"Additional properties are not allowed ('extra' was unexpected)"}
|
||||
(1 row)
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
-- Define schema
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json);
|
||||
jsonschema_is_valid
|
||||
---------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- Valid instance
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "alice", "age": 25}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
------------------------------
|
||||
{}
|
||||
(1 row)
|
||||
|
||||
-- Invalid instance: missing required "username"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"age": 25}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
-----------------------------------------
|
||||
{"\"username\" is a required property"}
|
||||
(1 row)
|
||||
|
||||
-- Invalid instance: wrong type for "age"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "bob", "age": "twenty"}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
-------------------------------------------
|
||||
{"\"twenty\" is not of type \"integer\""}
|
||||
(1 row)
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
-- Schema with enums, nulls, extra properties disallowed
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json);
|
||||
|
||||
-- Valid enum and null email
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": null}'::json
|
||||
);
|
||||
|
||||
-- Invalid enum value
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "disabled", "email": null}'::json
|
||||
);
|
||||
|
||||
-- Invalid email format (assuming format is validated)
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": "not-an-email"}'::json
|
||||
);
|
||||
|
||||
-- Extra property not allowed
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "extra": "should not be here"}'::json
|
||||
);
|
||||
@@ -0,0 +1,48 @@
|
||||
-- Define schema
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json);
|
||||
|
||||
-- Valid instance
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "alice", "age": 25}'::json
|
||||
);
|
||||
|
||||
-- Invalid instance: missing required "username"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"age": 25}'::json
|
||||
);
|
||||
|
||||
-- Invalid instance: wrong type for "age"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "bob", "age": "twenty"}'::json
|
||||
);
|
||||
9
docker-compose/ext-src/pg_session_jwt-src/Makefile
Normal file
9
docker-compose/ext-src/pg_session_jwt-src/Makefile
Normal file
@@ -0,0 +1,9 @@
|
||||
EXTENSION = pg_session_jwt
|
||||
|
||||
REGRESS = basic_functions
|
||||
REGRESS_OPTS = --load-extension=$(EXTENSION)
|
||||
export PGOPTIONS = -c pg_session_jwt.jwk={"crv":"Ed25519","kty":"OKP","x":"R_Abz-63zJ00l-IraL5fQhwkhGVZCSooQFV5ntC3C7M"}
|
||||
|
||||
PG_CONFIG ?= pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
@@ -0,0 +1,35 @@
|
||||
-- Basic functionality tests for pg_session_jwt
|
||||
-- Test auth.init() function
|
||||
SELECT auth.init();
|
||||
init
|
||||
------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test an invalid JWT
|
||||
SELECT auth.jwt_session_init('INVALID-JWT');
|
||||
ERROR: invalid JWT encoding
|
||||
-- Test creating a session with an expired JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
|
||||
ERROR: Token used after it has expired
|
||||
-- Test creating a session with a valid JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
|
||||
jwt_session_init
|
||||
------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test auth.session() function
|
||||
SELECT auth.session();
|
||||
session
|
||||
-------------------------------------------------------------------------
|
||||
{"exp": 4896164252, "iat": 1742564252, "jti": 434343, "sub": "user123"}
|
||||
(1 row)
|
||||
|
||||
-- Test auth.user_id() function
|
||||
SELECT auth.user_id() AS user_id;
|
||||
user_id
|
||||
---------
|
||||
user123
|
||||
(1 row)
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
-- Basic functionality tests for pg_session_jwt
|
||||
|
||||
-- Test auth.init() function
|
||||
SELECT auth.init();
|
||||
|
||||
-- Test an invalid JWT
|
||||
SELECT auth.jwt_session_init('INVALID-JWT');
|
||||
|
||||
-- Test creating a session with an expired JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
|
||||
|
||||
-- Test creating a session with a valid JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
|
||||
|
||||
-- Test auth.session() function
|
||||
SELECT auth.session();
|
||||
|
||||
-- Test auth.user_id() function
|
||||
SELECT auth.user_id() AS user_id;
|
||||
@@ -1,242 +0,0 @@
|
||||
|
||||
# Storage Encryption Key Management
|
||||
|
||||
## Summary
|
||||
|
||||
As a precursor to adding new encryption capabilities to Neon's storage services, this RFC proposes
|
||||
mechanisms for creating and storing fine-grained encryption keys for user data in Neon. We aim
|
||||
to provide at least tenant granularity, but will use timeline granularity when it is simpler to do
|
||||
so.
|
||||
|
||||
Out of scope:
|
||||
- We describe an abstract KMS interface, but not particular platform implementations (such as how
|
||||
to authenticate with KMS).
|
||||
|
||||
## Terminology
|
||||
|
||||
_wrapped/unwrapped_: a wrapped encryption key is a key encrypted by another key. For example, the key for
|
||||
encrypting a timeline's pageserver data might be wrapped by some "root" key for the tenant's user account, stored in a KMS system.
|
||||
|
||||
_key hierarchy_: the relationships between keys which wrap each other. For example, a layer file key might
|
||||
be wrapped by a pageserver tenant key, which is wrapped by a tenant's root key.
|
||||
|
||||
## Design Choices
|
||||
|
||||
Storage: S3 will be the store of record for wrapped keys.
|
||||
|
||||
Separate keys: Safekeeper and Pageserver will use independent keys.
|
||||
|
||||
AES256: rather than building a generic system for keys, we will assume that all the keys
|
||||
we manage are AES256 keys -- this is the de-facto standard for enterprise data storage.
|
||||
|
||||
Per-object keys: rather than encrypting data objects (layer files and segment files) with
|
||||
the tenant keys directly, they will be encrypted with separate keys. This avoids cryptographic
|
||||
safety issues from re-using the same key for large quantities of potentially repetitive plaintext.
|
||||
|
||||
S3 objects are self-contained: each encrypted file will have a metadata block in the file itself
|
||||
storing the KMS-wrapped key to decrypt itself.
|
||||
|
||||
Key storage is optional at a per-tenant granularity: eventually this would be on by default, but:
|
||||
- initially only some environments will have a KMS set up.
|
||||
- Encryption has some overhead and it may be that some tenants don't want or need it.
|
||||
|
||||
## Design
|
||||
|
||||
### Summary of format changes
|
||||
|
||||
- Pageserver layer files and safekeeper segment objects are split into blocks and each
|
||||
block is encrypted by the layer key.
|
||||
- Pageserver layer files and safekeeper segment objects get new metadata fields to
|
||||
store wrapped layer key and the KMS-wrapped timeline key.
|
||||
|
||||
### Summary of API changes
|
||||
|
||||
- Pageserver TenantConf API gets a new field for account ID
|
||||
- Pageserver TenantConf API gets a new field for encryption mode
|
||||
- Safekeeper timeline creation API gets a new field for account ID
|
||||
- Controller, pageserver & safekeeper get a new timeline-scoped `rotate_key` API
|
||||
|
||||
### KMS interface
|
||||
|
||||
Neon will interoperate with different KMS APIs on different platforms. We will implement a generic interface,
|
||||
similar to how `remote_storage` wraps different object storage APIs:
|
||||
- `generate(accountId, keyType, alias) -> (wrapped key, plaintext key)`
|
||||
- `unwrap(accountId, ciphertext key) -> plaintext key`
|
||||
|
||||
Hereafter, when we talk about generating or unwrapping a key, this means a call into the KMS API.
|
||||
|
||||
The KMS deals with abstract "account IDs", which are not equal to tenant IDs and may not be
|
||||
1:1 with tenants. The account ID will be provided as part of tenant configuration, along
|
||||
with a field to identify an encryption mode.
|
||||
|
||||
|
||||
### Pageserver Layer File Format
|
||||
|
||||
Encryption blocks are the minimum of unit of read. To read the part of the data within the encryption block
|
||||
we must decrypt the whole block. All encryption blocks share the same layer key within the layer (is this safe?).
|
||||
|
||||
Image layers: each image is one encryption block.
|
||||
|
||||
Delta layers: for the first stage of the project, each delta is encrypted separately; in the future, we can batch
|
||||
several small deltas into a single encryption block.
|
||||
|
||||
Indicies: each B+ tree node is an encryption block.
|
||||
|
||||
Layer format:
|
||||
|
||||
```
|
||||
| Data Block | Data Block | Data Block | ... | Index Block | Index Block | Index Block | Metadata |
|
||||
Data block = encrypt(data, layer_key)
|
||||
Index block = encrypt(index, layer_key); index points a key to a offset of the data block inside the layer file.
|
||||
Metadata = wrap(layer_key, timeline_key), wrap_kms(tenant_key), and other metadata we want to store in the future
|
||||
```
|
||||
|
||||
Note that we generate a random layer_key for each of the layer. We store the layer key wrapped by the current
|
||||
tenant key (described in later sections) and the KMS-wrapped tenant key in the layer.
|
||||
|
||||
If data compression is enabled, the data is compressed first before being encrypted (is this safe?)
|
||||
|
||||
This file format is used across both object storage and local storage. We do not decrypt when downloading
|
||||
the layer file to the disk. Decryption is done when reading the layer.
|
||||
|
||||
### Layer File Format Migration
|
||||
|
||||
We record the file format for each of the layer file in both the index_part and the layer file name (suffix v2?).
|
||||
The layer file format version will be passed into the layer readers. The re-keying operation (described below)
|
||||
will migrate all layer files automatically to v2.
|
||||
|
||||
### Safekeeper Segment Format
|
||||
|
||||
TBD
|
||||
|
||||
### Pageserver Timeline Index
|
||||
|
||||
We will add a `created_at` for each of the layer file so that during re-keying (described in later sections)
|
||||
we can determine which layer files to rewrite. We also record the offset of the metadata block so that it is
|
||||
possible to obtain more information about the layer file without downloading the full layer file (i.e., the
|
||||
exact timeline key being used to encrypt the layer file).
|
||||
|
||||
```
|
||||
# LayerFileMetadata
|
||||
{
|
||||
"format": 2,
|
||||
"created_at": "<time>",
|
||||
"metadata_block_offset": u64,
|
||||
}
|
||||
```
|
||||
|
||||
TODO: create an index for safekeeper so that it's faster to determine what files to re-key? Or we can scan all
|
||||
files.
|
||||
|
||||
### Pageserver Key Cache
|
||||
|
||||
We have a hashmap from KMS-wrapped tenant key to plain key for each of the tenant so that we do not need to repeatly
|
||||
unwrap the same key.
|
||||
|
||||
### Key rotation
|
||||
|
||||
Each tenant stores a tenant key in memory to encrypt all layer files generated across all timelines within
|
||||
its active period. When the key rotation API gets called, we rotate the timeline key in memory by calling the
|
||||
KMS API to generate a new key-pair, and all new layer files' layer keys will be encrypted using this key.
|
||||
|
||||
### Re-keying
|
||||
|
||||
While re-keying and key-rotation are sometimes used synonymously, we distinguish them:
|
||||
- Key rotation is generating a new key to use for new data
|
||||
- Re-keying is rewriting existing data so that old keys are no longer used at all
|
||||
|
||||
Re-keying is a bulk data operation, and not fully defined in this RFC: it can be defined
|
||||
quite simply as "For object in objects, if object key version is < the rekeying horizon,
|
||||
then do a read/write cycle on the object using latest key". This is a simple but potentially very
|
||||
expensive operation, so we discuss efficiency here.
|
||||
|
||||
#### Pageserver re-key
|
||||
|
||||
For pageservers, occasional rekeying may be implemented efficiently if one tolerates using
|
||||
the last few keys and doesn't insist on the latest, because pageservers periodically rewrite
|
||||
their data for GC-compaction anyway. Thereby an API call to re-key any data with an overly old
|
||||
key would often be a no-op because all data was rewritten recently anyway.
|
||||
|
||||
When object versioning is enabled in storage, re-keying is not fully accomplished by just
|
||||
re-writing live data: old versions would still contain user data encrypted with older keys. To
|
||||
fully re-key, an extra step is needed to purge old objects. Ideally, we should only purge
|
||||
old objects which were encrypted using old keys. To this end, it would be useful to store
|
||||
the encryption key version as metadata on objects, so that a scrub of deleted object versions
|
||||
can efficiently select those objects that should be purged during re-key.
|
||||
|
||||
Checks on object versions should not only be on deleted objects: because pageserver can emit
|
||||
"orphan" objects not referenced in the index under some circumstances, re-key must also
|
||||
check non-deleted objects.
|
||||
|
||||
To summarize, the pageserver re-key operation is:
|
||||
- Iterate over index of layer files, select those with too-old key and rewrite them
|
||||
- Iterate over all versions in object storage, select those with a too-old key version
|
||||
in their metadata and purge them (with a safety check that these are not referenced
|
||||
by the latest index).
|
||||
|
||||
It would be wise to combine the re-key procedure with an exhaustive read of a timeline's data,
|
||||
to ensure that when testing & rolling this feature out we are not rendering anything unreadable
|
||||
due to bugs in implementation. Since we are deleting old versions in object storage, our
|
||||
time travel recovery tool will not be any help if we get something wrong in this process.
|
||||
|
||||
#### Safekeeper re-key
|
||||
|
||||
Re-keying a safekeeper timeline requires an exhaustive walk of segment objects, read
|
||||
metadata on each one and decide whether it requires rewrite.
|
||||
|
||||
Safekeeper currently keeps historic objects forever, so re-keying this data will get
|
||||
more expensive as time goes on. This would be a good time to add cleanup of old safekeeper
|
||||
segments, but doing so is beyond the scope of this RFC.
|
||||
|
||||
### Enabling encryption for existing tenants
|
||||
|
||||
To enable encryption for an existing tenant, we may simply call key-rotation API (to generate a key),
|
||||
and then re-key API (to rewrite existing data using this key).
|
||||
|
||||
## Observability
|
||||
|
||||
- To enable some external service to implement re-keying, we should publish metrics per-timeline
|
||||
on the age of their latest encryption key.
|
||||
- Calls to KMS should be tracked with typical request rate/result/latency histograms to enable
|
||||
detection of a slow KMS server and/or errors.
|
||||
|
||||
## Alternatives considered
|
||||
|
||||
### Use same tenant key for safekeeper and pageserver
|
||||
|
||||
We could halve the number of keys in circulation by having the safekeeper and pageserver
|
||||
share a key rather than working independently.
|
||||
|
||||
However, this would be substantially more complex to implement, as safekeepers and pageservers
|
||||
currently share no storage, so some new communication path would be needed. There is minimal
|
||||
upside in sharing a key.
|
||||
|
||||
### No KMS dependency
|
||||
|
||||
We could choose to do all key management ourselves. However, the industry standard approach
|
||||
to enabling users of cloud SaaS software to self-manage keys is to use the KMS as the intermediary
|
||||
between our system and the user's control of their key. Although this RFC does not propose user-managed keys, we should design with this in mind.
|
||||
|
||||
### Do all key generation/wrapping in KMS service
|
||||
|
||||
We could avoid generating and wrapping/unwrapping object keys in our storage
|
||||
services by delegating all responsibility for key operations to the KMS. However,
|
||||
KMS services have limited throughput and in some cases may charge per operation, so
|
||||
it is useful to avoid doing KMS operations per-object, and restrict them to per-timeline
|
||||
frequency.
|
||||
|
||||
### Per-tenant instead of per-timeline pageserver keys
|
||||
|
||||
For tenants with many timelines, we may reduce load on KMS service by
|
||||
using per-tenant instead of per-timeline keys, so that we may do operations
|
||||
such as creating a timeline without needing to do a KMS unwrap operation.
|
||||
|
||||
However, per-timeline key management is much simpler to implement on the safekeeper,
|
||||
which currently has no concept of a tenant (other than as a namespace for timelines).
|
||||
It is also slightly simpler to implement on the pageserver, as it avoids implementing
|
||||
a tenant-scoped creation operation to initialize keys (instead, we may initialize keys
|
||||
during timeline creation).
|
||||
|
||||
As a side benefit, per-timeline key management also enables implementing secure deletion in future
|
||||
at a per-timeline granularity.
|
||||
|
||||
13
libs/remote_keys/Cargo.toml
Normal file
13
libs/remote_keys/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rand.workspace = true
|
||||
42
libs/remote_keys/src/lib.rs
Normal file
42
libs/remote_keys/src/lib.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
//! A module that provides a KMS implementation that generates and unwraps the keys.
|
||||
//!
|
||||
|
||||
/// A KMS implementation that does static wrapping and unwrapping of the keys.
|
||||
pub struct NaiveKms {
|
||||
account_id: String,
|
||||
}
|
||||
|
||||
impl NaiveKms {
|
||||
pub fn new(account_id: String) -> Self {
|
||||
Self { account_id }
|
||||
}
|
||||
|
||||
pub fn encrypt(&self, plain: &[u8]) -> anyhow::Result<Vec<u8>> {
|
||||
let wrapped = [self.account_id.as_bytes(), "-wrapped-".as_bytes(), plain].concat();
|
||||
Ok(wrapped)
|
||||
}
|
||||
|
||||
pub fn decrypt(&self, wrapped: &[u8]) -> anyhow::Result<Vec<u8>> {
|
||||
let Some(wrapped) = wrapped.strip_prefix(self.account_id.as_bytes()) else {
|
||||
return Err(anyhow::anyhow!("invalid key"));
|
||||
};
|
||||
let Some(plain) = wrapped.strip_prefix(b"-wrapped-") else {
|
||||
return Err(anyhow::anyhow!("invalid key"));
|
||||
};
|
||||
Ok(plain.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_generate_key() {
|
||||
let kms = NaiveKms::new("test-tenant".to_string());
|
||||
let data = rand::random::<[u8; 32]>().to_vec();
|
||||
let encrypted = kms.encrypt(&data).unwrap();
|
||||
let decrypted = kms.decrypt(&encrypted).unwrap();
|
||||
assert_eq!(data, decrypted);
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ aws-smithy-async.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
camino = { workspace = true, features = ["serde1"] }
|
||||
humantime-serde.workspace = true
|
||||
@@ -27,6 +28,7 @@ tokio-util = { workspace = true, features = ["compat"] }
|
||||
toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
scopeguard.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
utils = { path = "../utils", default-features = false }
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
@@ -550,6 +550,19 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
self.download_for_builder(builder, timeout, cancel).await
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_objects(std::array::from_ref(path), cancel)
|
||||
.await
|
||||
|
||||
@@ -190,6 +190,8 @@ pub struct DownloadOpts {
|
||||
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
|
||||
/// for layer files
|
||||
pub kind: DownloadKind,
|
||||
/// The encryption key to use for the download.
|
||||
pub encryption_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
pub enum DownloadKind {
|
||||
@@ -204,6 +206,7 @@ impl Default for DownloadOpts {
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
kind: DownloadKind::Large,
|
||||
encryption_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -241,6 +244,15 @@ impl DownloadOpts {
|
||||
None => format!("bytes={start}-"),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_encryption_key(mut self, encryption_key: Option<impl AsRef<[u8]>>) -> Self {
|
||||
self.encryption_key = encryption_key.map(|k| k.as_ref().to_vec());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn encryption_key(&self) -> Option<&[u8]> {
|
||||
self.encryption_key.as_deref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -331,6 +343,19 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
/// Same as upload, but with remote encryption if the backend supports it (e.g. SSE-C on AWS).
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
// S3 PUT request requires the content length to be specified,
|
||||
// otherwise it starts to fail with the concurrent connection count increasing.
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Delete a single path from remote storage.
|
||||
///
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
|
||||
@@ -615,6 +640,63 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::LocalFs(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::AwsS3(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::AzureBlob(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::Unreliable(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
|
||||
@@ -560,6 +560,19 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
paths: &[RemotePath],
|
||||
|
||||
@@ -66,7 +66,10 @@ struct GetObjectRequest {
|
||||
key: String,
|
||||
etag: Option<String>,
|
||||
range: Option<String>,
|
||||
/// Base64 encoded SSE-C key for server-side encryption.
|
||||
sse_c_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl S3Bucket {
|
||||
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
|
||||
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
|
||||
@@ -257,6 +260,13 @@ impl S3Bucket {
|
||||
builder = builder.if_none_match(etag);
|
||||
}
|
||||
|
||||
if let Some(encryption_key) = request.sse_c_key {
|
||||
builder = builder.sse_customer_algorithm("AES256");
|
||||
builder = builder.sse_customer_key(base64::encode(&encryption_key));
|
||||
builder = builder
|
||||
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
|
||||
}
|
||||
|
||||
let get_object = builder.send();
|
||||
|
||||
let get_object = tokio::select! {
|
||||
@@ -693,12 +703,13 @@ impl RemoteStorage for S3Bucket {
|
||||
})
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
from_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Put;
|
||||
@@ -709,7 +720,7 @@ impl RemoteStorage for S3Bucket {
|
||||
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
|
||||
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
|
||||
|
||||
let upload = self
|
||||
let mut upload = self
|
||||
.client
|
||||
.put_object()
|
||||
.bucket(self.bucket_name.clone())
|
||||
@@ -717,8 +728,17 @@ impl RemoteStorage for S3Bucket {
|
||||
.set_metadata(metadata.map(|m| m.0))
|
||||
.set_storage_class(self.upload_storage_class.clone())
|
||||
.content_length(from_size_bytes.try_into()?)
|
||||
.body(bytes_stream)
|
||||
.send();
|
||||
.body(bytes_stream);
|
||||
|
||||
if let Some(encryption_key) = encryption_key {
|
||||
upload = upload.sse_customer_algorithm("AES256");
|
||||
let base64_key = base64::encode(encryption_key);
|
||||
upload = upload.sse_customer_key(&base64_key);
|
||||
upload = upload
|
||||
.sse_customer_key_md5(base64::encode(md5::compute(encryption_key).as_slice()));
|
||||
}
|
||||
|
||||
let upload = upload.send();
|
||||
|
||||
let upload = tokio::time::timeout(self.timeout, upload);
|
||||
|
||||
@@ -742,6 +762,18 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
@@ -801,6 +833,7 @@ impl RemoteStorage for S3Bucket {
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: opts.byte_range_header(),
|
||||
sse_c_key: opts.encryption_key.clone(),
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
|
||||
@@ -178,6 +178,19 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.inner.download(from, opts, cancel).await
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_inner(path, true, cancel).await
|
||||
}
|
||||
|
||||
@@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
|
||||
|
||||
let timeout = std::time::Duration::from_secs(5);
|
||||
|
||||
@@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
|
||||
|
||||
{
|
||||
let stream = ctx
|
||||
@@ -555,6 +555,7 @@ async fn upload_large_enough_file(
|
||||
client: &GenericRemoteStorage,
|
||||
path: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
encryption_key: Option<&[u8]>,
|
||||
) -> usize {
|
||||
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
let body = bytes::Bytes::from(vec![0u8; 1024]);
|
||||
@@ -565,9 +566,54 @@ async fn upload_large_enough_file(
|
||||
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
|
||||
|
||||
client
|
||||
.upload(contents, len, path, None, cancel)
|
||||
.upload_with_encryption(contents, len, path, None, encryption_key, cancel)
|
||||
.await
|
||||
.expect("upload succeeds");
|
||||
|
||||
len
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn encryption_works(ctx: &mut MaybeEnabledStorage) {
|
||||
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
|
||||
return;
|
||||
};
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let path = RemotePath::new(Utf8Path::new(
|
||||
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let key = rand::random::<[u8; 32]>();
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await;
|
||||
|
||||
{
|
||||
let download = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts::default().with_encryption_key(Some(&key)),
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.expect("should succeed");
|
||||
let vec = download_to_vec(download).await.expect("should succeed");
|
||||
assert_eq!(vec.len(), file_len);
|
||||
}
|
||||
|
||||
{
|
||||
// Download without encryption key should fail
|
||||
let download = ctx
|
||||
.client
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await;
|
||||
assert!(download.is_err());
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
ctx.client.delete_objects(&[path], &cancel).await.unwrap();
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@ default = []
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
|
||||
|
||||
fuzz-read-path = ["testing"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
|
||||
@@ -126,7 +126,7 @@ async fn ingest(
|
||||
max_concurrency: NonZeroUsize::new(1).unwrap(),
|
||||
});
|
||||
let (_desc, path) = layer
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner())
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
|
||||
.await?
|
||||
.unwrap();
|
||||
tokio::fs::remove_file(path).await?;
|
||||
|
||||
@@ -1714,6 +1714,28 @@ pub enum SmgrQueryType {
|
||||
Test,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
IntoStaticStr,
|
||||
strum_macros::EnumCount,
|
||||
strum_macros::EnumIter,
|
||||
strum_macros::FromRepr,
|
||||
enum_map::Enum,
|
||||
)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum GetPageBatchBreakReason {
|
||||
BatchFull,
|
||||
NonBatchableRequest,
|
||||
NonUniformLsn,
|
||||
SamePageAtDifferentLsn,
|
||||
NonUniformTimeline,
|
||||
ExecutorSteal,
|
||||
#[cfg(feature = "testing")]
|
||||
NonUniformKey,
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
global_started: [IntCounter; SmgrQueryType::COUNT],
|
||||
global_latency: [Histogram; SmgrQueryType::COUNT],
|
||||
@@ -1725,6 +1747,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
global_batch_wait_time: Histogram,
|
||||
per_timeline_batch_wait_time: Histogram,
|
||||
global_batch_break_reason: [IntCounter; GetPageBatchBreakReason::COUNT],
|
||||
per_timeline_batch_break_reason: GetPageBatchBreakReasonTimelineMetrics,
|
||||
throttling: Arc<tenant_throttling::Pagestream>,
|
||||
}
|
||||
|
||||
@@ -1858,6 +1882,49 @@ static PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
// it's a counter, but, name is prepared to extend it to a histogram of queue depth
|
||||
"pageserver_page_service_batch_break_reason_global",
|
||||
"Reason for breaking batches of get page requests",
|
||||
&["reason"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
struct GetPageBatchBreakReasonTimelineMetrics {
|
||||
map: EnumMap<GetPageBatchBreakReason, IntCounter>,
|
||||
}
|
||||
|
||||
impl GetPageBatchBreakReasonTimelineMetrics {
|
||||
fn new(tenant_id: &str, shard_slug: &str, timeline_id: &str) -> Self {
|
||||
GetPageBatchBreakReasonTimelineMetrics {
|
||||
map: EnumMap::from_array(std::array::from_fn(|reason_idx| {
|
||||
let reason = GetPageBatchBreakReason::from_usize(reason_idx);
|
||||
PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.with_label_values(&[
|
||||
tenant_id,
|
||||
shard_slug,
|
||||
timeline_id,
|
||||
reason.into(),
|
||||
])
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
fn inc(&self, reason: GetPageBatchBreakReason) {
|
||||
self.map[reason].inc()
|
||||
}
|
||||
}
|
||||
|
||||
static PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_service_batch_break_reason",
|
||||
"Reason for breaking batches of get page requests",
|
||||
&["tenant_id", "shard_id", "timeline_id", "reason"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_page_service_config_max_batch_size",
|
||||
@@ -1985,6 +2052,15 @@ impl SmgrQueryTimePerTimeline {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let global_batch_break_reason = std::array::from_fn(|i| {
|
||||
let reason = GetPageBatchBreakReason::from_usize(i);
|
||||
PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL
|
||||
.get_metric_with_label_values(&[reason.into()])
|
||||
.unwrap()
|
||||
});
|
||||
let per_timeline_batch_break_reason =
|
||||
GetPageBatchBreakReasonTimelineMetrics::new(&tenant_id, &shard_slug, &timeline_id);
|
||||
|
||||
let global_flush_in_progress_micros =
|
||||
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
|
||||
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
|
||||
@@ -2002,6 +2078,8 @@ impl SmgrQueryTimePerTimeline {
|
||||
per_timeline_flush_in_progress_micros,
|
||||
global_batch_wait_time,
|
||||
per_timeline_batch_wait_time,
|
||||
global_batch_break_reason,
|
||||
per_timeline_batch_break_reason,
|
||||
throttling: pagestream_throttle_metrics,
|
||||
}
|
||||
}
|
||||
@@ -2030,9 +2108,16 @@ impl SmgrQueryTimePerTimeline {
|
||||
}
|
||||
|
||||
/// TODO: do something about this? seems odd, we have a similar call on SmgrOpTimer
|
||||
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
|
||||
pub(crate) fn observe_getpage_batch_start(
|
||||
&self,
|
||||
batch_size: usize,
|
||||
break_reason: GetPageBatchBreakReason,
|
||||
) {
|
||||
self.global_batch_size.observe(batch_size as f64);
|
||||
self.per_timeline_batch_size.observe(batch_size as f64);
|
||||
|
||||
self.global_batch_break_reason[break_reason.into_usize()].inc();
|
||||
self.per_timeline_batch_break_reason.inc(break_reason);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3398,6 +3483,15 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
|
||||
for reason in GetPageBatchBreakReason::iter() {
|
||||
let _ = PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
reason.into(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4276,6 +4370,7 @@ pub fn preinitialize_metrics(
|
||||
[
|
||||
&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT,
|
||||
&SMGR_QUERY_STARTED_GLOBAL,
|
||||
&PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL,
|
||||
]
|
||||
.into_iter()
|
||||
.for_each(|c| {
|
||||
|
||||
@@ -58,8 +58,8 @@ use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
|
||||
TimelineMetrics,
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
|
||||
SmgrOpTimer, TimelineMetrics,
|
||||
};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::span::{
|
||||
@@ -672,6 +672,7 @@ enum BatchedFeMessage {
|
||||
span: Span,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
},
|
||||
DbSize {
|
||||
span: Span,
|
||||
@@ -724,6 +725,119 @@ impl BatchedFeMessage {
|
||||
BatchedFeMessage::RespondError { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_break_batch(
|
||||
&self,
|
||||
other: &BatchedFeMessage,
|
||||
max_batch_size: NonZeroUsize,
|
||||
batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
|
||||
) -> Option<GetPageBatchBreakReason> {
|
||||
match (self, other) {
|
||||
(
|
||||
BatchedFeMessage::GetPage {
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
..
|
||||
},
|
||||
BatchedFeMessage::GetPage {
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_pages.len(), max_batch_size.get());
|
||||
|
||||
return Some(GetPageBatchBreakReason::BatchFull);
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
|
||||
return Some(GetPageBatchBreakReason::NonUniformTimeline);
|
||||
}
|
||||
|
||||
match batching_strategy {
|
||||
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
|
||||
if let Some(last_in_batch) = accum_pages.last() {
|
||||
if last_in_batch.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
{
|
||||
trace!(
|
||||
accum_lsn = %last_in_batch.effective_request_lsn,
|
||||
this_lsn = %this_pages[0].effective_request_lsn,
|
||||
"stopping batching because LSN changed"
|
||||
);
|
||||
|
||||
return Some(GetPageBatchBreakReason::NonUniformLsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
|
||||
// The read path doesn't curently support serving the same page at different LSNs.
|
||||
// While technically possible, it's uncertain if the complexity is worth it.
|
||||
// Break the batch if such a case is encountered.
|
||||
let same_page_different_lsn = accum_pages.iter().any(|batched| {
|
||||
batched.req.rel == this_pages[0].req.rel
|
||||
&& batched.req.blkno == this_pages[0].req.blkno
|
||||
&& batched.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
});
|
||||
|
||||
if same_page_different_lsn {
|
||||
trace!(
|
||||
rel=%this_pages[0].req.rel,
|
||||
blkno=%this_pages[0].req.blkno,
|
||||
lsn=%this_pages[0].effective_request_lsn,
|
||||
"stopping batching because same page was requested at different LSNs"
|
||||
);
|
||||
|
||||
return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
(
|
||||
BatchedFeMessage::Test {
|
||||
shard: accum_shard,
|
||||
requests: accum_requests,
|
||||
..
|
||||
},
|
||||
BatchedFeMessage::Test {
|
||||
shard: this_shard,
|
||||
requests: this_requests,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
assert!(this_requests.len() == 1);
|
||||
if accum_requests.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_requests.len(), max_batch_size.get());
|
||||
return Some(GetPageBatchBreakReason::BatchFull);
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return Some(GetPageBatchBreakReason::NonUniformTimeline);
|
||||
}
|
||||
let this_batch_key = this_requests[0].req.batch_key;
|
||||
let accum_batch_key = accum_requests[0].req.batch_key;
|
||||
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
|
||||
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
|
||||
return Some(GetPageBatchBreakReason::NonUniformKey);
|
||||
}
|
||||
None
|
||||
}
|
||||
(_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
@@ -1047,6 +1161,10 @@ impl PageServerHandler {
|
||||
effective_request_lsn,
|
||||
ctx,
|
||||
}],
|
||||
// The executor grabs the batch when it becomes idle.
|
||||
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
|
||||
// default reason for breaking the batch.
|
||||
batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -1084,118 +1202,59 @@ impl PageServerHandler {
|
||||
Err(e) => return Err(Err(e)),
|
||||
};
|
||||
|
||||
match (&mut *batch, this_msg) {
|
||||
// something batched already, let's see if we can add this message to the batch
|
||||
(
|
||||
Ok(BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
}),
|
||||
BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
pages: this_pages,
|
||||
},
|
||||
) if (|| {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_pages.len(), max_batch_size.get());
|
||||
return false;
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
|
||||
match batching_strategy {
|
||||
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
|
||||
if let Some(last_in_batch) = accum_pages.last() {
|
||||
if last_in_batch.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
|
||||
// The read path doesn't curently support serving the same page at different LSNs.
|
||||
// While technically possible, it's uncertain if the complexity is worth it.
|
||||
// Break the batch if such a case is encountered.
|
||||
//
|
||||
// TODO(vlad): Include a metric for batch breaks with a reason label.
|
||||
let same_page_different_lsn = accum_pages.iter().any(|batched| {
|
||||
batched.req.rel == this_pages[0].req.rel
|
||||
&& batched.req.blkno == this_pages[0].req.blkno
|
||||
&& batched.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
});
|
||||
|
||||
if same_page_different_lsn {
|
||||
trace!(
|
||||
rel=%this_pages[0].req.rel,
|
||||
blkno=%this_pages[0].req.blkno,
|
||||
lsn=%this_pages[0].effective_request_lsn,
|
||||
"stopping batching because same page was requested at different LSNs"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
})() =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
Ok(())
|
||||
let eligible_batch = match batch {
|
||||
Ok(b) => b,
|
||||
Err(_) => {
|
||||
return Err(Ok(this_msg));
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
(
|
||||
Ok(BatchedFeMessage::Test {
|
||||
shard: accum_shard,
|
||||
requests: accum_requests,
|
||||
..
|
||||
}),
|
||||
BatchedFeMessage::Test {
|
||||
shard: this_shard,
|
||||
requests: this_requests,
|
||||
..
|
||||
},
|
||||
) if (|| {
|
||||
assert!(this_requests.len() == 1);
|
||||
if accum_requests.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_requests.len(), max_batch_size.get());
|
||||
return false;
|
||||
};
|
||||
|
||||
let batch_break =
|
||||
eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
|
||||
|
||||
match batch_break {
|
||||
Some(reason) => {
|
||||
if let BatchedFeMessage::GetPage {
|
||||
batch_break_reason, ..
|
||||
} = eligible_batch
|
||||
{
|
||||
*batch_break_reason = reason;
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
let this_batch_key = this_requests[0].req.batch_key;
|
||||
let accum_batch_key = accum_requests[0].req.batch_key;
|
||||
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
|
||||
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})() =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_requests.extend(this_requests);
|
||||
Ok(())
|
||||
}
|
||||
// something batched already but this message is unbatchable
|
||||
(_, this_msg) => {
|
||||
// by default, don't continue batching
|
||||
|
||||
Err(Ok(this_msg))
|
||||
}
|
||||
None => {
|
||||
// ok to batch
|
||||
match (eligible_batch, this_msg) {
|
||||
(
|
||||
BatchedFeMessage::GetPage {
|
||||
pages: accum_pages, ..
|
||||
},
|
||||
BatchedFeMessage::GetPage {
|
||||
pages: this_pages, ..
|
||||
},
|
||||
) => {
|
||||
accum_pages.extend(this_pages);
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
(
|
||||
BatchedFeMessage::Test {
|
||||
requests: accum_requests,
|
||||
..
|
||||
},
|
||||
BatchedFeMessage::Test {
|
||||
requests: this_requests,
|
||||
..
|
||||
},
|
||||
) => {
|
||||
accum_requests.extend(this_requests);
|
||||
Ok(())
|
||||
}
|
||||
// Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1413,7 +1472,12 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::GetPage { span, shard, pages } => {
|
||||
BatchedFeMessage::GetPage {
|
||||
span,
|
||||
shard,
|
||||
pages,
|
||||
batch_break_reason,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
@@ -1425,6 +1489,7 @@ impl PageServerHandler {
|
||||
&shard,
|
||||
pages,
|
||||
io_concurrency,
|
||||
batch_break_reason,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
@@ -2113,13 +2178,14 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
io_concurrency: IoConcurrency,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
timeline
|
||||
.query_metrics
|
||||
.observe_getpage_batch_start(requests.len());
|
||||
.observe_getpage_batch_start(requests.len(), batch_break_reason);
|
||||
|
||||
// If a page trace is running, submit an event for this request.
|
||||
if let Some(page_trace) = timeline.page_trace.load().as_ref() {
|
||||
|
||||
@@ -5933,12 +5933,20 @@ mod tests {
|
||||
use models::CompactLsnRange;
|
||||
use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::keyspace::KeySpaceRandomAccum;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::SeedableRng;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, thread_rng};
|
||||
#[cfg(feature = "testing")]
|
||||
use std::ops::Range;
|
||||
use storage_layer::{IoConcurrency, PersistentLayerKey};
|
||||
use tests::storage_layer::ValuesReconstructState;
|
||||
use tests::timeline::{GetVectoredError, ShutdownMode};
|
||||
@@ -5960,6 +5968,318 @@ mod tests {
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
struct TestTimelineSpecification {
|
||||
start_lsn: Lsn,
|
||||
last_record_lsn: Lsn,
|
||||
|
||||
in_memory_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
|
||||
delta_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
|
||||
image_layers_shape: Vec<(Range<Key>, Lsn)>,
|
||||
|
||||
gap_chance: u8,
|
||||
will_init_chance: u8,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
struct Storage {
|
||||
storage: HashMap<(Key, Lsn), Value>,
|
||||
start_lsn: Lsn,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
impl Storage {
|
||||
fn get(&self, key: Key, lsn: Lsn) -> Bytes {
|
||||
use bytes::BufMut;
|
||||
|
||||
let mut crnt_lsn = lsn;
|
||||
let mut got_base = false;
|
||||
|
||||
let mut acc = Vec::new();
|
||||
|
||||
while crnt_lsn >= self.start_lsn {
|
||||
if let Some(value) = self.storage.get(&(key, crnt_lsn)) {
|
||||
acc.push(value.clone());
|
||||
|
||||
match value {
|
||||
Value::WalRecord(NeonWalRecord::Test { will_init, .. }) => {
|
||||
if *will_init {
|
||||
got_base = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Value::Image(_) => {
|
||||
got_base = true;
|
||||
break;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
crnt_lsn = crnt_lsn.checked_sub(1u64).unwrap();
|
||||
}
|
||||
|
||||
assert!(
|
||||
got_base,
|
||||
"Input data was incorrect. No base image for {key}@{lsn}"
|
||||
);
|
||||
|
||||
tracing::debug!("Wal redo depth for {key}@{lsn} is {}", acc.len());
|
||||
|
||||
let mut blob = BytesMut::new();
|
||||
for value in acc.into_iter().rev() {
|
||||
match value {
|
||||
Value::WalRecord(NeonWalRecord::Test { append, .. }) => {
|
||||
blob.extend_from_slice(append.as_bytes());
|
||||
}
|
||||
Value::Image(img) => {
|
||||
blob.put(img);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
blob.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn randomize_timeline(
|
||||
tenant: &Arc<Tenant>,
|
||||
new_timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
spec: TestTimelineSpecification,
|
||||
random: &mut rand::rngs::StdRng,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(Arc<Timeline>, Storage, Vec<Lsn>)> {
|
||||
let mut storage: HashMap<(Key, Lsn), Value> = HashMap::default();
|
||||
let mut interesting_lsns = vec![spec.last_record_lsn];
|
||||
|
||||
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let gap = random.gen_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
|
||||
|
||||
if gap {
|
||||
continue;
|
||||
}
|
||||
|
||||
let record = if will_init {
|
||||
Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]")))
|
||||
} else {
|
||||
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
|
||||
};
|
||||
|
||||
storage.insert((key, lsn), record);
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
// Stash some interesting LSN for future use
|
||||
for offset in [0, 5, 100].iter() {
|
||||
if *offset == 0 {
|
||||
interesting_lsns.push(lsn_range.start);
|
||||
} else {
|
||||
let below = lsn_range.start.checked_sub(*offset);
|
||||
match below {
|
||||
Some(v) if v >= spec.start_lsn => {
|
||||
interesting_lsns.push(v);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let above = Lsn(lsn_range.start.0 + offset);
|
||||
interesting_lsns.push(above);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let gap = random.gen_range(1..=100) <= spec.gap_chance;
|
||||
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
|
||||
|
||||
if gap {
|
||||
continue;
|
||||
}
|
||||
|
||||
let record = if will_init {
|
||||
Value::WalRecord(NeonWalRecord::wal_init(format!("[wil_init {key}@{lsn}]")))
|
||||
} else {
|
||||
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
|
||||
};
|
||||
|
||||
storage.insert((key, lsn), record);
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
// Stash some interesting LSN for future use
|
||||
for offset in [0, 5, 100].iter() {
|
||||
if *offset == 0 {
|
||||
interesting_lsns.push(lsn_range.start);
|
||||
} else {
|
||||
let below = lsn_range.start.checked_sub(*offset);
|
||||
match below {
|
||||
Some(v) if v >= spec.start_lsn => {
|
||||
interesting_lsns.push(v);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let above = Lsn(lsn_range.start.0 + offset);
|
||||
interesting_lsns.push(above);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (key_range, lsn) in spec.image_layers_shape.iter() {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
let blob = Bytes::from(format!("[image {key}@{lsn}]"));
|
||||
let record = Value::Image(blob.clone());
|
||||
storage.insert((key, *lsn), record);
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
// Stash some interesting LSN for future use
|
||||
for offset in [0, 5, 100].iter() {
|
||||
if *offset == 0 {
|
||||
interesting_lsns.push(*lsn);
|
||||
} else {
|
||||
let below = lsn.checked_sub(*offset);
|
||||
match below {
|
||||
Some(v) if v >= spec.start_lsn => {
|
||||
interesting_lsns.push(v);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let above = Lsn(lsn.0 + offset);
|
||||
interesting_lsns.push(above);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let in_memory_test_layers = {
|
||||
let mut acc = Vec::new();
|
||||
|
||||
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
|
||||
let mut data = Vec::new();
|
||||
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
if let Some(record) = storage.get(&(key, lsn)) {
|
||||
data.push((key, lsn, record.clone()));
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
acc.push(InMemoryLayerTestDesc {
|
||||
data,
|
||||
lsn_range: lsn_range.clone(),
|
||||
is_open: false,
|
||||
})
|
||||
}
|
||||
|
||||
acc
|
||||
};
|
||||
|
||||
let delta_test_layers = {
|
||||
let mut acc = Vec::new();
|
||||
|
||||
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
|
||||
let mut data = Vec::new();
|
||||
|
||||
let mut lsn = lsn_range.start;
|
||||
while lsn < lsn_range.end {
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
if let Some(record) = storage.get(&(key, lsn)) {
|
||||
data.push((key, lsn, record.clone()));
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
lsn = Lsn(lsn.0 + 1);
|
||||
}
|
||||
|
||||
acc.push(DeltaLayerTestDesc {
|
||||
data,
|
||||
lsn_range: lsn_range.clone(),
|
||||
key_range: key_range.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
acc
|
||||
};
|
||||
|
||||
let image_test_layers = {
|
||||
let mut acc = Vec::new();
|
||||
|
||||
for (key_range, lsn) in spec.image_layers_shape.iter() {
|
||||
let mut data = Vec::new();
|
||||
|
||||
let mut key = key_range.start;
|
||||
while key < key_range.end {
|
||||
if let Some(record) = storage.get(&(key, *lsn)) {
|
||||
let blob = match record {
|
||||
Value::Image(blob) => blob.clone(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
data.push((key, blob));
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
acc.push((*lsn, data));
|
||||
}
|
||||
|
||||
acc
|
||||
};
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
new_timeline_id,
|
||||
spec.start_lsn,
|
||||
pg_version,
|
||||
ctx,
|
||||
in_memory_test_layers,
|
||||
delta_test_layers,
|
||||
image_test_layers,
|
||||
spec.last_record_lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok((
|
||||
tline,
|
||||
Storage {
|
||||
storage,
|
||||
start_lsn: spec.start_lsn,
|
||||
},
|
||||
interesting_lsns,
|
||||
))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
|
||||
@@ -10543,6 +10863,214 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// A randomized read path test. Generates a layer map according to a deterministic
|
||||
// specification. Fills the (key, LSN) space in random manner and then performs
|
||||
// random scattered queries validating the results against in-memory storage.
|
||||
//
|
||||
// See this internal Notion page for a diagram of the layer map:
|
||||
// https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4
|
||||
//
|
||||
// A fuzzing mode is also supported. In this mode, the test will use a random
|
||||
// seed instead of a hardcoded one. Use it in conjunction with `cargo stress`
|
||||
// to run multiple instances in parallel:
|
||||
//
|
||||
// $ RUST_BACKTRACE=1 RUST_LOG=INFO \
|
||||
// cargo stress --package=pageserver --features=testing,fuzz-read-path --release -- test_read_path
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_read_path() -> anyhow::Result<()> {
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
let seed = if cfg!(feature = "fuzz-read-path") {
|
||||
let seed: u64 = thread_rng().r#gen();
|
||||
seed
|
||||
} else {
|
||||
// Use a hard-coded seed when not in fuzzing mode.
|
||||
// Note that with the current approach results are not reproducible
|
||||
// accross platforms and Rust releases.
|
||||
const SEED: u64 = 0;
|
||||
SEED
|
||||
};
|
||||
|
||||
let mut random = StdRng::seed_from_u64(seed);
|
||||
|
||||
let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") {
|
||||
const QUERIES: u64 = 5000;
|
||||
let will_init_chance: u8 = random.gen_range(0..=10);
|
||||
let gap_chance: u8 = random.gen_range(0..=50);
|
||||
|
||||
(QUERIES, will_init_chance, gap_chance)
|
||||
} else {
|
||||
const QUERIES: u64 = 1000;
|
||||
const WILL_INIT_CHANCE: u8 = 1;
|
||||
const GAP_CHANCE: u8 = 5;
|
||||
|
||||
(QUERIES, WILL_INIT_CHANCE, GAP_CHANCE)
|
||||
};
|
||||
|
||||
let harness = TenantHarness::create("test_read_path").await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
tracing::info!("Using random seed: {seed}");
|
||||
tracing::info!(%will_init_chance, %gap_chance, "Fill params");
|
||||
|
||||
// Define the layer map shape. Note that this part is not randomized.
|
||||
|
||||
const KEY_DIMENSION_SIZE: u32 = 99;
|
||||
let start_key = Key::from_hex("110000000033333333444444445500000000").unwrap();
|
||||
let end_key = start_key.add(KEY_DIMENSION_SIZE);
|
||||
let total_key_range = start_key..end_key;
|
||||
let total_key_range_size = end_key.to_i128() - start_key.to_i128();
|
||||
let total_start_lsn = Lsn(104);
|
||||
let last_record_lsn = Lsn(504);
|
||||
|
||||
assert!(total_key_range_size % 3 == 0);
|
||||
|
||||
let in_memory_layers_shape = vec![
|
||||
(total_key_range.clone(), Lsn(304)..Lsn(400)),
|
||||
(total_key_range.clone(), Lsn(400)..last_record_lsn),
|
||||
];
|
||||
|
||||
let delta_layers_shape = vec![
|
||||
(
|
||||
start_key..(start_key.add((total_key_range_size / 3) as u32)),
|
||||
Lsn(200)..Lsn(304),
|
||||
),
|
||||
(
|
||||
(start_key.add((total_key_range_size / 3) as u32))
|
||||
..(start_key.add((total_key_range_size * 2 / 3) as u32)),
|
||||
Lsn(200)..Lsn(304),
|
||||
),
|
||||
(
|
||||
(start_key.add((total_key_range_size * 2 / 3) as u32))
|
||||
..(start_key.add(total_key_range_size as u32)),
|
||||
Lsn(200)..Lsn(304),
|
||||
),
|
||||
];
|
||||
|
||||
let image_layers_shape = vec![
|
||||
(
|
||||
start_key.add((total_key_range_size * 2 / 3 - 10) as u32)
|
||||
..start_key.add((total_key_range_size * 2 / 3 + 10) as u32),
|
||||
Lsn(456),
|
||||
),
|
||||
(
|
||||
start_key.add((total_key_range_size / 3 - 10) as u32)
|
||||
..start_key.add((total_key_range_size / 3 + 10) as u32),
|
||||
Lsn(256),
|
||||
),
|
||||
(total_key_range.clone(), total_start_lsn),
|
||||
];
|
||||
|
||||
let specification = TestTimelineSpecification {
|
||||
start_lsn: total_start_lsn,
|
||||
last_record_lsn,
|
||||
in_memory_layers_shape,
|
||||
delta_layers_shape,
|
||||
image_layers_shape,
|
||||
gap_chance,
|
||||
will_init_chance,
|
||||
};
|
||||
|
||||
// Create and randomly fill in the layers according to the specification
|
||||
let (tline, storage, interesting_lsns) = randomize_timeline(
|
||||
&tenant,
|
||||
TIMELINE_ID,
|
||||
DEFAULT_PG_VERSION,
|
||||
specification,
|
||||
&mut random,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Now generate queries based on the interesting lsns that we've collected.
|
||||
//
|
||||
// While there's still room in the query, pick and interesting LSN and a random
|
||||
// key. Then roll the dice to see if the next key should also be included in
|
||||
// the query. When the roll fails, break the "batch" and pick another point in the
|
||||
// (key, LSN) space.
|
||||
|
||||
const PICK_NEXT_CHANCE: u8 = 50;
|
||||
for _ in 0..queries {
|
||||
let query = {
|
||||
let mut keyspaces_at_lsn: HashMap<Lsn, KeySpaceRandomAccum> = HashMap::default();
|
||||
let mut used_keys: HashSet<Key> = HashSet::default();
|
||||
|
||||
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
|
||||
let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));
|
||||
|
||||
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
if used_keys.contains(&selected_key)
|
||||
|| selected_key >= start_key.add(KEY_DIMENSION_SIZE)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
keyspaces_at_lsn
|
||||
.entry(*selected_lsn)
|
||||
.or_default()
|
||||
.add_key(selected_key);
|
||||
used_keys.insert(selected_key);
|
||||
|
||||
let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE;
|
||||
if pick_next {
|
||||
selected_key = selected_key.next();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VersionedKeySpaceQuery::scattered(
|
||||
keyspaces_at_lsn
|
||||
.into_iter()
|
||||
.map(|(lsn, acc)| (lsn, acc.to_keyspace()))
|
||||
.collect(),
|
||||
)
|
||||
};
|
||||
|
||||
// Run the query and validate the results
|
||||
|
||||
let results = tline
|
||||
.get_vectored(query.clone(), IoConcurrency::Sequential, &ctx)
|
||||
.await;
|
||||
|
||||
let blobs = match results {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
panic!("seed={seed} Error returned for query {query}: {err}");
|
||||
}
|
||||
};
|
||||
|
||||
for (key, key_res) in blobs.into_iter() {
|
||||
match key_res {
|
||||
Ok(blob) => {
|
||||
let requested_at_lsn = query.map_key_to_lsn(&key);
|
||||
let expected = storage.get(key, requested_at_lsn);
|
||||
|
||||
if blob != expected {
|
||||
tracing::error!(
|
||||
"seed={seed} Mismatch for {key}@{requested_at_lsn} from query: {query}"
|
||||
);
|
||||
}
|
||||
|
||||
assert_eq!(blob, expected);
|
||||
}
|
||||
Err(err) => {
|
||||
let requested_at_lsn = query.map_key_to_lsn(&key);
|
||||
|
||||
panic!(
|
||||
"seed={seed} Error returned for {key}@{requested_at_lsn} from query {query}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering {
|
||||
(
|
||||
k1.is_delta,
|
||||
|
||||
@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
|
||||
}
|
||||
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
|
||||
pub fn new(
|
||||
inner: VirtualFile,
|
||||
start_offset: u64,
|
||||
_gate: &utils::sync::gate::Gate,
|
||||
_cancel: CancellationToken,
|
||||
_ctx: &RequestContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
offset: start_offset,
|
||||
@@ -432,12 +439,14 @@ pub(crate) mod tests {
|
||||
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
|
||||
let temp_dir = camino_tempfile::tempdir()?;
|
||||
let pathbuf = temp_dir.path().join("file");
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Write part (in block to drop the file)
|
||||
let mut offsets = Vec::new();
|
||||
{
|
||||
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
|
||||
@@ -714,7 +714,7 @@ impl LayerMap {
|
||||
true
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
|
||||
pub fn iter_historic_layers(&self) -> impl ExactSizeIterator<Item = Arc<PersistentLayerDesc>> {
|
||||
self.historic.iter()
|
||||
}
|
||||
|
||||
|
||||
@@ -504,7 +504,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
}
|
||||
|
||||
/// Iterate all the layers
|
||||
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
|
||||
pub fn iter(&self) -> impl ExactSizeIterator<Item = Value> {
|
||||
// NOTE we can actually perform this without rebuilding,
|
||||
// but it's not necessary for now.
|
||||
if !self.buffer.is_empty() {
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::value::Value;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers.
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter {
|
||||
pub struct SplitImageLayerWriter<'a> {
|
||||
inner: ImageLayerWriter,
|
||||
target_layer_size: u64,
|
||||
lsn: Lsn,
|
||||
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
|
||||
tenant_shard_id: TenantShardId,
|
||||
batches: BatchLayerWriter,
|
||||
start_key: Key,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitImageLayerWriter {
|
||||
impl<'a> SplitImageLayerWriter<'a> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
|
||||
start_key: Key,
|
||||
lsn: Lsn,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
|
||||
tenant_shard_id,
|
||||
&(start_key..Key::MAX),
|
||||
lsn,
|
||||
gate,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
lsn,
|
||||
start_key,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
&(key..Key::MAX),
|
||||
self.lsn,
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
|
||||
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
|
||||
/// will split them into multiple files based on size.
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
pub struct SplitDeltaLayerWriter<'a> {
|
||||
inner: Option<(Key, DeltaLayerWriter)>,
|
||||
target_layer_size: u64,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
|
||||
lsn_range: Range<Lsn>,
|
||||
last_key_written: Key,
|
||||
batches: BatchLayerWriter,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -469,6 +491,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -480,6 +504,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -546,6 +572,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -556,6 +584,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -643,6 +673,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -654,6 +686,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -730,6 +764,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -50,6 +50,7 @@ use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -400,12 +401,15 @@ impl DeltaLayerWriterInner {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename. We don't know
|
||||
@@ -420,7 +424,7 @@ impl DeltaLayerWriterInner {
|
||||
let mut file = VirtualFile::create(&path, ctx).await?;
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -628,12 +632,15 @@ impl DeltaLayerWriter {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -644,6 +651,8 @@ impl DeltaLayerWriter {
|
||||
tenant_shard_id,
|
||||
key_start,
|
||||
lsn_range,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -1885,6 +1894,8 @@ pub(crate) mod test {
|
||||
harness.tenant_shard_id,
|
||||
entries_meta.key_range.start,
|
||||
entries_meta.lsn_range.clone(),
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -2079,6 +2090,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
Key::MIN,
|
||||
Lsn(0x11)..truncate_at,
|
||||
&branch.gate,
|
||||
branch.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2213,6 +2226,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
*key_start,
|
||||
(*lsn_min)..lsn_end,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -48,6 +48,7 @@ use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -748,12 +749,15 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename.
|
||||
@@ -780,7 +784,7 @@ impl ImageLayerWriterInner {
|
||||
};
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -988,18 +992,30 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
Ok(Self {
|
||||
inner: Some(
|
||||
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
|
||||
.await?,
|
||||
ImageLayerWriterInner::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
),
|
||||
})
|
||||
}
|
||||
@@ -1203,6 +1219,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1268,6 +1286,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1346,6 +1366,8 @@ mod test {
|
||||
tenant.tenant_shard_id,
|
||||
&key_range,
|
||||
lsn,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -719,6 +719,8 @@ impl InMemoryLayer {
|
||||
ctx: &RequestContext,
|
||||
key_range: Option<Range<Key>>,
|
||||
l0_flush_global_state: &l0_flush::Inner,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
@@ -759,6 +761,8 @@ impl InMemoryLayer {
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
self.start_lsn..end_lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -4026,7 +4026,7 @@ impl VersionedKeySpaceQuery {
|
||||
/// Returns LSN for a specific key.
|
||||
///
|
||||
/// Invariant: requested key must be part of [`Self::total_keyspace`]
|
||||
fn map_key_to_lsn(&self, key: &Key) -> Lsn {
|
||||
pub(super) fn map_key_to_lsn(&self, key: &Key) -> Lsn {
|
||||
match self {
|
||||
Self::Uniform { lsn, .. } => *lsn,
|
||||
Self::Scattered { keyspaces_at_lsn } => {
|
||||
@@ -4986,7 +4986,13 @@ impl Timeline {
|
||||
let ctx = ctx.attached_child();
|
||||
let work = async move {
|
||||
let Some((desc, path)) = frozen_layer
|
||||
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
|
||||
.write_to_disk(
|
||||
&ctx,
|
||||
key_range,
|
||||
self_clone.l0_flush_global_state.inner(),
|
||||
&self_clone.gate,
|
||||
self_clone.cancel.clone(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
@@ -5526,6 +5532,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -5694,6 +5702,12 @@ impl Timeline {
|
||||
return;
|
||||
}
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
// We already requested stopping the tenant, so we cannot wait for the logical size
|
||||
// calculation to complete given the task might have been already cancelled.
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(await_bg_cancel) = self
|
||||
.current_logical_size
|
||||
.cancel_wait_for_background_loop_concurrency_limit_semaphore
|
||||
@@ -6890,6 +6904,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&(min_key..end_key),
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6951,6 +6967,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
deltas.key_range.start,
|
||||
deltas.lsn_range,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -56,7 +56,8 @@ use crate::tenant::storage_layer::batch_split_writer::{
|
||||
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
|
||||
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
|
||||
AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, PersistentLayerKey,
|
||||
ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::tasks::log_compaction_error;
|
||||
use crate::tenant::timeline::{
|
||||
@@ -69,6 +70,13 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
const COMPACTION_DELTA_THRESHOLD: usize = 5;
|
||||
|
||||
/// Ratio of shard-local pages below which we trigger shard ancestor layer rewrites. 0.3 means that
|
||||
/// <= 30% of layer pages must belong to the descendant shard to rewrite the layer.
|
||||
///
|
||||
/// We choose a value < 0.5 to avoid rewriting all visible layers every time we do a power-of-two
|
||||
/// shard split, which gets expensive for large tenants.
|
||||
const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
|
||||
pub struct GcCompactionJobId(pub usize);
|
||||
|
||||
@@ -749,8 +757,8 @@ impl KeyHistoryRetention {
|
||||
async fn pipe_to(
|
||||
self,
|
||||
key: Key,
|
||||
delta_writer: &mut SplitDeltaLayerWriter,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter>,
|
||||
delta_writer: &mut SplitDeltaLayerWriter<'_>,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
|
||||
stat: &mut CompactionStatistics,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -819,7 +827,15 @@ impl KeyHistoryRetention {
|
||||
base_img: &Option<(Lsn, &Bytes)>,
|
||||
history: &[(Lsn, &NeonWalRecord)],
|
||||
tline: &Arc<Timeline>,
|
||||
skip_empty: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if base_img.is_none() && history.is_empty() {
|
||||
if skip_empty {
|
||||
return Ok(());
|
||||
}
|
||||
anyhow::bail!("verification failed: key {} has no history at {}", key, lsn);
|
||||
};
|
||||
|
||||
let mut records = history
|
||||
.iter()
|
||||
.map(|(lsn, val)| (*lsn, (*val).clone()))
|
||||
@@ -860,17 +876,12 @@ impl KeyHistoryRetention {
|
||||
if *retain_lsn >= min_lsn {
|
||||
// Only verify after the key appears in the full history for the first time.
|
||||
|
||||
if base_img.is_none() && history.is_empty() {
|
||||
anyhow::bail!(
|
||||
"verificatoin failed: key {} has no history at {}",
|
||||
key,
|
||||
retain_lsn
|
||||
);
|
||||
};
|
||||
// We don't modify history: in theory, we could replace the history with a single
|
||||
// image as in `generate_key_retention` to make redos at later LSNs faster. But we
|
||||
// want to verify everything as if they are read from the real layer map.
|
||||
collect_and_verify(key, *retain_lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, *retain_lsn, &base_img, &history, tline, false)
|
||||
.await
|
||||
.context("below horizon retain_lsn")?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -878,13 +889,17 @@ impl KeyHistoryRetention {
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
// Above the GC horizon, we verify every time we see an image.
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline, true)
|
||||
.await
|
||||
.context("above horizon full image")?;
|
||||
base_img = Some((*lsn, img));
|
||||
history.clear();
|
||||
}
|
||||
Value::WalRecord(rec) if val.will_init() => {
|
||||
// Above the GC horizon, we verify every time we see an init record.
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline, true)
|
||||
.await
|
||||
.context("above horizon init record")?;
|
||||
base_img = None;
|
||||
history.clear();
|
||||
history.push((*lsn, rec));
|
||||
@@ -895,7 +910,9 @@ impl KeyHistoryRetention {
|
||||
}
|
||||
}
|
||||
// Ensure the latest record is readable.
|
||||
collect_and_verify(key, max_lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, max_lsn, &base_img, &history, tline, false)
|
||||
.await
|
||||
.context("latest record")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1273,7 +1290,10 @@ impl Timeline {
|
||||
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
|
||||
|
||||
let layers = self.layers.read().await;
|
||||
for layer_desc in layers.layer_map()?.iter_historic_layers() {
|
||||
let layers_iter = layers.layer_map()?.iter_historic_layers();
|
||||
let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
|
||||
for layer_desc in layers_iter {
|
||||
layers_checked += 1;
|
||||
let layer = layers.get_from_desc(&layer_desc);
|
||||
if layer.metadata().shard.shard_count == self.shard_identity.count {
|
||||
// This layer does not belong to a historic ancestor, no need to re-image it.
|
||||
@@ -1317,14 +1337,15 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Don't bother re-writing a layer unless it will at least halve its size
|
||||
// Only rewrite a layer if we can reclaim significant space.
|
||||
if layer_local_page_count != u32::MAX
|
||||
&& layer_local_page_count > layer_raw_page_count / 2
|
||||
&& layer_local_page_count as f64 / layer_raw_page_count as f64
|
||||
<= ANCESTOR_COMPACTION_REWRITE_THRESHOLD
|
||||
{
|
||||
debug!(%layer,
|
||||
"layer is already mostly local ({}/{}), not rewriting",
|
||||
layer_local_page_count,
|
||||
layer_raw_page_count
|
||||
"layer has a large share of local pages \
|
||||
({layer_local_page_count}/{layer_raw_page_count} > \
|
||||
{ANCESTOR_COMPACTION_REWRITE_THRESHOLD}), not rewriting",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1336,12 +1357,19 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We do not yet implement rewrite of delta layers.
|
||||
if layer_desc.is_delta() {
|
||||
// We do not yet implement rewrite of delta layers
|
||||
debug!(%layer, "Skipping rewrite of delta layer");
|
||||
continue;
|
||||
}
|
||||
|
||||
// We don't bother rewriting layers that aren't visible, since these won't be needed by
|
||||
// reads and will likely be garbage collected soon.
|
||||
if layer.visibility() != LayerVisibilityHint::Visible {
|
||||
debug!(%layer, "Skipping rewrite of invisible layer");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only rewrite layers if their generations differ. This guarantees:
|
||||
// - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
|
||||
// - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
|
||||
@@ -1371,7 +1399,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
|
||||
checked {layers_checked}/{layers_total} layers \
|
||||
(latest_gc_cutoff={} pitr_cutoff={})",
|
||||
layers_to_rewrite.len(),
|
||||
drop_layers.len(),
|
||||
@@ -1394,6 +1423,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&layer.layer_desc().key_range,
|
||||
layer.layer_desc().image_layer_lsn(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2033,6 +2064,8 @@ impl Timeline {
|
||||
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
|
||||
lsn_range.clone()
|
||||
},
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3232,6 +3265,8 @@ impl Timeline {
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3248,6 +3283,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
@@ -3344,6 +3381,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
desc.key_range.start,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3361,6 +3400,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.end,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3932,6 +3973,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range.start,
|
||||
lsn_range.clone(),
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -4007,6 +4050,8 @@ impl TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -228,6 +228,8 @@ async fn generate_tombstone_image_layer(
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
&detached.gate,
|
||||
detached.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -776,6 +778,8 @@ async fn copy_lsn_prefix(
|
||||
target_timeline.tenant_shard_id,
|
||||
layer.layer_desc().key_range.start,
|
||||
layer.layer_desc().lsn_range.start..end_lsn,
|
||||
&target_timeline.gate,
|
||||
target_timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
|
||||
self.timeline.tenant_shard_id,
|
||||
&self.range,
|
||||
self.pgdata_lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -244,7 +244,8 @@ impl RemoteStorageWrapper {
|
||||
kind: DownloadKind::Large,
|
||||
etag: None,
|
||||
byte_start: Bound::Included(start_inclusive),
|
||||
byte_end: Bound::Excluded(end_exclusive)
|
||||
byte_end: Bound::Excluded(end_exclusive),
|
||||
encryption_key: None,
|
||||
},
|
||||
&self.cancel)
|
||||
.await?;
|
||||
|
||||
@@ -2118,9 +2118,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
|
||||
*/
|
||||
if (wp->config->syncSafekeepers)
|
||||
{
|
||||
int n_synced;
|
||||
|
||||
n_synced = 0;
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
Safekeeper *sk = &wp->safekeeper[i];
|
||||
@@ -2129,8 +2126,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
|
||||
/* alive safekeeper which is not synced yet; wait for it */
|
||||
if (sk->state != SS_OFFLINE && !synced)
|
||||
return;
|
||||
if (synced)
|
||||
n_synced++;
|
||||
}
|
||||
|
||||
if (newCommitLsn >= wp->propTermStartLsn)
|
||||
|
||||
@@ -629,15 +629,13 @@ impl ComputeHook {
|
||||
};
|
||||
|
||||
let result = if !self.config.use_local_compute_notifications {
|
||||
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
|
||||
Some(if control_plane_url.ends_with('/') {
|
||||
format!("{control_plane_url}notify-attach")
|
||||
} else {
|
||||
format!("{control_plane_url}/notify-attach")
|
||||
})
|
||||
} else {
|
||||
self.config.compute_hook_url.clone()
|
||||
};
|
||||
let compute_hook_url =
|
||||
self.config
|
||||
.control_plane_url
|
||||
.as_ref()
|
||||
.map(|control_plane_url| {
|
||||
format!("{}/notify-attach", control_plane_url.trim_end_matches('/'))
|
||||
});
|
||||
|
||||
// We validate this at startup
|
||||
let notify_url = compute_hook_url.as_ref().unwrap();
|
||||
|
||||
@@ -86,10 +86,6 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
peer_jwt_token: Option<String>,
|
||||
|
||||
/// URL to control plane compute notification endpoint
|
||||
#[arg(long)]
|
||||
compute_hook_url: Option<String>,
|
||||
|
||||
/// URL to control plane storage API prefix
|
||||
#[arg(long)]
|
||||
control_plane_url: Option<String>,
|
||||
@@ -360,13 +356,11 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
"Insecure config! One or more secrets is not set. This is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict
|
||||
if args.compute_hook_url.is_none() && args.control_plane_url.is_none() =>
|
||||
{
|
||||
StrictMode::Strict if args.control_plane_url.is_none() => {
|
||||
// Production systems should always have a control plane URL set, to prevent falling
|
||||
// back to trying to use neon_local.
|
||||
anyhow::bail!(
|
||||
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
|
||||
"`--control-plane-url` is not set: this is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict if args.use_local_compute_notifications => {
|
||||
@@ -394,7 +388,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
|
||||
control_plane_jwt_token: secrets.control_plane_jwt_token,
|
||||
peer_jwt_token: secrets.peer_jwt_token,
|
||||
compute_hook_url: args.compute_hook_url,
|
||||
control_plane_url: args.control_plane_url,
|
||||
max_offline_interval: args
|
||||
.max_offline_interval
|
||||
|
||||
@@ -357,18 +357,10 @@ pub struct Config {
|
||||
// This JWT token will be used to authenticate with other storage controller instances
|
||||
pub peer_jwt_token: Option<String>,
|
||||
|
||||
/// Where the compute hook should send notifications of pageserver attachment locations
|
||||
/// (this URL points to the control plane in prod). If this is None, the compute hook will
|
||||
/// assume it is running in a test environment and try to update neon_local.
|
||||
pub compute_hook_url: Option<String>,
|
||||
|
||||
/// Prefix for storage API endpoints of the control plane. We use this prefix to compute
|
||||
/// URLs that we use to send pageserver and safekeeper attachment locations.
|
||||
/// If this is None, the compute hook will assume it is running in a test environment
|
||||
/// and try to invoke neon_local instead.
|
||||
///
|
||||
/// For now, there is also `compute_hook_url` which allows configuration of the pageserver
|
||||
/// specific endpoint, but it is in the process of being phased out.
|
||||
pub control_plane_url: Option<String>,
|
||||
|
||||
/// Grace period within which a pageserver does not respond to heartbeats, but is still
|
||||
|
||||
@@ -3,19 +3,35 @@
|
||||
* Create a Neon project on staging.
|
||||
* Grant the superuser privileges to the DB user.
|
||||
* (Optional) create a branch for testing
|
||||
* Configure the endpoint by updating the control-plane database with the following settings:
|
||||
* Add the following settings to the `pg_settings` section of the default endpoint configuration for the project using the admin interface:
|
||||
* `Timeone`: `America/Los_Angeles`
|
||||
* `DateStyle`: `Postgres,MDY`
|
||||
* `compute_query_id`: `off`
|
||||
* Add the following section to the project configuration:
|
||||
```json
|
||||
"preload_libraries": {
|
||||
"use_defaults": false,
|
||||
"enabled_libraries": []
|
||||
}
|
||||
```
|
||||
* Checkout the actual `Neon` sources
|
||||
* Patch the sql and expected files for the specific PostgreSQL version, e.g. for v17:
|
||||
```bash
|
||||
$ cd vendor/postgres-v17
|
||||
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
|
||||
```
|
||||
* Set the environment variables (please modify according your configuration):
|
||||
```bash
|
||||
$ export DEFAULT_PG_VERSION=17
|
||||
$ export BUILD_TYPE=release
|
||||
```
|
||||
* Build the Neon binaries see [README.md](../../README.md)
|
||||
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
|
||||
* Set the environment variable `PG_VERSION` to the version of your project.
|
||||
* Update poetry, run
|
||||
```bash
|
||||
$ scripts/pysync
|
||||
```
|
||||
* Run
|
||||
```bash
|
||||
$ pytest -m remote_cluster -k cloud_regress
|
||||
$ scripts/pytest -m remote_cluster -k cloud_regress
|
||||
```
|
||||
@@ -194,6 +194,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
counter("pageserver_wait_lsn_started_count"),
|
||||
counter("pageserver_wait_lsn_finished_count"),
|
||||
counter("pageserver_wait_ondemand_download_seconds_sum"),
|
||||
counter("pageserver_page_service_batch_break_reason"),
|
||||
*histogram("pageserver_page_service_batch_size"),
|
||||
*histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
|
||||
@@ -947,6 +947,8 @@ class NeonEnvBuilder:
|
||||
continue
|
||||
if SMALL_DB_FILE_NAME_REGEX.fullmatch(test_file.name):
|
||||
continue
|
||||
if FINAL_METRICS_FILE_NAME == test_file.name:
|
||||
continue
|
||||
log.debug(f"Removing large database {test_file} file")
|
||||
test_file.unlink()
|
||||
elif test_entry.is_dir():
|
||||
@@ -1322,10 +1324,6 @@ class NeonEnv:
|
||||
log.info("test may use old binaries, ignoring warnings about unknown config items")
|
||||
ps.allowed_errors.append(".*ignoring unknown configuration item.*")
|
||||
|
||||
# Allow old software to start until https://github.com/neondatabase/neon/pull/11275
|
||||
# lands in the compatiblity snapshot.
|
||||
ps_cfg["page_service_pipelining"].pop("batching")
|
||||
|
||||
self.pageservers.append(ps)
|
||||
cfg["pageservers"].append(ps_cfg)
|
||||
|
||||
@@ -1461,6 +1459,12 @@ class NeonEnv:
|
||||
except Exception as e:
|
||||
metric_errors.append(e)
|
||||
log.error(f"metric validation failed on {pageserver.id}: {e}")
|
||||
|
||||
try:
|
||||
pageserver.snapshot_final_metrics()
|
||||
except Exception as e:
|
||||
log.error(f"metric snapshot failed on {pageserver.id}: {e}")
|
||||
|
||||
try:
|
||||
pageserver.stop(immediate=immediate)
|
||||
except RuntimeError:
|
||||
@@ -2976,6 +2980,20 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
value = self.http_client().get_metric_value(metric)
|
||||
assert value == 0, f"Nonzero {metric} == {value}"
|
||||
|
||||
def snapshot_final_metrics(self):
|
||||
"""
|
||||
Take a snapshot of this pageserver's metrics and stash in its work directory.
|
||||
"""
|
||||
if not self.running:
|
||||
log.info(f"Skipping metrics snapshot on pageserver {self.id}, it is not running")
|
||||
return
|
||||
|
||||
metrics = self.http_client().get_metrics_str()
|
||||
metrics_snapshot_path = self.workdir / FINAL_METRICS_FILE_NAME
|
||||
|
||||
with open(metrics_snapshot_path, "w") as f:
|
||||
f.write(metrics)
|
||||
|
||||
def tenant_attach(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
@@ -5138,6 +5156,8 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile(
|
||||
r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)"
|
||||
)
|
||||
|
||||
FINAL_METRICS_FILE_NAME: str = "final_metrics.txt"
|
||||
|
||||
|
||||
SKIP_DIRS = frozenset(
|
||||
(
|
||||
|
||||
@@ -199,7 +199,7 @@ def wait_for_last_record_lsn(
|
||||
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
|
||||
|
||||
current_lsn = Lsn(0)
|
||||
for i in range(1000):
|
||||
for i in range(2000):
|
||||
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
return current_lsn
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import concurrent.futures
|
||||
import dataclasses
|
||||
import json
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
@@ -170,6 +169,7 @@ def test_throughput(
|
||||
time: float
|
||||
pageserver_batch_size_histo_sum: float
|
||||
pageserver_batch_size_histo_count: float
|
||||
pageserver_batch_breaks_reason_count: dict[str, int]
|
||||
compute_getpage_count: float
|
||||
pageserver_cpu_seconds_total: float
|
||||
|
||||
@@ -183,6 +183,10 @@ def test_throughput(
|
||||
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
|
||||
- other.pageserver_cpu_seconds_total,
|
||||
pageserver_batch_breaks_reason_count={
|
||||
reason: count - other.pageserver_batch_breaks_reason_count.get(reason, 0)
|
||||
for reason, count in self.pageserver_batch_breaks_reason_count.items()
|
||||
},
|
||||
)
|
||||
|
||||
def normalize(self, by) -> "Metrics":
|
||||
@@ -192,6 +196,10 @@ def test_throughput(
|
||||
pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count / by,
|
||||
compute_getpage_count=self.compute_getpage_count / by,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
|
||||
pageserver_batch_breaks_reason_count={
|
||||
reason: count / by
|
||||
for reason, count in self.pageserver_batch_breaks_reason_count.items()
|
||||
},
|
||||
)
|
||||
|
||||
def get_metrics() -> Metrics:
|
||||
@@ -201,6 +209,20 @@ def test_throughput(
|
||||
)
|
||||
compute_getpage_count = cur.fetchall()[0][0]
|
||||
pageserver_metrics = ps_http.get_metrics()
|
||||
for name, samples in pageserver_metrics.metrics.items():
|
||||
for sample in samples:
|
||||
log.info(f"{name=} labels={sample.labels} {sample.value}")
|
||||
|
||||
raw_batch_break_reason_count = pageserver_metrics.query_all(
|
||||
"pageserver_page_service_batch_break_reason_total",
|
||||
filter={"timeline_id": str(env.initial_timeline)},
|
||||
)
|
||||
|
||||
batch_break_reason_count = {
|
||||
sample.labels["reason"]: int(sample.value)
|
||||
for sample in raw_batch_break_reason_count
|
||||
}
|
||||
|
||||
return Metrics(
|
||||
time=time.time(),
|
||||
pageserver_batch_size_histo_sum=pageserver_metrics.query_one(
|
||||
@@ -209,6 +231,7 @@ def test_throughput(
|
||||
pageserver_batch_size_histo_count=pageserver_metrics.query_one(
|
||||
"pageserver_page_service_batch_size_count"
|
||||
).value,
|
||||
pageserver_batch_breaks_reason_count=batch_break_reason_count,
|
||||
compute_getpage_count=compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
|
||||
"libmetrics_process_cpu_seconds_highres"
|
||||
@@ -263,25 +286,6 @@ def test_throughput(
|
||||
|
||||
log.info("Results: %s", metrics)
|
||||
|
||||
since_last_start: list[str] = []
|
||||
for line in env.pageserver.logfile.read_text().splitlines():
|
||||
if "git:" in line:
|
||||
since_last_start = []
|
||||
since_last_start.append(line)
|
||||
|
||||
stopping_batching_because_re = re.compile(
|
||||
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
|
||||
)
|
||||
reasons_for_stopping_batching = {}
|
||||
for line in since_last_start:
|
||||
match = stopping_batching_because_re.search(line)
|
||||
if match:
|
||||
if match.group(1) not in reasons_for_stopping_batching:
|
||||
reasons_for_stopping_batching[match.group(1)] = 0
|
||||
reasons_for_stopping_batching[match.group(1)] += 1
|
||||
|
||||
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
|
||||
|
||||
#
|
||||
# Sanity-checks on the collected data
|
||||
#
|
||||
@@ -295,7 +299,16 @@ def test_throughput(
|
||||
#
|
||||
|
||||
for metric, value in dataclasses.asdict(metrics).items():
|
||||
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
|
||||
if metric == "pageserver_batch_breaks_reason_count":
|
||||
assert isinstance(value, dict)
|
||||
for reason, count in value.items():
|
||||
zenbenchmark.record(
|
||||
f"counters.{metric}_{reason}", count, unit="", report=MetricReport.TEST_PARAM
|
||||
)
|
||||
else:
|
||||
zenbenchmark.record(
|
||||
f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM
|
||||
)
|
||||
|
||||
zenbenchmark.record(
|
||||
"perfmetric.batching_factor",
|
||||
|
||||
@@ -97,6 +97,7 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
|
||||
_record_branch_creation_durations(neon_compare, branch_creation_durations)
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@pytest.mark.parametrize("n_branches", [500, 1024])
|
||||
@pytest.mark.parametrize("shape", ["one_ancestor", "random"])
|
||||
def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape: str):
|
||||
@@ -205,7 +206,7 @@ def wait_and_record_startup_metrics(
|
||||
assert len(matching) == len(expected_labels)
|
||||
return matching
|
||||
|
||||
samples = wait_until(metrics_are_filled)
|
||||
samples = wait_until(metrics_are_filled, timeout=60)
|
||||
|
||||
for sample in samples:
|
||||
phase = sample.labels["phase"]
|
||||
|
||||
@@ -52,6 +52,8 @@ def test_ingest_insert_bulk(
|
||||
# would compete with Pageserver for bandwidth.
|
||||
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
|
||||
|
||||
neon_env_builder.pageserver_config_override = "wait_lsn_timeout='600 s'"
|
||||
|
||||
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -92,7 +94,18 @@ def test_ingest_insert_bulk(
|
||||
worker_rows = rows / CONCURRENCY
|
||||
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
for attempt in range(5):
|
||||
try:
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
break
|
||||
except Exception as e:
|
||||
# if we disable backpressure, postgres can become unresponsive for longer than a minute
|
||||
# and new connection attempts time out in postgres after 1 minute
|
||||
# so if this happens we retry new connection
|
||||
log.error(f"Attempt {attempt + 1}/5: Failed to select current wal lsn: {e}")
|
||||
if attempt == 4:
|
||||
log.error("Exceeded maximum retry attempts for selecting current wal lsn")
|
||||
raise
|
||||
|
||||
# Wait for pageserver to ingest the WAL.
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -64,8 +64,8 @@ def test_ro_replica_lag(
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: {}", project_id)
|
||||
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
error_occurred = False
|
||||
try:
|
||||
@@ -81,7 +81,7 @@ def test_ro_replica_lag(
|
||||
endpoint_type="read_only",
|
||||
settings={"pg_settings": {"hot_standby_feedback": "on"}},
|
||||
)
|
||||
log.info("Replica endpoint ID: {}", replica["endpoint"]["id"])
|
||||
log.info("Replica endpoint ID: %s", replica["endpoint"]["id"])
|
||||
replica_env = master_env.copy()
|
||||
replica_env["PGHOST"] = replica["endpoint"]["host"]
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
@@ -197,8 +197,8 @@ def test_replication_start_stop(
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: {}", project_id)
|
||||
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
try:
|
||||
branch_id = project["branch"]["id"]
|
||||
@@ -215,7 +215,7 @@ def test_replication_start_stop(
|
||||
endpoint_type="read_only",
|
||||
settings={"pg_settings": {"hot_standby_feedback": "on"}},
|
||||
)
|
||||
log.info("Replica {} endpoint ID: {}", i + 1, replica["endpoint"]["id"])
|
||||
log.info("Replica %d endpoint ID: %s", i + 1, replica["endpoint"]["id"])
|
||||
replicas.append(replica)
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.timeout(1200)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
|
||||
@@ -390,6 +390,7 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
|
||||
# Tenant creation requests which arrive out of order will generate complaints about
|
||||
# generation nubmers out of order.
|
||||
env.pageserver.allowed_errors.append(".*Generation .+ is less than existing .+")
|
||||
env.pageserver.allowed_errors.append(".*due to stale generation.+")
|
||||
|
||||
# Timeline::flush_and_shutdown cannot tell if it is hitting a failure because of
|
||||
# an incomplete attach, or some other problem. In the field this should be rare,
|
||||
|
||||
Reference in New Issue
Block a user