mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 21:10:38 +00:00
Compare commits
8 Commits
skyzh/keys
...
arpad/remo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3bc3f71418 | ||
|
|
19bea5fd0c | ||
|
|
5be94e28c4 | ||
|
|
63a106021a | ||
|
|
9a6ace9bde | ||
|
|
8c77ccfc01 | ||
|
|
cbd2fc2395 | ||
|
|
028a191040 |
1
.github/actionlint.yml
vendored
1
.github/actionlint.yml
vendored
@@ -6,6 +6,7 @@ self-hosted-runner:
|
||||
- small
|
||||
- small-metal
|
||||
- small-arm64
|
||||
- unit-perf
|
||||
- us-east-2
|
||||
config-variables:
|
||||
- AWS_ECR_REGION
|
||||
|
||||
@@ -70,6 +70,7 @@ runs:
|
||||
|
||||
- name: Install Allure
|
||||
shell: bash -euxo pipefail {0}
|
||||
working-directory: /tmp
|
||||
run: |
|
||||
if ! which allure; then
|
||||
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip
|
||||
|
||||
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -284,7 +284,7 @@ jobs:
|
||||
statuses: write
|
||||
contents: write
|
||||
pull-requests: write
|
||||
runs-on: [ self-hosted, small-metal ]
|
||||
runs-on: [ self-hosted, unit-perf ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
credentials:
|
||||
@@ -1271,7 +1271,7 @@ jobs:
|
||||
exit 1
|
||||
|
||||
deploy:
|
||||
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
|
||||
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, trigger-custom-extensions-build-and-wait ]
|
||||
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-neon-image-prod` and `push-compute-image-prod`
|
||||
if: ${{ contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind) && !failure() && !cancelled() }}
|
||||
permissions:
|
||||
|
||||
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -5495,6 +5495,17 @@ version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca"
|
||||
|
||||
[[package]]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"aws-config",
|
||||
"aws-sdk-kms",
|
||||
"aws-smithy-types",
|
||||
"utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "remote_storage"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -30,6 +30,7 @@ members = [
|
||||
"libs/tenant_size_model",
|
||||
"libs/metrics",
|
||||
"libs/postgres_connection",
|
||||
"libs/remote_keys",
|
||||
"libs/remote_storage",
|
||||
"libs/tracing-utils",
|
||||
"libs/postgres_ffi/wal_craft",
|
||||
|
||||
@@ -139,7 +139,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// For historical reasons, the main thread that processes the spec and launches postgres
|
||||
// For historical reasons, the main thread that processes the config and launches postgres
|
||||
// is synchronous, but we always have this tokio runtime available and we "enter" it so
|
||||
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
|
||||
// from all parts of compute_ctl.
|
||||
@@ -155,7 +155,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
|
||||
|
||||
let cli_spec = get_config(&cli)?;
|
||||
let config = get_config(&cli)?;
|
||||
|
||||
let compute_node = ComputeNode::new(
|
||||
ComputeNodeParams {
|
||||
@@ -176,8 +176,7 @@ fn main() -> Result<()> {
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor_addr: cli.vm_monitor_addr,
|
||||
},
|
||||
cli_spec.spec,
|
||||
cli_spec.compute_ctl_config,
|
||||
config,
|
||||
)?;
|
||||
|
||||
let exit_code = compute_node.run()?;
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::{env, fs};
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{
|
||||
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
|
||||
};
|
||||
@@ -303,11 +303,7 @@ struct StartVmMonitorResult {
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn new(
|
||||
params: ComputeNodeParams,
|
||||
cli_spec: Option<ComputeSpec>,
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
) -> Result<Self> {
|
||||
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
let conn_conf = postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
@@ -315,8 +311,8 @@ impl ComputeNode {
|
||||
.context("cannot build tokio postgres config from connstr")?;
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
if let Some(cli_spec) = cli_spec {
|
||||
let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
if let Some(spec) = config.spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
}
|
||||
|
||||
@@ -327,7 +323,7 @@ impl ComputeNode {
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_download_progress: RwLock::new(HashMap::new()),
|
||||
compute_ctl_config,
|
||||
compute_ctl_config: config.compute_ctl_config,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use futures::future::BoxFuture;
|
||||
use http::{Request, Response, StatusCode};
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
|
||||
use tower_http::auth::AsyncAuthorizeRequest;
|
||||
use tracing::warn;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::http::{JsonResponse, extract::RequestId};
|
||||
|
||||
@@ -92,7 +92,7 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
if data.claims.compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid claims in authorization token",
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
}
|
||||
|
||||
@@ -112,12 +112,14 @@ impl Authorize {
|
||||
token: &str,
|
||||
validation: &Validation,
|
||||
) -> Result<TokenData<ComputeClaims>> {
|
||||
debug!("verifying token {}", token);
|
||||
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to construct decoding key from {}: {}",
|
||||
"failed to construct decoding key from {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
@@ -130,7 +132,7 @@ impl Authorize {
|
||||
Ok(data) => return Ok(data),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to decode authorization token using {}: {}",
|
||||
"failed to decode authorization token using {}: {}",
|
||||
jwk.common.key_id.as_ref().unwrap(),
|
||||
e
|
||||
);
|
||||
@@ -140,6 +142,6 @@ impl Authorize {
|
||||
}
|
||||
}
|
||||
|
||||
Err(anyhow!("Failed to verify authorization token"))
|
||||
Err(anyhow!("failed to verify authorization token"))
|
||||
}
|
||||
}
|
||||
|
||||
0
explained_queries.sql
Normal file
0
explained_queries.sql
Normal file
12
libs/remote_keys/Cargo.toml
Normal file
12
libs/remote_keys/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "remote_keys"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
aws-sdk-kms.workspace = true
|
||||
aws-config.workspace = true
|
||||
utils.workspace = true
|
||||
47
libs/remote_keys/src/aws_keys.rs
Normal file
47
libs/remote_keys/src/aws_keys.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
use aws_config::BehaviorVersion;
|
||||
|
||||
use crate::KeyId;
|
||||
|
||||
pub struct AwsRemoteKeyClient {
|
||||
client: aws_sdk_kms::Client,
|
||||
}
|
||||
|
||||
impl AwsRemoteKeyClient {
|
||||
pub async fn new() -> Self {
|
||||
let sdk_config = aws_config::defaults(BehaviorVersion::v2024_03_28())
|
||||
.retry_config(
|
||||
aws_config::retry::RetryConfig::standard()
|
||||
.with_max_attempts(5) // Retry up to 5 times
|
||||
.with_initial_backoff(std::time::Duration::from_millis(200)) // Start with 200ms delay
|
||||
.with_max_backoff(std::time::Duration::from_secs(5)), // Cap at 5 seconds
|
||||
)
|
||||
.load()
|
||||
.await;
|
||||
let client = aws_sdk_kms::Client::new(&sdk_config);
|
||||
Self { client }
|
||||
}
|
||||
|
||||
pub async fn decrypt(&self, key_id: &KeyId, ciphertext: impl Into<Vec<u8>>) -> Vec<u8> {
|
||||
let output = self
|
||||
.client
|
||||
.decrypt()
|
||||
.key_id(&key_id.0)
|
||||
.ciphertext_blob(aws_smithy_types::Blob::new(ciphertext.into()))
|
||||
.send()
|
||||
.await
|
||||
.expect("decrypt");
|
||||
output.plaintext.expect("plaintext").into_inner()
|
||||
}
|
||||
|
||||
pub async fn encrypt(&self, key_id: &KeyId, ciphertext: impl Into<Vec<u8>>) -> Vec<u8> {
|
||||
let output = self
|
||||
.client
|
||||
.encrypt()
|
||||
.key_id(&key_id.0)
|
||||
.plaintext(aws_smithy_types::Blob::new(ciphertext.into()))
|
||||
.send()
|
||||
.await
|
||||
.expect("decrypt");
|
||||
output.ciphertext_blob.expect("ciphertext").into_inner()
|
||||
}
|
||||
}
|
||||
6
libs/remote_keys/src/lib.rs
Normal file
6
libs/remote_keys/src/lib.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
mod aws_keys;
|
||||
pub use aws_keys::AwsRemoteKeyClient;
|
||||
|
||||
/// A string uniquely identifying a key
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct KeyId(pub String);
|
||||
@@ -714,7 +714,7 @@ impl LayerMap {
|
||||
true
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
|
||||
pub fn iter_historic_layers(&self) -> impl ExactSizeIterator<Item = Arc<PersistentLayerDesc>> {
|
||||
self.historic.iter()
|
||||
}
|
||||
|
||||
|
||||
@@ -504,7 +504,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
}
|
||||
|
||||
/// Iterate all the layers
|
||||
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
|
||||
pub fn iter(&self) -> impl ExactSizeIterator<Item = Value> {
|
||||
// NOTE we can actually perform this without rebuilding,
|
||||
// but it's not necessary for now.
|
||||
if !self.buffer.is_empty() {
|
||||
|
||||
@@ -1273,7 +1273,10 @@ impl Timeline {
|
||||
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
|
||||
|
||||
let layers = self.layers.read().await;
|
||||
for layer_desc in layers.layer_map()?.iter_historic_layers() {
|
||||
let layers_iter = layers.layer_map()?.iter_historic_layers();
|
||||
let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
|
||||
for layer_desc in layers_iter {
|
||||
layers_checked += 1;
|
||||
let layer = layers.get_from_desc(&layer_desc);
|
||||
if layer.metadata().shard.shard_count == self.shard_identity.count {
|
||||
// This layer does not belong to a historic ancestor, no need to re-image it.
|
||||
@@ -1371,7 +1374,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
|
||||
checked {layers_checked}/{layers_total} layers \
|
||||
(latest_gc_cutoff={} pitr_cutoff={})",
|
||||
layers_to_rewrite.len(),
|
||||
drop_layers.len(),
|
||||
|
||||
@@ -3,19 +3,35 @@
|
||||
* Create a Neon project on staging.
|
||||
* Grant the superuser privileges to the DB user.
|
||||
* (Optional) create a branch for testing
|
||||
* Configure the endpoint by updating the control-plane database with the following settings:
|
||||
* Add the following settings to the `pg_settings` section of the default endpoint configuration for the project using the admin interface:
|
||||
* `Timeone`: `America/Los_Angeles`
|
||||
* `DateStyle`: `Postgres,MDY`
|
||||
* `compute_query_id`: `off`
|
||||
* Add the following section to the project configuration:
|
||||
```json
|
||||
"preload_libraries": {
|
||||
"use_defaults": false,
|
||||
"enabled_libraries": []
|
||||
}
|
||||
```
|
||||
* Checkout the actual `Neon` sources
|
||||
* Patch the sql and expected files for the specific PostgreSQL version, e.g. for v17:
|
||||
```bash
|
||||
$ cd vendor/postgres-v17
|
||||
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
|
||||
```
|
||||
* Set the environment variables (please modify according your configuration):
|
||||
```bash
|
||||
$ export DEFAULT_PG_VERSION=17
|
||||
$ export BUILD_TYPE=release
|
||||
```
|
||||
* Build the Neon binaries see [README.md](../../README.md)
|
||||
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
|
||||
* Set the environment variable `PG_VERSION` to the version of your project.
|
||||
* Update poetry, run
|
||||
```bash
|
||||
$ scripts/pysync
|
||||
```
|
||||
* Run
|
||||
```bash
|
||||
$ pytest -m remote_cluster -k cloud_regress
|
||||
$ scripts/pytest -m remote_cluster -k cloud_regress
|
||||
```
|
||||
@@ -199,7 +199,7 @@ def wait_for_last_record_lsn(
|
||||
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
|
||||
|
||||
current_lsn = Lsn(0)
|
||||
for i in range(1000):
|
||||
for i in range(2000):
|
||||
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
return current_lsn
|
||||
|
||||
@@ -97,6 +97,7 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
|
||||
_record_branch_creation_durations(neon_compare, branch_creation_durations)
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@pytest.mark.parametrize("n_branches", [500, 1024])
|
||||
@pytest.mark.parametrize("shape", ["one_ancestor", "random"])
|
||||
def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape: str):
|
||||
@@ -205,7 +206,7 @@ def wait_and_record_startup_metrics(
|
||||
assert len(matching) == len(expected_labels)
|
||||
return matching
|
||||
|
||||
samples = wait_until(metrics_are_filled)
|
||||
samples = wait_until(metrics_are_filled, timeout=60)
|
||||
|
||||
for sample in samples:
|
||||
phase = sample.labels["phase"]
|
||||
|
||||
@@ -52,6 +52,8 @@ def test_ingest_insert_bulk(
|
||||
# would compete with Pageserver for bandwidth.
|
||||
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
|
||||
|
||||
neon_env_builder.pageserver_config_override = "wait_lsn_timeout='600 s'"
|
||||
|
||||
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -92,7 +94,18 @@ def test_ingest_insert_bulk(
|
||||
worker_rows = rows / CONCURRENCY
|
||||
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
for attempt in range(5):
|
||||
try:
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
break
|
||||
except Exception as e:
|
||||
# if we disable backpressure, postgres can become unresponsive for longer than a minute
|
||||
# and new connection attempts time out in postgres after 1 minute
|
||||
# so if this happens we retry new connection
|
||||
log.error(f"Attempt {attempt + 1}/5: Failed to select current wal lsn: {e}")
|
||||
if attempt == 4:
|
||||
log.error("Exceeded maximum retry attempts for selecting current wal lsn")
|
||||
raise
|
||||
|
||||
# Wait for pageserver to ingest the WAL.
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -13,7 +13,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.timeout(1200)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
|
||||
Reference in New Issue
Block a user