mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-03 20:20:38 +00:00
Compare commits
22 Commits
release-pr
...
skyzh/fake
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f87e11cdc | ||
|
|
7c2b2325f1 | ||
|
|
7d9f677a22 | ||
|
|
c450d3224d | ||
|
|
0beaf10ccb | ||
|
|
6f629abfe5 | ||
|
|
1da54b7e01 | ||
|
|
443b052eec | ||
|
|
76a044d1fa | ||
|
|
77c9154564 | ||
|
|
c5115518e9 | ||
|
|
931f8c4300 | ||
|
|
0f7c2cc382 | ||
|
|
983d56502b | ||
|
|
bcef542d5b | ||
|
|
e31455d936 | ||
|
|
a4ea7d6194 | ||
|
|
19bea5fd0c | ||
|
|
5be94e28c4 | ||
|
|
63a106021a | ||
|
|
9a6ace9bde | ||
|
|
8c77ccfc01 |
1
.github/actionlint.yml
vendored
1
.github/actionlint.yml
vendored
@@ -6,6 +6,7 @@ self-hosted-runner:
|
||||
- small
|
||||
- small-metal
|
||||
- small-arm64
|
||||
- unit-perf
|
||||
- us-east-2
|
||||
config-variables:
|
||||
- AWS_ECR_REGION
|
||||
|
||||
@@ -70,6 +70,7 @@ runs:
|
||||
|
||||
- name: Install Allure
|
||||
shell: bash -euxo pipefail {0}
|
||||
working-directory: /tmp
|
||||
run: |
|
||||
if ! which allure; then
|
||||
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip
|
||||
|
||||
11
.github/workflows/_create-release-pr.yml
vendored
11
.github/workflows/_create-release-pr.yml
vendored
@@ -53,10 +53,13 @@ jobs:
|
||||
|| inputs.component-name == 'Compute' && 'release-compute'
|
||||
}}
|
||||
run: |
|
||||
today=$(date +'%Y-%m-%d')
|
||||
echo "title=${COMPONENT_NAME} release ${today}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "rc-branch=rc/${RELEASE_BRANCH}/${today}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "release-branch=${RELEASE_BRANCH}" | tee -a ${GITHUB_OUTPUT}
|
||||
now_date=$(date -u +'%Y-%m-%d')
|
||||
now_time=$(date -u +'%H-%M-%Z')
|
||||
{
|
||||
echo "title=${COMPONENT_NAME} release ${now_date}"
|
||||
echo "rc-branch=rc/${RELEASE_BRANCH}/${now_date}_${now_time}"
|
||||
echo "release-branch=${RELEASE_BRANCH}"
|
||||
} | tee -a ${GITHUB_OUTPUT}
|
||||
|
||||
- name: Configure git
|
||||
run: |
|
||||
|
||||
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -284,7 +284,7 @@ jobs:
|
||||
statuses: write
|
||||
contents: write
|
||||
pull-requests: write
|
||||
runs-on: [ self-hosted, small-metal ]
|
||||
runs-on: [ self-hosted, unit-perf ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
credentials:
|
||||
@@ -1271,7 +1271,7 @@ jobs:
|
||||
exit 1
|
||||
|
||||
deploy:
|
||||
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
|
||||
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, trigger-custom-extensions-build-and-wait ]
|
||||
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-neon-image-prod` and `push-compute-image-prod`
|
||||
if: ${{ contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind) && !failure() && !cancelled() }}
|
||||
permissions:
|
||||
|
||||
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -5495,6 +5495,16 @@ version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"
|
||||
|
||||
[[package]]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"rand 0.8.5",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "remote_storage"
|
||||
version = "0.1.0"
|
||||
@@ -5510,6 +5520,7 @@ dependencies = [
|
||||
"azure_identity",
|
||||
"azure_storage",
|
||||
"azure_storage_blobs",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
@@ -5520,6 +5531,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"hyper 1.4.1",
|
||||
"itertools 0.10.5",
|
||||
"md5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -30,6 +30,7 @@ members = [
|
||||
"libs/tenant_size_model",
|
||||
"libs/metrics",
|
||||
"libs/postgres_connection",
|
||||
"libs/remote_keys",
|
||||
"libs/remote_storage",
|
||||
"libs/tracing-utils",
|
||||
"libs/postgres_ffi/wal_craft",
|
||||
|
||||
8
docker-compose/ext-src/pg_jsonschema-src/Makefile
Normal file
8
docker-compose/ext-src/pg_jsonschema-src/Makefile
Normal file
@@ -0,0 +1,8 @@
|
||||
EXTENSION = pg_jsonschema
|
||||
DATA = pg_jsonschema--1.0.sql
|
||||
REGRESS = jsonschema_valid_api jsonschema_edge_cases
|
||||
REGRESS_OPTS = --load-extension=pg_jsonschema
|
||||
|
||||
PG_CONFIG ?= pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
@@ -0,0 +1,87 @@
|
||||
-- Schema with enums, nulls, extra properties disallowed
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json);
|
||||
jsonschema_is_valid
|
||||
---------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- Valid enum and null email
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": null}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
------------------------------
|
||||
{}
|
||||
(1 row)
|
||||
|
||||
-- Invalid enum value
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "disabled", "email": null}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
----------------------------------------------------------------------
|
||||
{"\"disabled\" is not one of [\"active\",\"inactive\",\"pending\"]"}
|
||||
(1 row)
|
||||
|
||||
-- Invalid email format (assuming format is validated)
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": "not-an-email"}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
-----------------------------------------
|
||||
{"\"not-an-email\" is not a \"email\""}
|
||||
(1 row)
|
||||
|
||||
-- Extra property not allowed
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "extra": "should not be here"}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
--------------------------------------------------------------------
|
||||
{"Additional properties are not allowed ('extra' was unexpected)"}
|
||||
(1 row)
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
-- Define schema
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json);
|
||||
jsonschema_is_valid
|
||||
---------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- Valid instance
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "alice", "age": 25}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
------------------------------
|
||||
{}
|
||||
(1 row)
|
||||
|
||||
-- Invalid instance: missing required "username"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"age": 25}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
-----------------------------------------
|
||||
{"\"username\" is a required property"}
|
||||
(1 row)
|
||||
|
||||
-- Invalid instance: wrong type for "age"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "bob", "age": "twenty"}'::json
|
||||
);
|
||||
jsonschema_validation_errors
|
||||
-------------------------------------------
|
||||
{"\"twenty\" is not of type \"integer\""}
|
||||
(1 row)
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
-- Schema with enums, nulls, extra properties disallowed
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json);
|
||||
|
||||
-- Valid enum and null email
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": null}'::json
|
||||
);
|
||||
|
||||
-- Invalid enum value
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "disabled", "email": null}'::json
|
||||
);
|
||||
|
||||
-- Invalid email format (assuming format is validated)
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "email": "not-an-email"}'::json
|
||||
);
|
||||
|
||||
-- Extra property not allowed
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
|
||||
"email": { "type": ["string", "null"], "format": "email" }
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": false
|
||||
}'::json,
|
||||
'{"status": "active", "extra": "should not be here"}'::json
|
||||
);
|
||||
@@ -0,0 +1,48 @@
|
||||
-- Define schema
|
||||
SELECT jsonschema_is_valid('{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json);
|
||||
|
||||
-- Valid instance
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "alice", "age": 25}'::json
|
||||
);
|
||||
|
||||
-- Invalid instance: missing required "username"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"age": 25}'::json
|
||||
);
|
||||
|
||||
-- Invalid instance: wrong type for "age"
|
||||
SELECT jsonschema_validation_errors(
|
||||
'{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"username": { "type": "string" },
|
||||
"age": { "type": "integer" }
|
||||
},
|
||||
"required": ["username"]
|
||||
}'::json,
|
||||
'{"username": "bob", "age": "twenty"}'::json
|
||||
);
|
||||
9
docker-compose/ext-src/pg_session_jwt-src/Makefile
Normal file
9
docker-compose/ext-src/pg_session_jwt-src/Makefile
Normal file
@@ -0,0 +1,9 @@
|
||||
EXTENSION = pg_session_jwt
|
||||
|
||||
REGRESS = basic_functions
|
||||
REGRESS_OPTS = --load-extension=$(EXTENSION)
|
||||
export PGOPTIONS = -c pg_session_jwt.jwk={"crv":"Ed25519","kty":"OKP","x":"R_Abz-63zJ00l-IraL5fQhwkhGVZCSooQFV5ntC3C7M"}
|
||||
|
||||
PG_CONFIG ?= pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
@@ -0,0 +1,35 @@
|
||||
-- Basic functionality tests for pg_session_jwt
|
||||
-- Test auth.init() function
|
||||
SELECT auth.init();
|
||||
init
|
||||
------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test an invalid JWT
|
||||
SELECT auth.jwt_session_init('INVALID-JWT');
|
||||
ERROR: invalid JWT encoding
|
||||
-- Test creating a session with an expired JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
|
||||
ERROR: Token used after it has expired
|
||||
-- Test creating a session with a valid JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
|
||||
jwt_session_init
|
||||
------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test auth.session() function
|
||||
SELECT auth.session();
|
||||
session
|
||||
-------------------------------------------------------------------------
|
||||
{"exp": 4896164252, "iat": 1742564252, "jti": 434343, "sub": "user123"}
|
||||
(1 row)
|
||||
|
||||
-- Test auth.user_id() function
|
||||
SELECT auth.user_id() AS user_id;
|
||||
user_id
|
||||
---------
|
||||
user123
|
||||
(1 row)
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
-- Basic functionality tests for pg_session_jwt
|
||||
|
||||
-- Test auth.init() function
|
||||
SELECT auth.init();
|
||||
|
||||
-- Test an invalid JWT
|
||||
SELECT auth.jwt_session_init('INVALID-JWT');
|
||||
|
||||
-- Test creating a session with an expired JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
|
||||
|
||||
-- Test creating a session with a valid JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
|
||||
|
||||
-- Test auth.session() function
|
||||
SELECT auth.session();
|
||||
|
||||
-- Test auth.user_id() function
|
||||
SELECT auth.user_id() AS user_id;
|
||||
13
libs/remote_keys/Cargo.toml
Normal file
13
libs/remote_keys/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rand.workspace = true
|
||||
42
libs/remote_keys/src/lib.rs
Normal file
42
libs/remote_keys/src/lib.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
//! A module that provides a KMS implementation that generates and unwraps the keys.
|
||||
//!
|
||||
|
||||
/// A KMS implementation that does static wrapping and unwrapping of the keys.
|
||||
pub struct NaiveKms {
|
||||
account_id: String,
|
||||
}
|
||||
|
||||
impl NaiveKms {
|
||||
pub fn new(account_id: String) -> Self {
|
||||
Self { account_id }
|
||||
}
|
||||
|
||||
pub fn encrypt(&self, plain: &[u8]) -> anyhow::Result<Vec<u8>> {
|
||||
let wrapped = [self.account_id.as_bytes(), "-wrapped-".as_bytes(), plain].concat();
|
||||
Ok(wrapped)
|
||||
}
|
||||
|
||||
pub fn decrypt(&self, wrapped: &[u8]) -> anyhow::Result<Vec<u8>> {
|
||||
let Some(wrapped) = wrapped.strip_prefix(self.account_id.as_bytes()) else {
|
||||
return Err(anyhow::anyhow!("invalid key"));
|
||||
};
|
||||
let Some(plain) = wrapped.strip_prefix(b"-wrapped-") else {
|
||||
return Err(anyhow::anyhow!("invalid key"));
|
||||
};
|
||||
Ok(plain.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_generate_key() {
|
||||
let kms = NaiveKms::new("test-tenant".to_string());
|
||||
let data = rand::random::<[u8; 32]>().to_vec();
|
||||
let encrypted = kms.encrypt(&data).unwrap();
|
||||
let decrypted = kms.decrypt(&encrypted).unwrap();
|
||||
assert_eq!(data, decrypted);
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ aws-smithy-async.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
camino = { workspace = true, features = ["serde1"] }
|
||||
humantime-serde.workspace = true
|
||||
@@ -27,6 +28,7 @@ tokio-util = { workspace = true, features = ["compat"] }
|
||||
toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
scopeguard.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
utils = { path = "../utils", default-features = false }
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
@@ -550,6 +550,19 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
self.download_for_builder(builder, timeout, cancel).await
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_objects(std::array::from_ref(path), cancel)
|
||||
.await
|
||||
|
||||
@@ -190,6 +190,8 @@ pub struct DownloadOpts {
|
||||
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
|
||||
/// for layer files
|
||||
pub kind: DownloadKind,
|
||||
/// The encryption key to use for the download.
|
||||
pub encryption_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
pub enum DownloadKind {
|
||||
@@ -204,6 +206,7 @@ impl Default for DownloadOpts {
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
kind: DownloadKind::Large,
|
||||
encryption_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -241,6 +244,15 @@ impl DownloadOpts {
|
||||
None => format!("bytes={start}-"),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_encryption_key(mut self, encryption_key: Option<impl AsRef<[u8]>>) -> Self {
|
||||
self.encryption_key = encryption_key.map(|k| k.as_ref().to_vec());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn encryption_key(&self) -> Option<&[u8]> {
|
||||
self.encryption_key.as_deref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -331,6 +343,19 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
/// Same as upload, but with remote encryption if the backend supports it (e.g. SSE-C on AWS).
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
// S3 PUT request requires the content length to be specified,
|
||||
// otherwise it starts to fail with the concurrent connection count increasing.
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Delete a single path from remote storage.
|
||||
///
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
|
||||
@@ -615,6 +640,63 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::LocalFs(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::AwsS3(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::AzureBlob(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Self::Unreliable(s) => {
|
||||
s.upload_with_encryption(
|
||||
from,
|
||||
data_size_bytes,
|
||||
to,
|
||||
metadata,
|
||||
encryption_key,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
|
||||
@@ -560,6 +560,19 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
paths: &[RemotePath],
|
||||
|
||||
@@ -66,7 +66,10 @@ struct GetObjectRequest {
|
||||
key: String,
|
||||
etag: Option<String>,
|
||||
range: Option<String>,
|
||||
/// Base64 encoded SSE-C key for server-side encryption.
|
||||
sse_c_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl S3Bucket {
|
||||
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
|
||||
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
|
||||
@@ -257,6 +260,13 @@ impl S3Bucket {
|
||||
builder = builder.if_none_match(etag);
|
||||
}
|
||||
|
||||
if let Some(encryption_key) = request.sse_c_key {
|
||||
builder = builder.sse_customer_algorithm("AES256");
|
||||
builder = builder.sse_customer_key(base64::encode(&encryption_key));
|
||||
builder = builder
|
||||
.sse_customer_key_md5(base64::encode(md5::compute(&encryption_key).as_slice()));
|
||||
}
|
||||
|
||||
let get_object = builder.send();
|
||||
|
||||
let get_object = tokio::select! {
|
||||
@@ -693,12 +703,13 @@ impl RemoteStorage for S3Bucket {
|
||||
})
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
from_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::Put;
|
||||
@@ -709,7 +720,7 @@ impl RemoteStorage for S3Bucket {
|
||||
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
|
||||
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
|
||||
|
||||
let upload = self
|
||||
let mut upload = self
|
||||
.client
|
||||
.put_object()
|
||||
.bucket(self.bucket_name.clone())
|
||||
@@ -717,8 +728,17 @@ impl RemoteStorage for S3Bucket {
|
||||
.set_metadata(metadata.map(|m| m.0))
|
||||
.set_storage_class(self.upload_storage_class.clone())
|
||||
.content_length(from_size_bytes.try_into()?)
|
||||
.body(bytes_stream)
|
||||
.send();
|
||||
.body(bytes_stream);
|
||||
|
||||
if let Some(encryption_key) = encryption_key {
|
||||
upload = upload.sse_customer_algorithm("AES256");
|
||||
let base64_key = base64::encode(encryption_key);
|
||||
upload = upload.sse_customer_key(&base64_key);
|
||||
upload = upload
|
||||
.sse_customer_key_md5(base64::encode(md5::compute(encryption_key).as_slice()));
|
||||
}
|
||||
|
||||
let upload = upload.send();
|
||||
|
||||
let upload = tokio::time::timeout(self.timeout, upload);
|
||||
|
||||
@@ -742,6 +762,18 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
self.upload_with_encryption(from, data_size_bytes, to, metadata, None, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
@@ -801,6 +833,7 @@ impl RemoteStorage for S3Bucket {
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: opts.byte_range_header(),
|
||||
sse_c_key: opts.encryption_key.clone(),
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
|
||||
@@ -178,6 +178,19 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.inner.download(from, opts, cancel).await
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
async fn upload_with_encryption(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
data_size_bytes: usize,
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
encryption_key: Option<&[u8]>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_inner(path, true, cancel).await
|
||||
}
|
||||
|
||||
@@ -421,7 +421,7 @@ async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
let len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
|
||||
|
||||
let timeout = std::time::Duration::from_secs(5);
|
||||
|
||||
@@ -500,7 +500,7 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, None).await;
|
||||
|
||||
{
|
||||
let stream = ctx
|
||||
@@ -555,6 +555,7 @@ async fn upload_large_enough_file(
|
||||
client: &GenericRemoteStorage,
|
||||
path: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
encryption_key: Option<&[u8]>,
|
||||
) -> usize {
|
||||
let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
let body = bytes::Bytes::from(vec![0u8; 1024]);
|
||||
@@ -565,9 +566,54 @@ async fn upload_large_enough_file(
|
||||
let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
|
||||
|
||||
client
|
||||
.upload(contents, len, path, None, cancel)
|
||||
.upload_with_encryption(contents, len, path, None, encryption_key, cancel)
|
||||
.await
|
||||
.expect("upload succeeds");
|
||||
|
||||
len
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn encryption_works(ctx: &mut MaybeEnabledStorage) {
|
||||
let MaybeEnabledStorage::Enabled(ctx) = ctx else {
|
||||
return;
|
||||
};
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let path = RemotePath::new(Utf8Path::new(
|
||||
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let key = rand::random::<[u8; 32]>();
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel, Some(&key)).await;
|
||||
|
||||
{
|
||||
let download = ctx
|
||||
.client
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts::default().with_encryption_key(Some(&key)),
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.expect("should succeed");
|
||||
let vec = download_to_vec(download).await.expect("should succeed");
|
||||
assert_eq!(vec.len(), file_len);
|
||||
}
|
||||
|
||||
{
|
||||
// Download without encryption key should fail
|
||||
let download = ctx
|
||||
.client
|
||||
.download(&path, &DownloadOpts::default(), &cancel)
|
||||
.await;
|
||||
assert!(download.is_err());
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
ctx.client.delete_objects(&[path], &cancel).await.unwrap();
|
||||
}
|
||||
|
||||
@@ -714,7 +714,7 @@ impl LayerMap {
|
||||
true
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
|
||||
pub fn iter_historic_layers(&self) -> impl ExactSizeIterator<Item = Arc<PersistentLayerDesc>> {
|
||||
self.historic.iter()
|
||||
}
|
||||
|
||||
|
||||
@@ -504,7 +504,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
}
|
||||
|
||||
/// Iterate all the layers
|
||||
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
|
||||
pub fn iter(&self) -> impl ExactSizeIterator<Item = Value> {
|
||||
// NOTE we can actually perform this without rebuilding,
|
||||
// but it's not necessary for now.
|
||||
if !self.buffer.is_empty() {
|
||||
|
||||
@@ -5702,6 +5702,12 @@ impl Timeline {
|
||||
return;
|
||||
}
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
// We already requested stopping the tenant, so we cannot wait for the logical size
|
||||
// calculation to complete given the task might have been already cancelled.
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(await_bg_cancel) = self
|
||||
.current_logical_size
|
||||
.cancel_wait_for_background_loop_concurrency_limit_semaphore
|
||||
|
||||
@@ -56,7 +56,8 @@ use crate::tenant::storage_layer::batch_split_writer::{
|
||||
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
|
||||
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
|
||||
AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, PersistentLayerKey,
|
||||
ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::tasks::log_compaction_error;
|
||||
use crate::tenant::timeline::{
|
||||
@@ -69,6 +70,13 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
const COMPACTION_DELTA_THRESHOLD: usize = 5;
|
||||
|
||||
/// Ratio of shard-local pages below which we trigger shard ancestor layer rewrites. 0.3 means that
|
||||
/// <= 30% of layer pages must belong to the descendant shard to rewrite the layer.
|
||||
///
|
||||
/// We choose a value < 0.5 to avoid rewriting all visible layers every time we do a power-of-two
|
||||
/// shard split, which gets expensive for large tenants.
|
||||
const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
|
||||
pub struct GcCompactionJobId(pub usize);
|
||||
|
||||
@@ -819,7 +827,15 @@ impl KeyHistoryRetention {
|
||||
base_img: &Option<(Lsn, &Bytes)>,
|
||||
history: &[(Lsn, &NeonWalRecord)],
|
||||
tline: &Arc<Timeline>,
|
||||
skip_empty: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if base_img.is_none() && history.is_empty() {
|
||||
if skip_empty {
|
||||
return Ok(());
|
||||
}
|
||||
anyhow::bail!("verification failed: key {} has no history at {}", key, lsn);
|
||||
};
|
||||
|
||||
let mut records = history
|
||||
.iter()
|
||||
.map(|(lsn, val)| (*lsn, (*val).clone()))
|
||||
@@ -860,17 +876,12 @@ impl KeyHistoryRetention {
|
||||
if *retain_lsn >= min_lsn {
|
||||
// Only verify after the key appears in the full history for the first time.
|
||||
|
||||
if base_img.is_none() && history.is_empty() {
|
||||
anyhow::bail!(
|
||||
"verificatoin failed: key {} has no history at {}",
|
||||
key,
|
||||
retain_lsn
|
||||
);
|
||||
};
|
||||
// We don't modify history: in theory, we could replace the history with a single
|
||||
// image as in `generate_key_retention` to make redos at later LSNs faster. But we
|
||||
// want to verify everything as if they are read from the real layer map.
|
||||
collect_and_verify(key, *retain_lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, *retain_lsn, &base_img, &history, tline, false)
|
||||
.await
|
||||
.context("below horizon retain_lsn")?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -878,13 +889,17 @@ impl KeyHistoryRetention {
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
// Above the GC horizon, we verify every time we see an image.
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline, true)
|
||||
.await
|
||||
.context("above horizon full image")?;
|
||||
base_img = Some((*lsn, img));
|
||||
history.clear();
|
||||
}
|
||||
Value::WalRecord(rec) if val.will_init() => {
|
||||
// Above the GC horizon, we verify every time we see an init record.
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline, true)
|
||||
.await
|
||||
.context("above horizon init record")?;
|
||||
base_img = None;
|
||||
history.clear();
|
||||
history.push((*lsn, rec));
|
||||
@@ -895,7 +910,9 @@ impl KeyHistoryRetention {
|
||||
}
|
||||
}
|
||||
// Ensure the latest record is readable.
|
||||
collect_and_verify(key, max_lsn, &base_img, &history, tline).await?;
|
||||
collect_and_verify(key, max_lsn, &base_img, &history, tline, false)
|
||||
.await
|
||||
.context("latest record")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1273,7 +1290,10 @@ impl Timeline {
|
||||
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
|
||||
|
||||
let layers = self.layers.read().await;
|
||||
for layer_desc in layers.layer_map()?.iter_historic_layers() {
|
||||
let layers_iter = layers.layer_map()?.iter_historic_layers();
|
||||
let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
|
||||
for layer_desc in layers_iter {
|
||||
layers_checked += 1;
|
||||
let layer = layers.get_from_desc(&layer_desc);
|
||||
if layer.metadata().shard.shard_count == self.shard_identity.count {
|
||||
// This layer does not belong to a historic ancestor, no need to re-image it.
|
||||
@@ -1317,14 +1337,15 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Don't bother re-writing a layer unless it will at least halve its size
|
||||
// Only rewrite a layer if we can reclaim significant space.
|
||||
if layer_local_page_count != u32::MAX
|
||||
&& layer_local_page_count > layer_raw_page_count / 2
|
||||
&& layer_local_page_count as f64 / layer_raw_page_count as f64
|
||||
<= ANCESTOR_COMPACTION_REWRITE_THRESHOLD
|
||||
{
|
||||
debug!(%layer,
|
||||
"layer is already mostly local ({}/{}), not rewriting",
|
||||
layer_local_page_count,
|
||||
layer_raw_page_count
|
||||
"layer has a large share of local pages \
|
||||
({layer_local_page_count}/{layer_raw_page_count} > \
|
||||
{ANCESTOR_COMPACTION_REWRITE_THRESHOLD}), not rewriting",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1336,12 +1357,19 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We do not yet implement rewrite of delta layers.
|
||||
if layer_desc.is_delta() {
|
||||
// We do not yet implement rewrite of delta layers
|
||||
debug!(%layer, "Skipping rewrite of delta layer");
|
||||
continue;
|
||||
}
|
||||
|
||||
// We don't bother rewriting layers that aren't visible, since these won't be needed by
|
||||
// reads and will likely be garbage collected soon.
|
||||
if layer.visibility() != LayerVisibilityHint::Visible {
|
||||
debug!(%layer, "Skipping rewrite of invisible layer");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only rewrite layers if their generations differ. This guarantees:
|
||||
// - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
|
||||
// - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
|
||||
@@ -1371,7 +1399,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
|
||||
checked {layers_checked}/{layers_total} layers \
|
||||
(latest_gc_cutoff={} pitr_cutoff={})",
|
||||
layers_to_rewrite.len(),
|
||||
drop_layers.len(),
|
||||
|
||||
@@ -244,7 +244,8 @@ impl RemoteStorageWrapper {
|
||||
kind: DownloadKind::Large,
|
||||
etag: None,
|
||||
byte_start: Bound::Included(start_inclusive),
|
||||
byte_end: Bound::Excluded(end_exclusive)
|
||||
byte_end: Bound::Excluded(end_exclusive),
|
||||
encryption_key: None,
|
||||
},
|
||||
&self.cancel)
|
||||
.await?;
|
||||
|
||||
@@ -3,19 +3,35 @@
|
||||
* Create a Neon project on staging.
|
||||
* Grant the superuser privileges to the DB user.
|
||||
* (Optional) create a branch for testing
|
||||
* Configure the endpoint by updating the control-plane database with the following settings:
|
||||
* Add the following settings to the `pg_settings` section of the default endpoint configuration for the project using the admin interface:
|
||||
* `Timeone`: `America/Los_Angeles`
|
||||
* `DateStyle`: `Postgres,MDY`
|
||||
* `compute_query_id`: `off`
|
||||
* Add the following section to the project configuration:
|
||||
```json
|
||||
"preload_libraries": {
|
||||
"use_defaults": false,
|
||||
"enabled_libraries": []
|
||||
}
|
||||
```
|
||||
* Checkout the actual `Neon` sources
|
||||
* Patch the sql and expected files for the specific PostgreSQL version, e.g. for v17:
|
||||
```bash
|
||||
$ cd vendor/postgres-v17
|
||||
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
|
||||
```
|
||||
* Set the environment variables (please modify according your configuration):
|
||||
```bash
|
||||
$ export DEFAULT_PG_VERSION=17
|
||||
$ export BUILD_TYPE=release
|
||||
```
|
||||
* Build the Neon binaries see [README.md](../../README.md)
|
||||
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
|
||||
* Set the environment variable `PG_VERSION` to the version of your project.
|
||||
* Update poetry, run
|
||||
```bash
|
||||
$ scripts/pysync
|
||||
```
|
||||
* Run
|
||||
```bash
|
||||
$ pytest -m remote_cluster -k cloud_regress
|
||||
$ scripts/pytest -m remote_cluster -k cloud_regress
|
||||
```
|
||||
@@ -199,7 +199,7 @@ def wait_for_last_record_lsn(
|
||||
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
|
||||
|
||||
current_lsn = Lsn(0)
|
||||
for i in range(1000):
|
||||
for i in range(2000):
|
||||
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
return current_lsn
|
||||
|
||||
@@ -97,6 +97,7 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
|
||||
_record_branch_creation_durations(neon_compare, branch_creation_durations)
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@pytest.mark.parametrize("n_branches", [500, 1024])
|
||||
@pytest.mark.parametrize("shape", ["one_ancestor", "random"])
|
||||
def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape: str):
|
||||
@@ -205,7 +206,7 @@ def wait_and_record_startup_metrics(
|
||||
assert len(matching) == len(expected_labels)
|
||||
return matching
|
||||
|
||||
samples = wait_until(metrics_are_filled)
|
||||
samples = wait_until(metrics_are_filled, timeout=60)
|
||||
|
||||
for sample in samples:
|
||||
phase = sample.labels["phase"]
|
||||
|
||||
@@ -52,6 +52,8 @@ def test_ingest_insert_bulk(
|
||||
# would compete with Pageserver for bandwidth.
|
||||
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
|
||||
|
||||
neon_env_builder.pageserver_config_override = "wait_lsn_timeout='600 s'"
|
||||
|
||||
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -92,7 +94,18 @@ def test_ingest_insert_bulk(
|
||||
worker_rows = rows / CONCURRENCY
|
||||
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
for attempt in range(5):
|
||||
try:
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
break
|
||||
except Exception as e:
|
||||
# if we disable backpressure, postgres can become unresponsive for longer than a minute
|
||||
# and new connection attempts time out in postgres after 1 minute
|
||||
# so if this happens we retry new connection
|
||||
log.error(f"Attempt {attempt + 1}/5: Failed to select current wal lsn: {e}")
|
||||
if attempt == 4:
|
||||
log.error("Exceeded maximum retry attempts for selecting current wal lsn")
|
||||
raise
|
||||
|
||||
# Wait for pageserver to ingest the WAL.
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -13,7 +13,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.timeout(1200)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
|
||||
Reference in New Issue
Block a user