Compare commits

..

6 Commits

Author SHA1 Message Date
Conrad Ludgate
a4a72c8075 support TLS for sql-over-http 2024-12-18 12:17:31 +00:00
Conrad Ludgate
90ce4f3002 properly remove clones 2024-12-18 12:06:57 +00:00
Conrad Ludgate
b79a1dd337 reduce cloning 2024-12-18 11:57:10 +00:00
Conrad Ludgate
bbc799ce77 chore(proxy): pre-load native tls certificates and propagate compute client config 2024-12-18 09:29:42 +00:00
Conrad Ludgate
cd0924c686 fmt 2024-12-17 19:43:23 +00:00
Conrad Ludgate
2548926ea6 chore(proxy): fully remove allow-self-signed-compute flag 2024-12-17 19:32:42 +00:00
76 changed files with 539 additions and 1222 deletions

View File

@@ -308,7 +308,6 @@ jobs:
"image": [ "'"$image_default"'" ],
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new-many-tables","db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb","runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
@@ -411,7 +410,7 @@ jobs:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-new-many-tables", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
@@ -430,7 +429,7 @@ jobs:
neonvm-captest-sharding-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
;;
neonvm-captest-new | neonvm-captest-new-many-tables | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -447,26 +446,6 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
# we want to compare Neon project OLTP throughput and latency at scale factor 10 GB
# without (neonvm-captest-new)
# and with (neonvm-captest-new-many-tables) many relations in the database
- name: Create many relations before the run
if: contains(fromJson('["neonvm-captest-new-many-tables"]'), matrix.platform)
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_NUM_RELATIONS: 10000
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:

View File

@@ -21,8 +21,6 @@ concurrency:
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
jobs:
regress:

2
.gitignore vendored
View File

@@ -25,5 +25,3 @@ compaction-suite-results.*
# pgindent typedef lists
*.list
venv/

View File

@@ -1556,30 +1556,28 @@ RUN apt update && \
locales \
procps \
ca-certificates \
curl \
unzip \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
# aws cli is used by fast_import (curl and unzip above are at this time only used for this installation step)
# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2
# used by fast_import
ARG TARGETARCH
ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb
RUN set -ex; \
\
# Determine the expected checksum based on TARGETARCH
if [ "${TARGETARCH}" = "amd64" ]; then \
TARGETARCH_ALT="x86_64"; \
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \
elif [ "${TARGETARCH}" = "arm64" ]; then \
TARGETARCH_ALT="aarch64"; \
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
curl -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
/tmp/awscliv2/aws/install; \
rm -rf /tmp/awscliv2.zip /tmp/awscliv2; \
true
\
# Compute and validate the checksum
echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c -
RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb
ENV LANG=en_US.utf8
USER postgres

View File

@@ -34,12 +34,12 @@ use nix::unistd::Pid;
use tracing::{info, info_span, warn, Instrument};
use utils::fs_ext::is_directory_empty;
#[path = "fast_import/aws_s3_sync.rs"]
mod aws_s3_sync;
#[path = "fast_import/child_stdio_to_log.rs"]
mod child_stdio_to_log;
#[path = "fast_import/s3_uri.rs"]
mod s3_uri;
#[path = "fast_import/s5cmd.rs"]
mod s5cmd;
#[derive(clap::Parser)]
struct Args {
@@ -326,7 +326,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
}
info!("upload pgdata");
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/"))
.await
.context("sync dump directory to destination")?;
@@ -334,10 +334,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
{
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("pgdata");
let status_file = status_dir.join("status");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata"))
.await
.context("sync status directory to destination")?;
}

View File

@@ -4,21 +4,24 @@ use camino::Utf8Path;
use super::s3_uri::S3Uri;
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
let mut builder = tokio::process::Command::new("aws");
let mut builder = tokio::process::Command::new("s5cmd");
// s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL
if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") {
builder.arg("--endpoint-url").arg(val);
}
builder
.arg("s3")
.arg("sync")
.arg(local.as_str())
.arg(remote.to_string());
let st = builder
.spawn()
.context("spawn aws s3 sync")?
.context("spawn s5cmd")?
.wait()
.await
.context("wait for aws s3 sync")?;
.context("wait for s5cmd")?;
if st.success() {
Ok(())
} else {
Err(anyhow::anyhow!("aws s3 sync failed"))
Err(anyhow::anyhow!("s5cmd failed"))
}
}

View File

@@ -19,7 +19,6 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{flock, FlockArg};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -37,8 +36,6 @@ use safekeeper_api::{
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
@@ -692,21 +689,6 @@ struct TimelineTreeEl {
pub children: BTreeSet<TimelineId>,
}
/// A flock-based guard over the neon_local repository directory
struct RepoLock {
_file: File,
}
impl RepoLock {
fn new() -> Result<Self> {
let repo_dir = File::open(local_env::base_path())?;
let repo_dir_fd = repo_dir.as_raw_fd();
flock(repo_dir_fd, FlockArg::LockExclusive)?;
Ok(Self { _file: repo_dir })
}
}
// Main entry point for the 'neon_local' CLI utility
//
// This utility helps to manage neon installation. That includes following:
@@ -718,14 +700,9 @@ fn main() -> Result<()> {
let cli = Cli::parse();
// Check for 'neon init' command first.
let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
(handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
handle_init(&args).map(|env| Some(Cow::Owned(env)))
} else {
// This tool uses a collection of simple files to store its state, and consequently
// it is not generally safe to run multiple commands concurrently. Rather than expect
// all callers to know this, use a lock file to protect against concurrent execution.
let _repo_lock = RepoLock::new().unwrap();
// all other commands need an existing config
let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let original_env = env.clone();
@@ -751,12 +728,11 @@ fn main() -> Result<()> {
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
let subcommand_result = if &original_env != env {
if &original_env != env {
subcommand_result.map(|()| Some(Cow::Borrowed(env)))
} else {
subcommand_result.map(|()| None)
};
(subcommand_result, Some(_repo_lock))
}
};
match subcommand_result {
@@ -946,7 +922,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
} else {
// User (likely interactive) did not provide a description of the environment, give them the default
NeonLocalInitConf {
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
broker: NeonBroker {
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
},
@@ -1742,15 +1718,18 @@ async fn handle_start_all_impl(
broker::start_broker_process(env, &retry_timeout).await
});
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});
// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});
}
for ps_conf in &env.pageservers {
js.spawn(async move {
@@ -1795,6 +1774,10 @@ async fn neon_start_status_check(
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
if env.control_plane_api.is_none() {
return Ok(());
}
let storcon = StorageController::from_env(env);
let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();

View File

@@ -316,10 +316,6 @@ impl Endpoint {
// and can cause errors like 'no unpinned buffers available', see
// <https://github.com/neondatabase/neon/issues/9956>
conf.append("shared_buffers", "1MB");
// Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
// batching logic. Set this to 2 so that we exercise the code a bit without letting
// individual tests do a lot of concurrent work on underpowered test machines
conf.append("effective_io_concurrency", "2");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "logical");

View File

@@ -76,7 +76,7 @@ pub struct LocalEnv {
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
pub control_plane_api: Url,
pub control_plane_api: Option<Url>,
// Control plane upcall API for storage controller. If set, this will be propagated into the
// storage controller's configuration.
@@ -133,7 +133,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub control_plane_api: Option<Url>,
pub control_plane_api: Option<Option<Url>>,
pub control_plane_compute_hook_api: Option<Option<Url>>,
}
@@ -180,7 +180,7 @@ impl NeonStorageControllerConf {
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
// Very tight heartbeat interval to speed up tests
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
}
impl Default for NeonStorageControllerConf {
@@ -535,7 +535,7 @@ impl LocalEnv {
storage_controller,
pageservers,
safekeepers,
control_plane_api: control_plane_api.unwrap(),
control_plane_api,
control_plane_compute_hook_api,
branch_name_mappings,
}
@@ -638,7 +638,7 @@ impl LocalEnv {
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
safekeepers: self.safekeepers.clone(),
control_plane_api: Some(self.control_plane_api.clone()),
control_plane_api: self.control_plane_api.clone(),
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
branch_name_mappings: self.branch_name_mappings.clone(),
},
@@ -768,7 +768,7 @@ impl LocalEnv {
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
safekeepers,
control_plane_api: control_plane_api.unwrap(),
control_plane_api: control_plane_api.unwrap_or_default(),
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
branch_name_mappings: Default::default(),
};

View File

@@ -95,19 +95,21 @@ impl PageServerNode {
let mut overrides = vec![pg_distrib_dir_param, broker_endpoint_param];
overrides.push(format!(
"control_plane_api='{}'",
self.env.control_plane_api.as_str()
));
if let Some(control_plane_api) = &self.env.control_plane_api {
overrides.push(format!(
"control_plane_api='{}'",
control_plane_api.as_str()
));
// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
}
}
if !conf.other.contains_key("remote_storage") {

View File

@@ -338,7 +338,7 @@ impl StorageController {
.port(),
)
} else {
let listen_url = self.env.control_plane_api.clone();
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen = format!(
"{}:{}",
@@ -708,7 +708,7 @@ impl StorageController {
} else {
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone();
let listen_url = self.env.control_plane_api.clone().unwrap();
Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),

View File

@@ -5,8 +5,7 @@ use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
SafekeeperDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -212,8 +211,6 @@ enum Command {
#[arg(long)]
timeout: humantime::Duration,
},
/// List safekeepers known to the storage controller
Safekeepers {},
}
#[derive(Parser)]
@@ -1023,31 +1020,6 @@ async fn main() -> anyhow::Result<()> {
"Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
);
}
Command::Safekeepers {} => {
let mut resp = storcon_client
.dispatch::<(), Vec<SafekeeperDescribeResponse>>(
Method::GET,
"control/v1/safekeeper".to_string(),
None,
)
.await?;
resp.sort_by(|a, b| a.id.cmp(&b.id));
let mut table = comfy_table::Table::new();
table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
for sk in resp {
table.add_row([
format!("{}", sk.id),
format!("{}", sk.version),
sk.host,
format!("{}", sk.port),
format!("{}", sk.http_port),
sk.availability_zone_id.to_string(),
]);
}
println!("{table}");
}
}
Ok(())

View File

@@ -1,96 +0,0 @@
version: '3.8'
x-build-args-bullseye: &build-args-bullseye
DEBIAN_VERSION: bullseye
GIT_VERSION: local # seems to be not used in compute node though
BUILD_TAG: ${BUILD_TAG:-local}
x-build-args-bookworm: &build-args-bookworm
DEBIAN_VERSION: bookworm
GIT_VERSION: local # seems to be not used in compute node though
BUILD_TAG: ${BUILD_TAG:-local}
services:
compute-node-v14: &compute-node-v14-base
image: neondatabase/compute-node-v14:${IMAGE_TAG:-local}
build:
context: .
dockerfile: compute/compute-node.Dockerfile
args:
<<: *build-args-bullseye
PG_VERSION: v14
cache_from:
- neondatabase/compute-node-v14:${CACHE_FROM_TAG:-latest}
compute-node-v14-amd64:
<<: *compute-node-v14-base
platform: linux/amd64
image: neondatabase/compute-node-v14:${IMAGE_TAG:-local}-amd64
compute-node-v14-arm64:
<<: *compute-node-v14-base
platform: linux/arm64
image: neondatabase/compute-node-v14:${IMAGE_TAG:-local}-arm64
compute-node-v15: &compute-node-v15-base
image: neondatabase/compute-node-v15:${IMAGE_TAG:-local}
build:
context: .
dockerfile: compute/compute-node.Dockerfile
args:
<<: *build-args-bullseye
PG_VERSION: v15
cache_from:
- neondatabase/compute-node-v15:${CACHE_FROM_TAG:-latest}
compute-node-v15-amd64:
<<: *compute-node-v15-base
platform: linux/amd64
image: neondatabase/compute-node-v15:${IMAGE_TAG:-local}-amd64
compute-node-v15-arm64:
<<: *compute-node-v15-base
platform: linux/arm64
image: neondatabase/compute-node-v15:${IMAGE_TAG:-local}-arm64
compute-node-v16: &compute-node-v16-base
image: neondatabase/compute-node-v16:${IMAGE_TAG:-local}
build:
context: .
dockerfile: compute/compute-node.Dockerfile
args:
<<: *build-args-bullseye
PG_VERSION: v16
cache_from:
- neondatabase/compute-node-v16:${CACHE_FROM_TAG:-latest}
compute-node-v16-amd64:
<<: *compute-node-v16-base
platform: linux/amd64
image: neondatabase/compute-node-v16:${IMAGE_TAG:-local}-amd64
compute-node-v16-arm64:
<<: *compute-node-v16-base
platform: linux/arm64
image: neondatabase/compute-node-v16:${IMAGE_TAG:-local}-arm64
compute-node-v17: &compute-node-v17-base
image: neondatabase/compute-node-v17:${IMAGE_TAG:-local}
build:
context: .
dockerfile: compute/compute-node.Dockerfile
args:
<<: *build-args-bookworm
PG_VERSION: v17
cache_from:
- neondatabase/compute-node-v17:${CACHE_FROM_TAG:-latest}
compute-node-v17-amd64:
<<: *compute-node-v17-base
platform: linux/amd64
image: neondatabase/compute-node-v17:${IMAGE_TAG:-local}-amd64
compute-node-v17-arm64:
<<: *compute-node-v17-base
platform: linux/arm64
image: neondatabase/compute-node-v17:${IMAGE_TAG:-local}-arm64

View File

@@ -372,23 +372,6 @@ pub struct MetadataHealthListOutdatedResponse {
pub health_records: Vec<MetadataHealthRecord>,
}
/// Publicly exposed safekeeper description
///
/// The `active` flag which we have in the DB is not included on purpose: it is deprecated.
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperDescribeResponse {
pub id: NodeId,
pub region_id: String,
/// 1 is special, it means just created (not currently posted to storcon).
/// Zero or negative is not really expected.
/// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
pub version: i64,
pub host: String,
pub port: i32,
pub http_port: i32,
pub availability_zone_id: String,
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -6,7 +6,6 @@ pub mod utilization;
use camino::Utf8PathBuf;
pub use utilization::PageserverUtilization;
use core::ops::Range;
use std::{
collections::HashMap,
fmt::Display,
@@ -29,7 +28,6 @@ use utils::{
};
use crate::{
key::Key,
reltag::RelTag,
shard::{ShardCount, ShardStripeSize, TenantShardId},
};
@@ -212,68 +210,6 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}
impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}
impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}
impl CompactLsnRange {
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct CompactInfoResponse {
pub compact_key_range: Option<CompactKeyRange>,
pub compact_lsn_range: Option<CompactLsnRange>,
pub sub_compaction: bool,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,

View File

@@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
const TIMELINE_ID: u32 = 1;
/// Creates a new WAL generator with the given record generator.
pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
pub fn new(record_generator: R) -> WalGenerator<R> {
Self {
record_generator,
lsn: start_lsn,
prev_lsn: start_lsn,
lsn: Lsn(0),
prev_lsn: Lsn(0),
}
}

View File

@@ -1,7 +1,7 @@
[package]
name = "postgres-protocol2"
version = "0.1.0"
edition = "2021"
edition = "2018"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -9,7 +9,8 @@
//!
//! This library assumes that the `client_encoding` backend parameter has been
//! set to `UTF8`. It will most likely not behave properly if that is not the case.
#![warn(missing_docs, clippy::all)]
#![doc(html_root_url = "https://docs.rs/postgres-protocol/0.6")]
#![warn(missing_docs, rust_2018_idioms, clippy::all)]
use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};

View File

@@ -3,6 +3,7 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, BytesMut};
use std::convert::TryFrom;
use std::error::Error;
use std::io;
use std::marker;

View File

@@ -1,7 +1,7 @@
[package]
name = "postgres-types2"
version = "0.1.0"
edition = "2021"
edition = "2018"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -2,7 +2,8 @@
//!
//! This crate is used by the `tokio-postgres` and `postgres` crates. You normally don't need to depend directly on it
//! unless you want to define your own `ToSql` or `FromSql` definitions.
#![warn(clippy::all, missing_docs)]
#![doc(html_root_url = "https://docs.rs/postgres-types/0.2")]
#![warn(clippy::all, rust_2018_idioms, missing_docs)]
use fallible_iterator::FallibleIterator;
use postgres_protocol2::types;

View File

@@ -1,7 +1,7 @@
[package]
name = "tokio-postgres2"
version = "0.1.0"
edition = "2021"
edition = "2018"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -9,7 +9,7 @@ use std::io;
pub(crate) async fn cancel_query<T>(
config: Option<SocketConfig>,
ssl_mode: SslMode,
mut tls: T,
tls: T,
process_id: i32,
secret_key: i32,
) -> Result<(), Error>

View File

@@ -10,7 +10,7 @@ use tokio::net::TcpStream;
use tokio::sync::mpsc;
pub async fn connect<T>(
mut tls: T,
tls: T,
config: &Config,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where

View File

@@ -1,5 +1,5 @@
//! An asynchronous, pipelined, PostgreSQL client.
#![warn(clippy::all)]
#![warn(rust_2018_idioms, clippy::all)]
pub use crate::cancel_token::CancelToken;
pub use crate::client::{Client, SocketConfig};

View File

@@ -46,7 +46,7 @@ pub trait MakeTlsConnect<S> {
/// Creates a new `TlsConnect`or.
///
/// The domain name is provided for certificate verification and SNI.
fn make_tls_connect(&mut self, domain: &str) -> Result<Self::TlsConnect, Self::Error>;
fn make_tls_connect(self, domain: &str) -> Result<Self::TlsConnect, Self::Error>;
}
/// An asynchronous function wrapping a stream in a TLS session.
@@ -84,7 +84,7 @@ impl<S> MakeTlsConnect<S> for NoTls {
type TlsConnect = NoTls;
type Error = NoTlsError;
fn make_tls_connect(&mut self, _: &str) -> Result<NoTls, NoTlsError> {
fn make_tls_connect(self, _: &str) -> Result<NoTls, NoTlsError> {
Ok(NoTls)
}
}

View File

@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
TimelineGcRequest, TimelineInfo,
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
@@ -2039,34 +2039,6 @@ async fn timeline_cancel_compact_handler(
.await
}
// Get compact info of a timeline
async fn timeline_compact_info_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let res = tenant.get_scheduled_compaction_tasks(timeline_id);
let mut resp = Vec::new();
for item in res {
resp.push(CompactInfoResponse {
compact_key_range: item.compact_key_range,
compact_lsn_range: item.compact_lsn_range,
sub_compaction: item.sub_compaction,
});
}
json_response(StatusCode::OK, resp)
}
.instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
@@ -3428,10 +3400,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
|r| api_handler(r, timeline_gc_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_info_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),

View File

@@ -3122,23 +3122,6 @@ impl Tenant {
}
}
pub(crate) fn get_scheduled_compaction_tasks(
&self,
timeline_id: TimelineId,
) -> Vec<CompactOptions> {
use itertools::Itertools;
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard
.get(&timeline_id)
.map(|tline_pending_tasks| {
tline_pending_tasks
.iter()
.map(|x| x.options.clone())
.collect_vec()
})
.unwrap_or_default()
}
/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
@@ -5776,13 +5759,13 @@ mod tests {
use timeline::{CompactOptions, DeltaLayerTestDesc};
use utils::id::TenantId;
#[cfg(feature = "testing")]
use models::CompactLsnRange;
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
#[cfg(feature = "testing")]
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
#[cfg(feature = "testing")]
use timeline::CompactLsnRange;
#[cfg(feature = "testing")]
use timeline::GcInfo;
static TEST_KEY: Lazy<Key> =
@@ -9651,7 +9634,7 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
use models::CompactLsnRange;
use timeline::CompactLsnRange;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let (tenant, ctx) = harness.load().await;

View File

@@ -1,15 +1,12 @@
use std::collections::BTreeSet;
use itertools::Itertools;
use pageserver_compaction::helpers::overlaps_with;
use super::storage_layer::LayerName;
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
///
/// The function implements a fast path check and a slow path check.
///
/// The fast path checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
@@ -28,47 +25,31 @@ use super::storage_layer::LayerName;
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
///
/// However, if a partial compaction is still going on, it is possible that we get a layer map not satisfying the above condition.
/// Therefore, we fallback to simply check if any of the two delta layers overlap. (See "A slow path...")
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for name in metadata {
if let LayerName::Delta(layer) = name {
all_delta_layers.push(layer.clone());
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
}
}
for layer in &all_delta_layers {
if layer.key_range.start.next() != layer.key_range.end {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
for (idx, layer) in all_delta_layers.iter().enumerate() {
if layer.key_range.start.next() == layer.key_range.end {
continue;
}
for layer in &all_delta_layers {
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
// A slow path to check if the layer intersects with any other delta layer.
for (other_idx, other_layer) in all_delta_layers.iter().enumerate() {
if other_idx == idx {
// do not check self intersects with self
continue;
}
if overlaps_with(&layer.lsn_range, &other_layer.lsn_range)
&& overlaps_with(&layer.key_range, &other_layer.key_range)
{
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
layer, other_layer
);
return Some(err);
}
}
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
}
}
None

View File

@@ -31,9 +31,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -788,6 +788,63 @@ pub(crate) struct CompactRequest {
pub sub_compaction_max_job_size_mb: Option<u64>,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}
impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}
impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}
impl CompactLsnRange {
#[cfg(test)]
#[cfg(feature = "testing")]
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,

View File

@@ -29,7 +29,6 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -1824,7 +1823,7 @@ impl Timeline {
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let ((dense_ks, sparse_ks), _) = {
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock during gc-compaction");
bail!("failed to acquire partition lock");
};
partition.clone()
};
@@ -2157,14 +2156,15 @@ impl Timeline {
// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
let layer_names = job_desc
.selected_layers
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
}
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
// let layer_names = job_desc
// .selected_layers
// .iter()
// .map(|layer| layer.layer_desc().layer_name())
// .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers
@@ -2546,48 +2546,13 @@ impl Timeline {
);
// Step 3: Place back to the layer map.
// First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
let all_layers = {
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
let mut final_layers = all_layers
.iter()
.map(|layer| layer.layer_name())
.collect::<HashSet<_>>();
for layer in &layer_selection {
final_layers.remove(&layer.layer_desc().layer_name());
}
for layer in &compact_to {
final_layers.insert(layer.layer_desc().layer_name());
}
let final_layers = final_layers.into_iter().collect_vec();
// TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
// the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
// in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
if let Some(err) = check_valid_layermap(&final_layers) {
bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
}
// Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
// operate on L1 layers.
{
// TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
let mut guard = self.layers.write().await;
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};
// Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
// Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
// find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
// be batched into `schedule_compaction_update`.
let disk_consistent_lsn = self.disk_consistent_lsn.load();
self.schedule_uploads(disk_consistent_lsn, None)?;
self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;

View File

@@ -827,6 +827,7 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
{
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
shard->n_reconnect_attempts += 1;
}
shard->n_reconnect_attempts = 0;

View File

@@ -678,9 +678,6 @@ mod tests {
.await
.unwrap();
// flush the final server message
stream.flush().await.unwrap();
handle.await.unwrap();
}

View File

@@ -13,7 +13,9 @@ use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
use proxy::auth::{self};
use proxy::cancellation::CancellationHandlerMain;
use proxy::config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig};
use proxy::config::{
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
};
use proxy::control_plane::locks::ApiLocks;
use proxy::control_plane::messages::{EndpointJwksResponse, JwksSettings};
use proxy::http::health_server::AppMetrics;
@@ -32,6 +34,8 @@ project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
use clap::Parser;
use rustls::crypto::ring;
use rustls::RootCertStore;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::Notify;
@@ -209,6 +213,7 @@ async fn main() -> anyhow::Result<()> {
http_listener,
shutdown.clone(),
Arc::new(CancellationHandlerMain::new(
&config.connect_to_compute,
Arc::new(DashMap::new()),
None,
proxy::metrics::CancellationSource::Local,
@@ -268,6 +273,22 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
};
// local_proxy won't use TLS to talk to postgres.
let root_store = RootCertStore::empty();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
.with_no_client_auth();
let compute_config = ComputeConfig {
retry: RetryConfig::parse(RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)?,
tls: Arc::new(client_config).into(),
timeout: Duration::from_secs(2),
};
Ok(Box::leak(Box::new(ProxyConfig {
tls_config: None,
metric_collection: None,
@@ -289,9 +310,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
region: "local".into(),
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
connect_compute_locks,
connect_to_compute_retry_config: RetryConfig::parse(
RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES,
)?,
connect_to_compute: compute_config,
})))
}

View File

@@ -1,14 +1,15 @@
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use anyhow::{bail, Context};
use futures::future::Either;
use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
use proxy::cancellation::{CancelMap, CancellationHandler};
use proxy::config::{
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, HttpConfig,
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig,
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2,
};
use proxy::context::parquet::ParquetUploadArgs;
@@ -25,6 +26,7 @@ use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::{auth, control_plane, http, serverless, usage_metrics};
use remote_storage::RemoteStorageConfig;
use rustls::crypto::ring;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
@@ -397,6 +399,7 @@ async fn main() -> anyhow::Result<()> {
let cancellation_handler = Arc::new(CancellationHandler::<
Option<Arc<Mutex<RedisPublisherClient>>>,
>::new(
&config.connect_to_compute,
cancel_map.clone(),
redis_publisher,
proxy::metrics::CancellationSource::FromClient,
@@ -492,6 +495,7 @@ async fn main() -> anyhow::Result<()> {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
config,
client,
cache.clone(),
cancel_map.clone(),
@@ -500,6 +504,7 @@ async fn main() -> anyhow::Result<()> {
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
config,
client,
cache.clone(),
cancel_map.clone(),
@@ -632,6 +637,23 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
console_redirect_confirmation_timeout: args.webauth_confirmation_timeout,
};
let root_store = load_certs()
.context("loading native tls certificates")?
.clone();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
.with_no_client_auth();
let compute_config = ComputeConfig {
retry: config::RetryConfig::parse(&args.connect_to_compute_retry)?,
tls: Arc::new(client_config).into(),
timeout: Duration::from_secs(2),
};
let config = ProxyConfig {
tls_config,
metric_collection,
@@ -642,9 +664,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute_retry_config: config::RetryConfig::parse(
&args.connect_to_compute_retry,
)?,
connect_to_compute: compute_config,
};
let config = Box::leak(Box::new(config));
@@ -654,6 +674,18 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
Ok(config)
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
let der_certs = rustls_native_certs::load_native_certs();
if !der_certs.errors.is_empty() {
bail!("could not parse certificates: {:?}", der_certs.errors);
}
let mut store = rustls::RootCertStore::empty();
store.add_parsable_certificates(der_certs.certs);
Ok(Arc::new(store))
}
/// auth::Backend is created at proxy startup, and lives forever.
fn build_auth_backend(
args: &ProxyCliArgs,

View File

@@ -3,11 +3,9 @@ use std::sync::Arc;
use dashmap::DashMap;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use once_cell::sync::OnceCell;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::CancelToken;
use pq_proto::CancelKeyData;
use rustls::crypto::ring;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
@@ -15,7 +13,7 @@ use tracing::{debug, info};
use uuid::Uuid;
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
use crate::compute::load_certs;
use crate::config::ComputeConfig;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
@@ -35,6 +33,7 @@ type IpSubnetKey = IpNet;
///
/// If `CancellationPublisher` is available, cancel request will be used to publish the cancellation key to other proxy instances.
pub struct CancellationHandler<P> {
compute_config: &'static ComputeConfig,
map: CancelMap,
client: P,
/// This field used for the monitoring purposes.
@@ -183,7 +182,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
"cancelling query per user's request using key {key}, hostname {}, address: {}",
cancel_closure.hostname, cancel_closure.socket_addr
);
cancel_closure.try_cancel_query().await
cancel_closure.try_cancel_query(self.compute_config).await
}
#[cfg(test)]
@@ -198,8 +197,13 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
}
impl CancellationHandler<()> {
pub fn new(map: CancelMap, from: CancellationSource) -> Self {
pub fn new(
compute_config: &'static ComputeConfig,
map: CancelMap,
from: CancellationSource,
) -> Self {
Self {
compute_config,
map,
client: (),
from,
@@ -214,8 +218,14 @@ impl CancellationHandler<()> {
}
impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
pub fn new(map: CancelMap, client: Option<Arc<Mutex<P>>>, from: CancellationSource) -> Self {
pub fn new(
compute_config: &'static ComputeConfig,
map: CancelMap,
client: Option<Arc<Mutex<P>>>,
from: CancellationSource,
) -> Self {
Self {
compute_config,
map,
client,
from,
@@ -229,8 +239,6 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
}
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
/// This should've been a [`std::future::Future`], but
/// it's impossible to name a type of an unboxed future
/// (we'd need something like `#![feature(type_alias_impl_trait)]`).
@@ -257,29 +265,14 @@ impl CancelClosure {
}
}
/// Cancels the query running on user's compute node.
pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> {
pub(crate) async fn try_cancel_query(
self,
compute_config: &ComputeConfig,
) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(|_e| {
CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
"TLS root store initialization failed".to_string(),
))
})?
.clone();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
.with_no_client_auth();
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
crate::postgres_rustls::MakeRustlsConnect::new(&compute_config.tls),
&self.hostname,
)
.map_err(|e| {
@@ -329,11 +322,41 @@ impl<P> Drop for Session<P> {
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::time::Duration;
use rustls::crypto::ring;
use rustls::RootCertStore;
use super::*;
use crate::config::RetryConfig;
fn config() -> ComputeConfig {
let retry = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
let root_store = RootCertStore::empty();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
.with_no_client_auth();
ComputeConfig {
retry,
tls: Arc::new(client_config).into(),
timeout: Duration::from_secs(2),
}
}
#[tokio::test]
async fn check_session_drop() -> anyhow::Result<()> {
let cancellation_handler = Arc::new(CancellationHandler::<()>::new(
Box::leak(Box::new(config())),
CancelMap::default(),
CancellationSource::FromRedis,
));
@@ -349,8 +372,11 @@ mod tests {
#[tokio::test]
async fn cancel_session_noop_regression() {
let handler =
CancellationHandler::<()>::new(CancelMap::default(), CancellationSource::Local);
let handler = CancellationHandler::<()>::new(
Box::leak(Box::new(config())),
CancelMap::default(),
CancellationSource::Local,
);
handler
.cancel_session(
CancelKeyData {

View File

@@ -1,16 +1,13 @@
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{CancelToken, RawConnection};
use postgres_protocol::message::backend::NoticeResponseBody;
use pq_proto::StartupMessageParams;
use rustls::crypto::ring;
use rustls::pki_types::InvalidDnsNameError;
use thiserror::Error;
use tokio::net::TcpStream;
@@ -18,6 +15,7 @@ use tracing::{debug, error, info, warn};
use crate::auth::parse_endpoint_param;
use crate::cancellation::CancelClosure;
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::client::ApiLockError;
use crate::control_plane::errors::WakeComputeError;
@@ -40,9 +38,6 @@ pub(crate) enum ConnectionError {
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
#[error("Couldn't load native TLS certificates: {0:?}")]
TlsCertificateError(Vec<rustls_native_certs::Error>),
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] InvalidDnsNameError),
@@ -89,7 +84,6 @@ impl ReportableError for ConnectionError {
}
ConnectionError::Postgres(_) => crate::error::ErrorKind::Compute,
ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute,
ConnectionError::TlsCertificateError(_) => crate::error::ErrorKind::Service,
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(),
@@ -227,7 +221,7 @@ impl ConnCfg {
}
}
type RustlsStream = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::Stream;
type RustlsStream = crate::postgres_rustls::RustlsStream<tokio::net::TcpStream>;
pub(crate) struct PostgresConnection {
/// Socket connected to a compute node.
@@ -251,27 +245,14 @@ impl ConnCfg {
&self,
ctx: &RequestContext,
aux: MetricsAuxInfo,
timeout: Duration,
config: &ComputeConfig,
) -> Result<PostgresConnection, ConnectionError> {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
let (socket_addr, stream, host) = self.connect_raw(config.timeout).await?;
drop(pause);
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(ConnectionError::TlsCertificateError)?
.clone();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
.with_no_client_auth();
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
crate::postgres_rustls::MakeRustlsConnect::new(&config.tls),
host,
)?;
@@ -341,19 +322,6 @@ fn filtered_options(options: &str) -> Option<String> {
Some(options)
}
pub(crate) fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::Error>> {
let der_certs = rustls_native_certs::load_native_certs();
if !der_certs.errors.is_empty() {
return Err(der_certs.errors);
}
let mut store = rustls::RootCertStore::empty();
store.add_parsable_certificates(der_certs.certs);
Ok(Arc::new(store))
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -10,6 +10,7 @@ use remote_storage::RemoteStorageConfig;
use rustls::crypto::ring::{self, sign};
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use sha2::{Digest, Sha256};
use tokio_rustls::TlsConnector;
use tracing::{error, info};
use x509_parser::oid_registry;
@@ -32,7 +33,13 @@ pub struct ProxyConfig {
pub handshake_timeout: Duration,
pub wake_compute_retry_config: RetryConfig,
pub connect_compute_locks: ApiLocks<Host>,
pub connect_to_compute_retry_config: RetryConfig,
pub connect_to_compute: ComputeConfig,
}
pub struct ComputeConfig {
pub retry: RetryConfig,
pub tls: TlsConnector,
pub timeout: Duration,
}
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]

View File

@@ -115,7 +115,7 @@ pub async fn task_main(
Ok(Some(p)) => {
ctx.set_success();
let _disconnect = ctx.log_connect();
match p.proxy_pass().await {
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
error!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
@@ -216,7 +216,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
},
&user_info,
config.wake_compute_retry_config,
config.connect_to_compute_retry_config,
&config.connect_to_compute,
)
.or_else(|e| stream.throw_error(e))
.await?;

View File

@@ -10,13 +10,13 @@ pub mod client;
pub(crate) mod errors;
use std::sync::Arc;
use std::time::Duration;
use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::IpPattern;
use crate::cache::project_info::ProjectInfoCacheImpl;
use crate::cache::{Cached, TimedLru};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
use crate::intern::ProjectIdInt;
@@ -73,9 +73,9 @@ impl NodeInfo {
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
timeout: Duration,
config: &ComputeConfig,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config.connect(ctx, self.aux.clone(), timeout).await
self.config.connect(ctx, self.aux.clone(), config).await
}
pub(crate) fn reuse_settings(&mut self, other: Self) {

View File

@@ -1,10 +1,10 @@
use std::convert::TryFrom;
use std::sync::Arc;
use postgres_client::tls::MakeTlsConnect;
pub use private::RustlsStream;
use rustls::pki_types::ServerName;
use rustls::ClientConfig;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsConnector;
mod private {
use std::future::Future;
@@ -35,14 +35,12 @@ mod private {
}
}
pub struct RustlsConnect(pub RustlsConnectData);
pub struct RustlsConnectData {
pub struct RustlsConnectData<'a> {
pub hostname: ServerName<'static>,
pub connector: TlsConnector,
pub connector: &'a TlsConnector,
}
impl<S> TlsConnect<S> for RustlsConnect
impl<S> TlsConnect<S> for RustlsConnectData<'_>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
@@ -52,7 +50,7 @@ mod private {
fn connect(self, stream: S) -> Self::Future {
TlsConnectFuture {
inner: self.0.connector.connect(self.0.hostname, stream),
inner: self.connector.connect(self.hostname, stream),
}
}
}
@@ -124,35 +122,30 @@ mod private {
/// A `MakeTlsConnect` implementation using `rustls`.
///
/// That way you can connect to PostgreSQL using `rustls` as the TLS stack.
#[derive(Clone)]
pub struct MakeRustlsConnect {
config: Arc<ClientConfig>,
pub struct MakeRustlsConnect<'a> {
pub connector: &'a TlsConnector,
}
impl MakeRustlsConnect {
/// Creates a new `MakeRustlsConnect` from the provided `ClientConfig`.
impl<'a> MakeRustlsConnect<'a> {
/// Creates a new `MakeRustlsConnect` from the provided `TlsConnector`.
#[must_use]
pub fn new(config: ClientConfig) -> Self {
Self {
config: Arc::new(config),
}
pub fn new(connector: &'a TlsConnector) -> Self {
Self { connector }
}
}
impl<S> MakeTlsConnect<S> for MakeRustlsConnect
impl<'a, S> MakeTlsConnect<S> for MakeRustlsConnect<'a>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Stream = private::RustlsStream<S>;
type TlsConnect = private::RustlsConnect;
type TlsConnect = private::RustlsConnectData<'a>;
type Error = rustls::pki_types::InvalidDnsNameError;
fn make_tls_connect(&mut self, hostname: &str) -> Result<Self::TlsConnect, Self::Error> {
ServerName::try_from(hostname).map(|dns_name| {
private::RustlsConnect(private::RustlsConnectData {
hostname: dns_name.to_owned(),
connector: Arc::clone(&self.config).into(),
})
fn make_tls_connect(self, hostname: &str) -> Result<Self::TlsConnect, Self::Error> {
ServerName::try_from(hostname).map(|dns_name| private::RustlsConnectData {
hostname: dns_name.to_owned(),
connector: self.connector,
})
}
}

View File

@@ -6,7 +6,7 @@ use tracing::{debug, info, warn};
use super::retry::ShouldRetryWakeCompute;
use crate::auth::backend::ComputeCredentialKeys;
use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
use crate::config::RetryConfig;
use crate::config::{ComputeConfig, RetryConfig};
use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::locks::ApiLocks;
@@ -19,8 +19,6 @@ use crate::proxy::retry::{retry_after, should_retry, CouldRetry};
use crate::proxy::wake_compute::wake_compute;
use crate::types::Host;
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
/// If we couldn't connect, a cached connection info might be to blame
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
@@ -49,7 +47,7 @@ pub(crate) trait ConnectMechanism {
&self,
ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo,
timeout: time::Duration,
config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError>;
fn update_connect_config(&self, conf: &mut compute::ConnCfg);
@@ -86,11 +84,11 @@ impl ConnectMechanism for TcpMechanism<'_> {
&self,
ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo,
timeout: time::Duration,
config: &ComputeConfig,
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
permit.release_result(node_info.connect(ctx, timeout).await)
permit.release_result(node_info.connect(ctx, config).await)
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
@@ -105,7 +103,7 @@ pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBac
mechanism: &M,
user_info: &B,
wake_compute_retry_config: RetryConfig,
connect_to_compute_retry_config: RetryConfig,
compute: &ComputeConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
@@ -119,10 +117,7 @@ where
mechanism.update_connect_config(&mut node_info.config);
// try once
let err = match mechanism
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
.await
{
let err = match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
@@ -142,7 +137,7 @@ where
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, connect_to_compute_retry_config) {
if should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
@@ -172,10 +167,7 @@ where
debug!("wake_compute success. attempting to connect");
num_retries = 1;
loop {
match mechanism
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
.await
{
match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
@@ -190,7 +182,7 @@ where
return Ok(res);
}
Err(e) => {
if !should_retry(&e, num_retries, connect_to_compute_retry_config) {
if !should_retry(&e, num_retries, compute.retry) {
// Don't log an error here, caller will print the error
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
@@ -206,7 +198,7 @@ where
}
};
let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
let wait_duration = retry_after(num_retries, compute.retry);
num_retries += 1;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);

View File

@@ -152,7 +152,7 @@ pub async fn task_main(
Ok(Some(p)) => {
ctx.set_success();
let _disconnect = ctx.log_connect();
match p.proxy_pass().await {
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
@@ -351,7 +351,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
},
&user_info,
config.wake_compute_retry_config,
config.connect_to_compute_retry_config,
&config.connect_to_compute,
)
.or_else(|e| stream.throw_error(e))
.await?;

View File

@@ -5,6 +5,7 @@ use utils::measured_stream::MeasuredStream;
use super::copy_bidirectional::ErrorSource;
use crate::cancellation;
use crate::compute::PostgresConnection;
use crate::config::ComputeConfig;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
use crate::stream::Stream;
@@ -67,9 +68,17 @@ pub(crate) struct ProxyPassthrough<P, S> {
}
impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
pub(crate) async fn proxy_pass(
self,
compute_config: &ComputeConfig,
) -> Result<(), ErrorSource> {
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
if let Err(err) = self
.compute
.cancel_closure
.try_cancel_query(compute_config)
.await
{
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
res

View File

@@ -13,8 +13,9 @@ use postgres_client::tls::{MakeTlsConnect, NoTls};
use retry::{retry_after, ShouldRetryWakeCompute};
use rstest::rstest;
use rustls::crypto::ring;
use rustls::pki_types;
use rustls::{pki_types, RootCertStore};
use tokio::io::DuplexStream;
use tokio_rustls::TlsConnector;
use super::connect_compute::ConnectMechanism;
use super::retry::CouldRetry;
@@ -22,7 +23,7 @@ use super::*;
use crate::auth::backend::{
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned,
};
use crate::config::{CertResolver, RetryConfig};
use crate::config::{CertResolver, ComputeConfig, RetryConfig};
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
use crate::control_plane::{
@@ -67,16 +68,16 @@ fn generate_certs(
}
struct ClientConfig<'a> {
config: rustls::ClientConfig,
config: TlsConnector,
hostname: &'a str,
}
type TlsConnect<S> = <MakeRustlsConnect as MakeTlsConnect<S>>::TlsConnect;
type TlsConnect<'a, S> = <MakeRustlsConnect<'a> as MakeTlsConnect<S>>::TlsConnect;
impl ClientConfig<'_> {
fn make_tls_connect(self) -> anyhow::Result<TlsConnect<DuplexStream>> {
let mut mk = MakeRustlsConnect::new(self.config);
let tls = MakeTlsConnect::<DuplexStream>::make_tls_connect(&mut mk, self.hostname)?;
fn make_tls_connect(&self) -> anyhow::Result<TlsConnect<DuplexStream>> {
let mk = MakeRustlsConnect::new(&self.config);
let tls = MakeTlsConnect::<DuplexStream>::make_tls_connect(mk, self.hostname)?;
Ok(tls)
}
}
@@ -120,8 +121,12 @@ fn generate_tls_config<'a>(
store
})
.with_no_client_auth();
let config = Arc::new(config);
ClientConfig { config, hostname }
ClientConfig {
config: TlsConnector::from(config),
hostname,
}
};
Ok((client_config, tls_config))
@@ -468,7 +473,7 @@ impl ConnectMechanism for TestConnectMechanism {
&self,
_ctx: &RequestContext,
_node_info: &control_plane::CachedNodeInfo,
_timeout: std::time::Duration,
_config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let mut counter = self.counter.lock().unwrap();
let action = self.sequence[*counter];
@@ -576,6 +581,29 @@ fn helper_create_connect_info(
user_info
}
fn config() -> ComputeConfig {
let retry = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
let root_store = RootCertStore::empty();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
.with_no_client_auth();
ComputeConfig {
retry,
tls: Arc::new(client_config).into(),
timeout: Duration::from_secs(2),
}
}
#[tokio::test]
async fn connect_to_compute_success() {
let _ = env_logger::try_init();
@@ -583,12 +611,8 @@ async fn connect_to_compute_success() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -601,12 +625,8 @@ async fn connect_to_compute_retry() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -620,12 +640,8 @@ async fn connect_to_compute_non_retry_1() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap_err();
mechanism.verify();
@@ -639,12 +655,8 @@ async fn connect_to_compute_non_retry_2() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -665,17 +677,13 @@ async fn connect_to_compute_non_retry_3() {
max_retries: 1,
backoff_factor: 2.0,
};
let connect_to_compute_retry_config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
let config = config();
connect_to_compute(
&ctx,
&mechanism,
&user_info,
wake_compute_retry_config,
connect_to_compute_retry_config,
&config,
)
.await
.unwrap_err();
@@ -690,12 +698,8 @@ async fn wake_retry() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap();
mechanism.verify();
@@ -709,12 +713,8 @@ async fn wake_non_retry() {
let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&ctx, &mechanism, &user_info, config, config)
let config = config();
connect_to_compute(&ctx, &mechanism, &user_info, config.retry, &config)
.await
.unwrap_err();
mechanism.verify();

View File

@@ -12,6 +12,7 @@ use uuid::Uuid;
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::cache::project_info::ProjectInfoCache;
use crate::cancellation::{CancelMap, CancellationHandler};
use crate::config::ProxyConfig;
use crate::intern::{ProjectIdInt, RoleNameInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
@@ -249,6 +250,7 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
/// Handle console's invalidation messages.
#[tracing::instrument(name = "redis_notifications", skip_all)]
pub async fn task_main<C>(
config: &'static ProxyConfig,
redis: ConnectionWithCredentialsProvider,
cache: Arc<C>,
cancel_map: CancelMap,
@@ -258,6 +260,7 @@ where
C: ProjectInfoCache + Send + Sync + 'static,
{
let cancellation_handler = Arc::new(CancellationHandler::<()>::new(
&config.connect_to_compute,
cancel_map,
crate::metrics::CancellationSource::FromRedis,
));

View File

@@ -50,12 +50,6 @@ impl<S: AsyncWrite + Unpin> SaslStream<'_, S> {
self.stream.write_message(&msg.to_reply()).await?;
Ok(())
}
// Queue a SASL message for the client.
fn send_noflush(&mut self, msg: &ServerMessage<&str>) -> io::Result<()> {
self.stream.write_message_noflush(&msg.to_reply())?;
Ok(())
}
}
/// SASL authentication outcome.
@@ -91,7 +85,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> SaslStream<'_, S> {
continue;
}
Step::Success(result, reply) => {
self.send_noflush(&ServerMessage::Final(&reply))?;
self.send(&ServerMessage::Final(&reply)).await?;
Outcome::Success(result)
}
Step::Failure(reason) => Outcome::Failure(reason),

View File

@@ -22,7 +22,7 @@ use crate::compute;
use crate::compute_ctl::{
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
};
use crate::config::ProxyConfig;
use crate::config::{ComputeConfig, ProxyConfig};
use crate::context::RequestContext;
use crate::control_plane::client::ApiLockError;
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
@@ -30,6 +30,7 @@ use crate::control_plane::locks::ApiLocks;
use crate::control_plane::CachedNodeInfo;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::intern::EndpointIdInt;
use crate::postgres_rustls::MakeRustlsConnect;
use crate::proxy::connect_compute::ConnectMechanism;
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
use crate::rate_limiter::EndpointRateLimiter;
@@ -196,7 +197,7 @@ impl PoolingBackend {
},
&backend,
self.config.wake_compute_retry_config,
self.config.connect_to_compute_retry_config,
&self.config.connect_to_compute,
)
.await
}
@@ -237,7 +238,7 @@ impl PoolingBackend {
},
&backend,
self.config.wake_compute_retry_config,
self.config.connect_to_compute_retry_config,
&self.config.connect_to_compute,
)
.await
}
@@ -502,7 +503,7 @@ impl ConnectMechanism for TokioMechanism {
&self,
ctx: &RequestContext,
node_info: &CachedNodeInfo,
timeout: Duration,
compute_config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
@@ -511,10 +512,12 @@ impl ConnectMechanism for TokioMechanism {
let config = config
.user(&self.conn_info.user_info.user)
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
.connect_timeout(compute_config.timeout);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(postgres_client::NoTls).await;
let res = config
.connect(MakeRustlsConnect::new(&compute_config.tls))
.await;
drop(pause);
let (client, connection) = permit.release_result(res)?;
@@ -552,7 +555,7 @@ impl ConnectMechanism for HyperMechanism {
&self,
ctx: &RequestContext,
node_info: &CachedNodeInfo,
timeout: Duration,
config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
@@ -560,7 +563,11 @@ impl ConnectMechanism for HyperMechanism {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let port = node_info.config.get_port();
let res = connect_http2(&host, port, timeout).await;
// TODO(conrad): how would we roll-out TLS for these connections?
// Postgres has negotiation, but no such thing for HTTP.
// Assume https, fall back to http (on the same port)?
let res = connect_http2(&host, port, config.timeout).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;

View File

@@ -5,7 +5,6 @@ use std::task::{ready, Poll};
use futures::future::poll_fn;
use futures::Future;
use postgres_client::tls::NoTlsStream;
use postgres_client::AsyncMessage;
use smallvec::SmallVec;
use tokio::net::TcpStream;
@@ -26,6 +25,7 @@ use super::conn_pool_lib::{
use crate::context::RequestContext;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::Metrics;
use crate::postgres_rustls::RustlsStream;
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {
@@ -58,7 +58,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
ctx: &RequestContext,
conn_info: ConnInfo,
client: C,
mut connection: postgres_client::Connection<TcpStream, NoTlsStream>,
mut connection: postgres_client::Connection<TcpStream, RustlsStream<tokio::net::TcpStream>>,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<C> {

View File

@@ -168,7 +168,7 @@ pub(crate) async fn serve_websocket(
Ok(Some(p)) => {
ctx.set_success();
ctx.log_connect();
match p.proxy_pass().await {
match p.proxy_pass(&config.connect_to_compute).await {
Ok(()) => Ok(()),
Err(ErrorSource::Client(err)) => Err(err).context("client"),
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),

View File

@@ -9,7 +9,6 @@ default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
benchmarking = []
[dependencies]
async-stream.workspace = true
@@ -78,4 +77,3 @@ tracing-subscriber = { workspace = true, features = ["json"] }
[[bench]]
name = "receive_wal"
harness = false
required-features = ["benchmarking"]

View File

@@ -1,18 +1,18 @@
use std::sync::Arc;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use crate::state::{TimelinePersistentState, TimelineState};
use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::remote_timeline_path;
use crate::{control_file, wal_storage, SafeKeeperConf};
use camino_tempfile::Utf8TempDir;
use safekeeper::rate_limit::RateLimiter;
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use safekeeper::state::{TimelinePersistentState, TimelineState};
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use safekeeper::timelines_set::TimelinesSet;
use safekeeper::wal_backup::remote_timeline_path;
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
use tokio::fs::create_dir_all;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
pub struct Env {
/// Whether to enable fsync.
pub fsync: bool,
@@ -21,7 +21,7 @@ pub struct Env {
}
impl Env {
/// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
/// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
/// enable fsyncing.
pub fn new(fsync: bool) -> anyhow::Result<Self> {
let tempdir = camino_tempfile::tempdir()?;
@@ -47,7 +47,6 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
let conf = self.make_conf(node_id);
@@ -68,9 +67,9 @@ impl Env {
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
term: 1,
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
start_streaming_at: Lsn(0),
term_history: TermHistory(vec![(1, Lsn(0)).into()]),
timeline_start_lsn: Lsn(0),
}))
.await?;
@@ -83,13 +82,12 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
let conf = Arc::new(self.make_conf(node_id));
let timeline_dir = get_timeline_dir(&conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
let timeline = Timeline::new(

View File

@@ -1,7 +1,11 @@
//! WAL ingestion benchmarks.
#[path = "benchutils.rs"]
mod benchutils;
use std::io::Write as _;
use benchutils::Env;
use bytes::BytesMut;
use camino_tempfile::tempfile;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
@@ -12,7 +16,6 @@ use safekeeper::receive_wal::{self, WalAcceptor};
use safekeeper::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
};
use safekeeper::test_utils::Env;
use tokio::io::AsyncWriteExt as _;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
@@ -73,15 +76,12 @@ fn bench_process_msg(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
// Set up the Safekeeper.
let env = Env::new(fsync)?;
let mut safekeeper = runtime.block_on(env.make_safekeeper(
NodeId(1),
TenantTimelineId::generate(),
Lsn(0),
))?;
let mut safekeeper =
runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
b.iter_batched_ref(
// Pre-construct WAL records and requests. Criterion will batch them.
@@ -134,8 +134,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
let env = Env::new(fsync)?;
let walgen =
&mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
// Create buffered channels that can fit all requests, to avoid blocking on channels.
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
@@ -146,7 +145,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
// TODO: WalAcceptor doesn't actually need a full timeline, only
// Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.make_timeline(NodeId(1), TenantTimelineId::generate())
.await?
.wal_residence_guard()
.await?;
@@ -240,7 +239,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
// Construct and spawn the WalAcceptor task.
let env = Env::new(fsync)?;
@@ -250,7 +249,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
runtime.block_on(async {
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.make_timeline(NodeId(1), TenantTimelineId::generate())
.await?
.wal_residence_guard()
.await?;

View File

@@ -564,7 +564,7 @@ pub fn make_router(
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
const ALLOWLIST_ROUTES: &[&str] =
&["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
&["/v1/status", "/metrics", "/profile/cpu", "profile/heap"];
if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
None
} else {

View File

@@ -43,9 +43,6 @@ pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;
#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;
mod timelines_global_map;
use std::sync::Arc;
pub use timelines_global_map::GlobalTimelines;

View File

@@ -94,14 +94,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
}
}
let max_next_record_lsn = match max_next_record_lsn {
Some(lsn) => lsn,
None => { continue; }
};
let batch = InterpretedWalRecords {
records,
next_record_lsn: Some(max_next_record_lsn),
next_record_lsn: max_next_record_lsn
};
tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();

View File

@@ -18,7 +18,7 @@ impl DiskWalProposer {
internal_available_lsn: Lsn(0),
prev_lsn: Lsn(0),
disk: BlockStorage::new(),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
}),
})
}

View File

@@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::error::Error as _;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
@@ -7,7 +6,6 @@ use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
@@ -30,9 +28,6 @@ struct UnshardedComputeHookTenant {
// Which node is this tenant attached to
node_id: NodeId,
// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,
// Must hold this lock to send a notification.
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
}
@@ -41,9 +36,6 @@ struct ShardedComputeHookTenant {
shard_count: ShardCount,
shards: Vec<(ShardNumber, NodeId)>,
// The tenant's preferred AZ, so that we may pass this on to the control plane
preferred_az: Option<AvailabilityZone>,
// Must hold this lock to send a notification. The contents represent
// the last successfully sent notification, and are used to coalesce multiple
// updates by only sending when there is a chance since our last successful send.
@@ -72,24 +64,17 @@ enum ComputeHookTenant {
impl ComputeHookTenant {
/// Construct with at least one shard's information
fn new(
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
preferred_az: Option<AvailabilityZone>,
node_id: NodeId,
) -> Self {
fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
if tenant_shard_id.shard_count.count() > 1 {
Self::Sharded(ShardedComputeHookTenant {
shards: vec![(tenant_shard_id.shard_number, node_id)],
stripe_size,
shard_count: tenant_shard_id.shard_count,
preferred_az,
send_lock: Arc::default(),
})
} else {
Self::Unsharded(UnshardedComputeHookTenant {
node_id,
preferred_az,
send_lock: Arc::default(),
})
}
@@ -135,20 +120,15 @@ impl ComputeHookTenant {
/// Set one shard's location. If stripe size or shard count have changed, Self is reset
/// and drops existing content.
fn update(&mut self, shard_update: ShardUpdate) {
let tenant_shard_id = shard_update.tenant_shard_id;
let node_id = shard_update.node_id;
let stripe_size = shard_update.stripe_size;
let preferred_az = shard_update.preferred_az;
fn update(
&mut self,
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
node_id: NodeId,
) {
match self {
Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
unsharded_tenant.node_id = node_id;
if unsharded_tenant.preferred_az.as_ref()
!= preferred_az.as_ref().map(|az| az.as_ref())
{
unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
}
unsharded_tenant.node_id = node_id
}
Self::Sharded(sharded_tenant)
if sharded_tenant.stripe_size == stripe_size
@@ -166,21 +146,10 @@ impl ComputeHookTenant {
.push((tenant_shard_id.shard_number, node_id));
sharded_tenant.shards.sort_by_key(|s| s.0)
}
if sharded_tenant.preferred_az.as_ref()
!= preferred_az.as_ref().map(|az| az.as_ref())
{
sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
}
}
_ => {
// Shard count changed: reset struct.
*self = Self::new(
tenant_shard_id,
stripe_size,
preferred_az.map(|az| az.into_owned()),
node_id,
);
*self = Self::new(tenant_shard_id, stripe_size, node_id);
}
}
}
@@ -196,7 +165,6 @@ struct ComputeHookNotifyRequestShard {
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
preferred_az: Option<String>,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
@@ -270,10 +238,6 @@ impl ComputeHookTenant {
node_id: unsharded_tenant.node_id,
}],
stripe_size: None,
preferred_az: unsharded_tenant
.preferred_az
.as_ref()
.map(|az| az.0.clone()),
}),
Self::Sharded(sharded_tenant)
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
@@ -289,7 +253,6 @@ impl ComputeHookTenant {
})
.collect(),
stripe_size: Some(sharded_tenant.stripe_size),
preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
})
}
Self::Sharded(sharded_tenant) => {
@@ -350,17 +313,6 @@ pub(super) struct ComputeHook {
client: reqwest::Client,
}
/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background. This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) node_id: NodeId,
pub(crate) stripe_size: ShardStripeSize,
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}
impl ComputeHook {
pub(super) fn new(config: Config) -> Self {
let authorization_header = config
@@ -411,7 +363,6 @@ impl ComputeHook {
tenant_id,
shards,
stripe_size,
preferred_az: _preferred_az,
} = reconfigure_request;
let compute_pageservers = shards
@@ -552,30 +503,24 @@ impl ComputeHook {
}
/// Synchronous phase: update the per-tenant state for the next intended notification
fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
fn notify_prepare(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
) -> MaybeSendResult {
let mut state_locked = self.state.lock().unwrap();
use std::collections::hash_map::Entry;
let tenant_shard_id = shard_update.tenant_shard_id;
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => {
let ShardUpdate {
tenant_shard_id,
node_id,
stripe_size,
preferred_az,
} = shard_update;
e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
preferred_az.map(|az| az.into_owned()),
node_id,
))
}
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(shard_update);
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};
@@ -663,14 +608,13 @@ impl ComputeHook {
/// if something failed.
pub(super) fn notify_background(
self: &Arc<Self>,
notifications: Vec<ShardUpdate>,
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
cancel: &CancellationToken,
) {
let mut maybe_sends = Vec::new();
for shard_update in notifications {
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
for (tenant_shard_id, node_id, stripe_size) in notifications {
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
maybe_sends.push((tenant_shard_id, maybe_send_result))
}
@@ -734,14 +678,15 @@ impl ComputeHook {
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify<'a>(
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify(
&self,
shard_update: ShardUpdate<'a>,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let tenant_shard_id = shard_update.tenant_shard_id;
let maybe_send_result = self.notify_prepare(shard_update);
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
.await
}
@@ -794,7 +739,6 @@ pub(crate) mod tests {
shard_number: ShardNumber(0),
},
ShardStripeSize(12345),
None,
NodeId(1),
);
@@ -821,32 +765,30 @@ pub(crate) mod tests {
// Writing the first shard of a multi-sharded situation (i.e. in a split)
// resets the tenant state and puts it in an non-notifying state (need to
// see all shards)
tenant_state.update(ShardUpdate {
tenant_shard_id: TenantShardId {
tenant_state.update(
TenantShardId {
tenant_id,
shard_count: ShardCount::new(2),
shard_number: ShardNumber(1),
},
stripe_size: ShardStripeSize(32768),
preferred_az: None,
node_id: NodeId(1),
});
ShardStripeSize(32768),
NodeId(1),
);
assert!(matches!(
tenant_state.maybe_send(tenant_id, None),
MaybeSendResult::Noop
));
// Writing the second shard makes it ready to notify
tenant_state.update(ShardUpdate {
tenant_shard_id: TenantShardId {
tenant_state.update(
TenantShardId {
tenant_id,
shard_count: ShardCount::new(2),
shard_number: ShardNumber(0),
},
stripe_size: ShardStripeSize(32768),
preferred_az: None,
node_id: NodeId(1),
});
ShardStripeSize(32768),
NodeId(1),
);
let send_result = tenant_state.maybe_send(tenant_id, None);
let MaybeSendResult::Transmit((request, mut guard)) = send_result else {

View File

@@ -11,7 +11,6 @@ use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::SafekeeperDescribeResponse;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
@@ -1242,18 +1241,6 @@ impl SafekeeperPersistence {
availability_zone_id: &self.availability_zone_id,
}
}
pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse {
// omit the `active` flag on purpose: it is deprecated.
SafekeeperDescribeResponse {
id: NodeId(self.id as u64),
region_id: self.region_id.clone(),
version: self.version,
host: self.host.clone(),
port: self.port,
http_port: self.http_port,
availability_zone_id: self.availability_zone_id.clone(),
}
}
}
#[derive(Insertable, AsChangeset)]

View File

@@ -1,14 +1,13 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::{compute_hook, service};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use crate::service;
use pageserver_api::controller_api::PlacementPolicy;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use reqwest::StatusCode;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -46,7 +45,6 @@ pub(super) struct Reconciler {
pub(crate) reconciler_config: ReconcilerConfig,
pub(crate) config: TenantConfig,
pub(crate) preferred_az: Option<AvailabilityZone>,
/// Observed state from the point of view of the reconciler.
/// This gets updated as the reconciliation makes progress.
@@ -836,12 +834,9 @@ impl Reconciler {
let result = self
.compute_hook
.notify(
compute_hook::ShardUpdate {
tenant_shard_id: self.tenant_shard_id,
node_id: node.get_id(),
stripe_size: self.shard.stripe_size,
preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
},
self.tenant_shard_id,
node.get_id(),
self.shard.stripe_size,
&self.cancel,
)
.await;

View File

@@ -18,7 +18,7 @@ use crate::{
background_node_operations::{
Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
},
compute_hook::{self, NotifyError},
compute_hook::NotifyError,
drain_utils::{self, TenantShardDrain, TenantShardIterator},
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
leadership::Leadership,
@@ -46,11 +46,10 @@ use pageserver_api::{
controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
ShardsPreferredAzsResponse, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::{
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
@@ -657,14 +656,11 @@ impl Service {
// emit a compute notification for this. In the case where our observed state does not
// yet match our intent, we will eventually reconcile, and that will emit a compute notification.
if let Some(attached_at) = tenant_shard.stably_attached() {
compute_notifications.push(compute_hook::ShardUpdate {
tenant_shard_id: *tenant_shard_id,
node_id: attached_at,
stripe_size: tenant_shard.shard.stripe_size,
preferred_az: tenant_shard
.preferred_az()
.map(|az| Cow::Owned(az.clone())),
});
compute_notifications.push((
*tenant_shard_id,
attached_at,
tenant_shard.shard.stripe_size,
));
}
}
}
@@ -4790,15 +4786,7 @@ impl Service {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = self
.compute_hook
.notify(
compute_hook::ShardUpdate {
tenant_shard_id: child_id,
node_id: child_ps,
stripe_size,
preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
},
&self.cancel,
)
.notify(child_id, child_ps, stripe_size, &self.cancel)
.await
{
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
@@ -7170,24 +7158,15 @@ impl Service {
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
Ok(self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|v| v.as_describe_response())
.collect::<Vec<_>>())
) -> Result<Vec<crate::persistence::SafekeeperPersistence>, DatabaseError> {
self.persistence.list_safekeepers().await
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.persistence
.safekeeper_get(id)
.await
.map(|v| v.as_describe_response())
) -> Result<crate::persistence::SafekeeperPersistence, DatabaseError> {
self.persistence.safekeeper_get(id).await
}
pub(crate) async fn upsert_safekeeper(

View File

@@ -1198,7 +1198,6 @@ impl TenantShard {
detach,
reconciler_config,
config: self.config.clone(),
preferred_az: self.preferred_az_id.clone(),
observed: self.observed.clone(),
original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),

View File

@@ -310,7 +310,7 @@ pub(crate) enum BlobDataParseResult {
index_part_generation: Generation,
s3_layers: HashSet<(LayerName, Generation)>,
},
/// The remains of an uncleanly deleted Timeline or aborted timeline creation(e.g. an initdb archive only, or some layer without an index)
/// The remains of a deleted Timeline (i.e. an initdb archive only)
Relic,
Incorrect {
errors: Vec<String>,
@@ -346,7 +346,7 @@ pub(crate) async fn list_timeline_blobs(
match res {
ListTimelineBlobsResult::Ready(data) => Ok(data),
ListTimelineBlobsResult::MissingIndexPart(_) => {
// Retry if listing raced with removal of an index
// Retry if index is missing.
let data = list_timeline_blobs_impl(remote_client, id, root_target)
.await?
.into_data();
@@ -358,7 +358,7 @@ pub(crate) async fn list_timeline_blobs(
enum ListTimelineBlobsResult {
/// Blob data is ready to be intepreted.
Ready(RemoteTimelineBlobData),
/// The listing contained an index but when we tried to fetch it, we couldn't
/// List timeline blobs has layer files but is missing [`IndexPart`].
MissingIndexPart(RemoteTimelineBlobData),
}
@@ -467,19 +467,19 @@ async fn list_timeline_blobs_impl(
match index_part_object.as_ref() {
Some(selected) => index_part_keys.retain(|k| k != selected),
None => {
// This case does not indicate corruption, but it should be very unusual. It can
// happen if:
// - timeline creation is in progress (first layer is written before index is written)
// - timeline deletion happened while a stale pageserver was still attached, it might upload
// a layer after the deletion is done.
tracing::info!(
// It is possible that the branch gets deleted after we got some layer files listed
// and we no longer have the index file in the listing.
errors.push(
"S3 list response got no index_part.json file but still has layer files"
.to_string(),
);
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Relic,
unused_index_keys: index_part_keys,
unknown_keys,
}));
return Ok(ListTimelineBlobsResult::MissingIndexPart(
RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,
},
));
}
}

View File

@@ -134,9 +134,6 @@ DEFAULT_BRANCH_NAME: str = "main"
BASE_PORT: int = 15000
# By default we create pageservers with this phony AZ
DEFAULT_AZ_ID: str = "us-east-2a"
@pytest.fixture(scope="session")
def neon_api_key() -> str:
@@ -1096,7 +1093,7 @@ class NeonEnv:
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
# Default which can be overriden with `NeonEnvBuilder.pageserver_config_override`
"availability_zone": DEFAULT_AZ_ID,
"availability_zone": "us-east-2a",
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
"no_sync": True,

View File

@@ -738,18 +738,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res_json = res.json()
assert res_json is None
def timeline_compact_info(
self,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
) -> Any:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_compact(
self,
tenant_id: TenantId | TenantShardId,
@@ -761,6 +749,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
enhanced_gc_bottom_most_compaction=False,
body: dict[str, Any] | None = None,
):
self.is_testing_enabled_or_skip()
query = {}
if force_repartition:
query["force_repartition"] = "true"

View File

@@ -1,199 +0,0 @@
-- create a schema that simulates Neon control plane operations table
-- however use partitioned operations tables with many (e.g. 500) child partition tables per table
-- in summary we create multiple of these partitioned operations tables (with 500 childs each) - until we reach the requested number of tables
-- first we need some other tables that can be referenced by the operations table
-- Table for branches
CREATE TABLE public.branches (
id text PRIMARY KEY
);
-- Table for endpoints
CREATE TABLE public.endpoints (
id text PRIMARY KEY
);
-- Table for projects
CREATE TABLE public.projects (
id text PRIMARY KEY
);
INSERT INTO public.branches (id)
VALUES ('branch_1');
-- Insert one row into endpoints
INSERT INTO public.endpoints (id)
VALUES ('endpoint_1');
-- Insert one row into projects
INSERT INTO public.projects (id)
VALUES ('project_1');
-- now we create a procedure that can create n operations tables
-- we do that in a procedure to save roundtrip latency when scaling the test to many tables
-- prefix is the base table name, e.g. 'operations_scale_1000' if we create 1000 tables
CREATE OR REPLACE PROCEDURE create_partitioned_tables(prefix text, n INT)
LANGUAGE plpgsql AS $$
DECLARE
table_name TEXT; -- Variable to hold table names dynamically
i INT; -- Counter for the loop
BEGIN
-- Loop to create n partitioned tables
FOR i IN 1..n LOOP
table_name := format('%s_%s', prefix, i);
-- Create the partitioned table
EXECUTE format(
'CREATE TABLE public.%s (
project_id character varying NOT NULL,
id uuid NOT NULL,
status integer,
action character varying NOT NULL,
error character varying,
created_at timestamp with time zone NOT NULL DEFAULT now(),
updated_at timestamp with time zone NOT NULL DEFAULT now(),
spec jsonb,
retry_at timestamp with time zone,
failures_count integer DEFAULT 0,
metadata jsonb NOT NULL DEFAULT ''{}''::jsonb,
executor_id text NOT NULL,
attempt_duration_ms integer,
metrics jsonb DEFAULT ''{}''::jsonb,
branch_id text,
endpoint_id text,
next_operation_id uuid,
compute_id text,
connection_attempt_at timestamp with time zone,
concurrency_key text,
queue_id text,
CONSTRAINT %s_pkey PRIMARY KEY (id, created_at),
CONSTRAINT %s_branch_id_fk FOREIGN KEY (branch_id) REFERENCES branches(id) ON DELETE CASCADE,
CONSTRAINT %s_endpoint_id_fk FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE,
CONSTRAINT %s_next_operation_id_fk FOREIGN KEY (next_operation_id, created_at) REFERENCES %s(id, created_at),
CONSTRAINT %s_project_id_fk FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
) PARTITION BY RANGE (created_at)',
table_name, table_name, table_name, table_name, table_name, table_name, table_name
);
-- Add indexes for the partitioned table
EXECUTE format('CREATE INDEX index_%s_on_next_operation_id ON public.%s (next_operation_id)', table_name, table_name);
EXECUTE format('CREATE INDEX index_%s_on_project_id ON public.%s (project_id)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_branch_id ON public.%s (branch_id)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_branch_id_created_idx ON public.%s (branch_id, created_at)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_created_at_idx ON public.%s (created_at)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_created_at_project_id_id_cond_idx ON public.%s (created_at, project_id, id)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_endpoint_id ON public.%s (endpoint_id)', table_name, table_name);
EXECUTE format(
'CREATE INDEX %s_for_redo_worker_idx ON public.%s (executor_id) WHERE status <> 1',
table_name, table_name
);
EXECUTE format(
'CREATE INDEX %s_project_id_status_index ON public.%s ((project_id::text), status)',
table_name, table_name
);
EXECUTE format(
'CREATE INDEX %s_status_not_finished ON public.%s (status) WHERE status <> 1',
table_name, table_name
);
EXECUTE format('CREATE INDEX %s_updated_at_desc_idx ON public.%s (updated_at DESC)', table_name, table_name);
EXECUTE format(
'CREATE INDEX %s_with_failures ON public.%s (failures_count) WHERE failures_count > 0',
table_name, table_name
);
END LOOP;
END;
$$;
-- next we create a procedure that can add the child partitions (one per day) to each of the operations tables
CREATE OR REPLACE PROCEDURE create_operations_partitions(
table_name TEXT,
start_date DATE,
end_date DATE
)
LANGUAGE plpgsql AS $$
DECLARE
partition_date DATE;
partition_name TEXT;
counter INT := 0; -- Counter to track the number of tables created in the current transaction
BEGIN
partition_date := start_date;
-- Create partitions in batches
WHILE partition_date < end_date LOOP
partition_name := format('%s_%s', table_name, to_char(partition_date,'YYYY_MM_DD'));
EXECUTE format(
'CREATE TABLE IF NOT EXISTS public.%s PARTITION OF public.%s
FOR VALUES FROM (''%s'') TO (''%s'')',
partition_name,
table_name,
partition_date,
partition_date + INTERVAL '1 day'
);
counter := counter + 1;
-- Commit and reset counter after every 100 partitions
IF counter >= 100 THEN
COMMIT;
counter := 0; -- Reset the counter
END IF;
-- Advance to the next day
partition_date := partition_date + INTERVAL '1 day';
END LOOP;
-- Final commit for remaining partitions
IF counter > 0 THEN
COMMIT;
END IF;
-- Insert synthetic rows into each partition
EXECUTE format(
'INSERT INTO %I (
project_id,
branch_id,
endpoint_id,
id,
status,
action,
created_at,
updated_at,
spec,
metadata,
executor_id,
failures_count
)
SELECT
''project_1'', -- project_id
''branch_1'', -- branch_id
''endpoint_1'', -- endpoint_id
''e8bba687-0df9-4291-bfcd-7d5f6aa7c158'', -- unique id
1, -- status
''SYNTHETIC_ACTION'', -- action
gs::timestamp + interval ''0 ms'', -- created_at
gs::timestamp + interval ''1 minute'', -- updated_at
''{"key": "value"}'', -- spec (JSONB)
''{"metadata_key": "metadata_value"}'', -- metadata (JSONB)
''executor_1'', -- executor_id
0 -- failures_count
FROM generate_series(%L, %L::DATE - INTERVAL ''1 day'', INTERVAL ''1 day'') AS gs',
table_name, start_date, end_date
);
-- Commit the inserted rows
COMMIT;
END;
$$;
-- we can now create partitioned tables using something like
-- CALL create_partitioned_tables('operations_scale_1000' ,10);
-- and we can create the child partitions for a table using something like
-- CALL create_operations_partitions(
-- 'operations_scale_1000_1',
-- '2000-01-01', -- Start date
-- ('2000-01-01'::DATE + INTERVAL '1 day' * 500)::DATE -- End date (start date + number of days)
-- );

View File

@@ -22,7 +22,7 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
"pitr_interval": "60 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
@@ -32,7 +32,6 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
n_steps = 10
n_update_iters = 100
step_size = 10000
branch_created = 0
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout='1000s'")
cur.execute(
@@ -67,7 +66,6 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
if mode == "with_snapshots":
if step == n_steps / 2:
env.create_branch("child")
branch_created += 1
max_num_of_deltas_above_image = 0
max_total_num_of_deltas = 0
@@ -144,15 +142,6 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
with layer_map_path.open("w") as f:
f.write(json.dumps(client.timeline_layer_map_info(tenant_id, timeline_id)))
# We should have collected all garbage
if mode == "normal":
# in theory we should get physical size ~= logical size, but given that gc interval is 10s,
# and the layer has indexes that might contribute to the fluctuation, we allow a small margin
# of 1 here, and the end ratio we are asserting is 1 (margin) + 1 (expected) = 2.
assert physical_size / logical_size < 2
elif mode == "with_snapshots":
assert physical_size / logical_size < (2 + branch_created)
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):

View File

@@ -1,66 +0,0 @@
import os
from pathlib import Path
import pytest
from fixtures.compare_fixtures import RemoteCompare
from fixtures.log_helper import log
def get_num_relations(default: int = 1000) -> list[int]:
# We parametrize each run with scale specifying the number of wanted child partitions.
# Databases are pre-created and passed through BENCHMARK_CONNSTR env variable.
scales = os.getenv("TEST_NUM_RELATIONS", default=str(default))
rv = []
for s in scales.split(","):
scale = int(s)
rv.append(scale)
return rv
@pytest.mark.parametrize("num_relations", get_num_relations())
@pytest.mark.remote_cluster
def test_perf_many_relations(remote_compare: RemoteCompare, num_relations: int):
"""
Test creating many relations in a single database.
We use partitioned tables with child tables, indexes and constraints to have a realistic schema.
Also we include some common data types like text, uuid, timestamp, JSONB, etc.
see many_relations/create_many_relations.sql
"""
env = remote_compare
# prepare some base tables and the plpgsql procedures that we use to create the tables
sql_file = Path(__file__).parent / "many_relations" / "create_many_relations.sql"
env.pg_bin.run_capture(["psql", env.pg.connstr(), "-f", str(sql_file)])
num_parent_tables = num_relations // 500 + 1
log.info(f"Creating {num_relations} relations in {num_parent_tables} parent tables")
log.info(f"Creating {num_parent_tables} parent tables")
sql = f"CALL create_partitioned_tables('operations_scale_{num_relations}', {num_parent_tables})"
log.info(sql)
env.pg_bin.run_capture(["psql", env.pg.connstr(), "-c", sql])
current_table = 0
num_relations_remaining = num_relations
# now run and measure the actual relation creation
while num_relations_remaining > 0:
current_table += 1
parent_table_name = f"operations_scale_{num_relations}_{current_table}"
if num_relations_remaining > 500:
num_relations_to_create = 500
else:
num_relations_to_create = num_relations_remaining
num_relations_remaining -= num_relations_to_create
log.info(
f"Creating {num_relations_to_create} child tables in partitioned parent table '{parent_table_name}'"
)
sql = f"CALL create_operations_partitions( '{parent_table_name}', '2000-01-01', ('2000-01-01'::DATE + INTERVAL '1 day' * {num_relations_to_create})::DATE)"
log.info(sql)
with env.zenbenchmark.record_duration(
f"CREATE_TABLE/{current_table}/{num_relations_to_create}"
):
env.pg_bin.run_capture(
["psql", env.pg.connstr(options="-cstatement_timeout=1000s "), "-c", sql]
)

View File

@@ -134,10 +134,6 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
env.pageserver.allowed_errors.append(
r".*failed to acquire partition lock during gc-compaction.*"
)
env.pageserver.allowed_errors.append(r".*repartition() called concurrently.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -176,12 +172,6 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
workload.churn_rows(row_count, env.pageserver.id)
def compaction_finished():
queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
assert queue_depth == 0
wait_until(compaction_finished, timeout=60)
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains(
"scheduled_compact_timeline.*picked .* layers for compaction"

View File

@@ -84,8 +84,6 @@ def test_pgdata_import_smoke(
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2
elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS:
# Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data
# to exercise multiple segments.
target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192)
else:
raise ValueError
@@ -113,15 +111,9 @@ def test_pgdata_import_smoke(
def validate_vanilla_equivalence(ep):
# TODO: would be nicer to just compare pgdump
# Enable IO concurrency for batching on large sequential scan, to avoid making
# this test unnecessarily onerous on CPU
assert ep.safe_psql_many(
[
"set effective_io_concurrency=32;",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [(expect_nrows, expect_sum)]]
assert ep.safe_psql("select count(*), sum(data::bigint)::bigint from t") == [
(expect_nrows, expect_sum)
]
validate_vanilla_equivalence(vanilla_pg)

View File

@@ -22,10 +22,7 @@ CHECKPOINT_TIMEOUT_SECONDS = 60
async def run_worker_for_tenant(
env: NeonEnv,
entries: int,
tenant: TenantId,
offset: int | None = None,
env: NeonEnv, entries: int, tenant: TenantId, offset: int | None = None
) -> Lsn:
if offset is None:
offset = 0
@@ -40,20 +37,12 @@ async def run_worker_for_tenant(
finally:
await conn.close(timeout=10)
loop = asyncio.get_running_loop()
sql = await loop.run_in_executor(
None, lambda ep: ep.safe_psql("SELECT pg_current_wal_flush_lsn()"), ep
)
last_flush_lsn = Lsn(sql[0][0])
last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
return last_flush_lsn
async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> tuple[TenantId, TimelineId, Lsn]:
loop = asyncio.get_running_loop()
# capture tenant_conf by specifying `tenant_conf=tenant_conf`, otherwise it will be evaluated to some random value
tenant, timeline = await loop.run_in_executor(
None, lambda tenant_conf, env: env.create_tenant(conf=tenant_conf), tenant_conf, env
)
tenant, timeline = env.create_tenant(conf=tenant_conf)
last_flush_lsn = await run_worker_for_tenant(env, entries, tenant)
return tenant, timeline, last_flush_lsn

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import time
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonEnv, vanilla_pg):
@@ -38,8 +38,6 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE
for pk in range(n_records):
p_cur.execute("insert into t (pk) values (%s)", (pk,))
wait_replica_caughtup(primary, secondary)
s_cur.execute("select count(*) from t")
assert s_cur.fetchall()[0][0] == n_records

View File

@@ -11,7 +11,6 @@ from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchival
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_AZ_ID,
NeonEnv,
NeonEnvBuilder,
StorageControllerApiException,
@@ -794,7 +793,6 @@ def test_sharding_split_stripe_size(
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}
assert notifications[0] == expect
@@ -814,7 +812,6 @@ def test_sharding_split_stripe_size(
{"node_id": int(env.pageservers[0].id), "shard_number": 0},
{"node_id": int(env.pageservers[0].id), "shard_number": 1},
],
"preferred_az": DEFAULT_AZ_ID,
}
log.info(f"Got notification: {notifications[1]}")
assert notifications[1] == expect_after

View File

@@ -16,7 +16,6 @@ from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_AZ_ID,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
@@ -600,7 +599,6 @@ def test_storage_controller_compute_hook(
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}
assert notifications[0] == expect
@@ -618,7 +616,6 @@ def test_storage_controller_compute_hook(
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[1].id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}
def received_migration_notification():
@@ -646,7 +643,6 @@ def test_storage_controller_compute_hook(
{"node_id": int(env.pageservers[1].id), "shard_number": 0},
{"node_id": int(env.pageservers[1].id), "shard_number": 1},
],
"preferred_az": DEFAULT_AZ_ID,
}
def received_split_notification():
@@ -718,7 +714,6 @@ def test_storage_controller_stuck_compute_hook(
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
"preferred_az": DEFAULT_AZ_ID,
}
assert notifications[0] == expect
@@ -3009,7 +3004,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]
masked_keys = ["created_at", "updated_at", "active"]
masked_keys = ["created_at", "updated_at"]
for d in compared:
# keep deleting these in case we are comparing the body as it will be uploaded by real scripts

View File

@@ -1,7 +1,7 @@
{
"v17": [
"17.2",
"7e3f3974bc8895938308f94d0e96879ffae638cd"
"65c4e46baf56ec05412c7dd63d62faff0b33dcfb"
],
"v16": [
"16.6",