Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-17 10:22:56 +00:00

Compare commits: release-pr...sharnoff/b (22 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 8b11e3bc9c |  |
|  | 9c53b41245 |  |
|  | 197a89ab3d |  |
|  | b89e02f3e8 |  |
|  | 04517c6ff3 |  |
|  | 628451d68e |  |
|  | 502d512fe2 |  |
|  | afda6d4700 |  |
|  | 65042cbadd |  |
|  | b135194090 |  |
|  | 43dc03459d |  |
|  | a1b0558493 |  |
|  | cc138b56f9 |  |
|  | 61fcf64c22 |  |
|  | 6d3e8096fc |  |
|  | 3d1c3a80ae |  |
|  | 835287ba3a |  |
|  | d63602cc78 |  |
|  | 1668d39b7c |  |
|  | 1d12efc428 |  |
|  | 85696297c5 |  |
|  | aaf980f70d |  |
.github/workflows/benchmarking.yml (vendored): 25 changed lines
@@ -308,6 +308,7 @@ jobs:
             "image": [ "'"$image_default"'" ],
             "include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_default"', "image": "'"$image_default"'" },
                         { "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
+                        { "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new-many-tables","db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
                         { "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
                         { "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
                         { "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb","runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },

@@ -410,7 +411,7 @@ jobs:
           aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}

       - name: Create Neon Project
-        if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
+        if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-new-many-tables", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
         id: create-neon-project
         uses: ./.github/actions/neon-project-create
         with:

@@ -429,7 +430,7 @@ jobs:
             neonvm-captest-sharding-reuse)
               CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
               ;;
-            neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
+            neonvm-captest-new | neonvm-captest-new-many-tables | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
               CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
               ;;
             rds-aurora)

@@ -446,6 +447,26 @@ jobs:
           echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT

+      # we want to compare Neon project OLTP throughput and latency at scale factor 10 GB
+      # without (neonvm-captest-new)
+      # and with (neonvm-captest-new-many-tables) many relations in the database
+      - name: Create many relations before the run
+        if: contains(fromJson('["neonvm-captest-new-many-tables"]'), matrix.platform)
+        uses: ./.github/actions/run-python-test-set
+        with:
+          build_type: ${{ env.BUILD_TYPE }}
+          test_selection: performance
+          run_in_parallel: false
+          save_perf_report: ${{ env.SAVE_PERF_REPORT }}
+          extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
+          pg_version: ${{ env.DEFAULT_PG_VERSION }}
+          aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
+        env:
+          BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
+          VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
+          PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
+          TEST_NUM_RELATIONS: 10000

       - name: Benchmark init
         uses: ./.github/actions/run-python-test-set
         with:
@@ -1556,28 +1556,30 @@ RUN apt update && \
     locales \
     procps \
     ca-certificates \
     curl \
     unzip \
     $VERSION_INSTALLS && \
     apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
     localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

 # s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2
 # used by fast_import
 # aws cli is used by fast_import (curl and unzip above are at this time only used for this installation step)
 ARG TARGETARCH
 ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb
 RUN set -ex; \
     \
     # Determine the expected checksum based on TARGETARCH
     if [ "${TARGETARCH}" = "amd64" ]; then \
-        CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \
+        TARGETARCH_ALT="x86_64"; \
+        CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
     elif [ "${TARGETARCH}" = "arm64" ]; then \
-        CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \
+        TARGETARCH_ALT="aarch64"; \
+        CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
     else \
         echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
     fi; \
     \
     # Compute and validate the checksum
-    echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c -
-RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb
+    curl -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
+    echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
+    unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
+    /tmp/awscliv2/aws/install; \
+    rm -rf /tmp/awscliv2.zip /tmp/awscliv2; \
+    true

 ENV LANG=en_US.utf8
 USER postgres
@@ -1,8 +1,8 @@
 -- NOTE: This is the "internal" / "machine-readable" version. This outputs the
--- working set size looking back 1..60 minutes, labeled with the number of
+-- working set size looking back 1..180 minutes, labeled with the number of
 -- minutes.

 SELECT
   x::text as duration_seconds,
   neon.approximate_working_set_size_seconds(x) AS size
-FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);
+FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 180)) AS t (x);

@@ -4,5 +4,5 @@
 SELECT
   x AS duration,
   neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
-  VALUES ('5m'), ('15m'), ('1h')
+  VALUES ('5m'), ('15m'), ('1h'), ('3h')
 ) AS t (x);
@@ -34,12 +34,12 @@ use nix::unistd::Pid;
 use tracing::{info, info_span, warn, Instrument};
 use utils::fs_ext::is_directory_empty;

+#[path = "fast_import/aws_s3_sync.rs"]
+mod aws_s3_sync;
 #[path = "fast_import/child_stdio_to_log.rs"]
 mod child_stdio_to_log;
 #[path = "fast_import/s3_uri.rs"]
 mod s3_uri;
-#[path = "fast_import/s5cmd.rs"]
-mod s5cmd;

 #[derive(clap::Parser)]
 struct Args {

@@ -326,7 +326,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
     }

     info!("upload pgdata");
-    s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/"))
+    aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
         .await
         .context("sync dump directory to destination")?;

@@ -334,10 +334,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
     {
         let status_dir = working_directory.join("status");
         std::fs::create_dir(&status_dir).context("create status directory")?;
-        let status_file = status_dir.join("status");
+        let status_file = status_dir.join("pgdata");
         std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
             .context("write status file")?;
-        s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata"))
+        aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
             .await
             .context("sync status directory to destination")?;
     }
@@ -4,24 +4,21 @@ use camino::Utf8Path;

 use super::s3_uri::S3Uri;

 pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
-    let mut builder = tokio::process::Command::new("s5cmd");
-    // s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL
-    if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") {
-        builder.arg("--endpoint-url").arg(val);
-    }
+    let mut builder = tokio::process::Command::new("aws");
     builder
+        .arg("s3")
+        .arg("sync")
         .arg(local.as_str())
        .arg(remote.to_string());
     let st = builder
         .spawn()
-        .context("spawn s5cmd")?
+        .context("spawn aws s3 sync")?
         .wait()
         .await
-        .context("wait for s5cmd")?;
+        .context("wait for aws s3 sync")?;
     if st.success() {
         Ok(())
     } else {
-        Err(anyhow::anyhow!("s5cmd failed"))
+        Err(anyhow::anyhow!("aws s3 sync failed"))
     }
 }
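For orientation (not part of the diff): the wrapper is driven from fast_import exactly as the hunks above show. A hedged sketch of such a call site, where `upload_pgdata` is a hypothetical helper and the local path a placeholder:

```rust
use anyhow::Context;
use camino::Utf8Path;

// Hypothetical call site mirroring the fast_import hunks above: sync a
// prepared pgdata directory to the S3 prefix, failing with context.
async fn upload_pgdata(pgdata_dir: &str, s3_prefix: &s3_uri::S3Uri) -> anyhow::Result<()> {
    aws_s3_sync::sync(Utf8Path::new(pgdata_dir), &s3_prefix.append("/pgdata/"))
        .await
        .context("sync dump directory to destination")?;
    Ok(())
}
```

Note the tradeoff the removed comment records: s5cmd (aws-sdk-go v1) ignored `AWS_ENDPOINT_URL`, so it needed explicit `--endpoint-url` plumbing; with `aws s3 sync` that plumbing could be dropped.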
@@ -19,6 +19,7 @@ use control_plane::storage_controller::{
     NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
 };
 use control_plane::{broker, local_env};
+use nix::fcntl::{flock, FlockArg};
 use pageserver_api::config::{
     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
     DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,

@@ -36,6 +37,8 @@ use safekeeper_api::{
 };
 use std::borrow::Cow;
 use std::collections::{BTreeSet, HashMap};
+use std::fs::File;
+use std::os::fd::AsRawFd;
 use std::path::PathBuf;
 use std::process::exit;
 use std::str::FromStr;

@@ -689,6 +692,21 @@ struct TimelineTreeEl {
     pub children: BTreeSet<TimelineId>,
 }

+/// A flock-based guard over the neon_local repository directory
+struct RepoLock {
+    _file: File,
+}
+
+impl RepoLock {
+    fn new() -> Result<Self> {
+        let repo_dir = File::open(local_env::base_path())?;
+        let repo_dir_fd = repo_dir.as_raw_fd();
+        flock(repo_dir_fd, FlockArg::LockExclusive)?;
+
+        Ok(Self { _file: repo_dir })
+    }
+}
+
 // Main entry point for the 'neon_local' CLI utility
 //
 // This utility helps to manage neon installation. That includes following:
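A detail the diff leans on implicitly: the advisory lock is attached to the open file description, so it is released when `_file` is closed on drop; no explicit unlock is needed. A standalone sketch of the same pattern (the `DirLock` name and path are illustrative, not from the diff):

```rust
use std::fs::File;
use std::os::fd::AsRawFd;

use nix::fcntl::{flock, FlockArg};

/// Minimal flock-based guard: the lock lives as long as `_file` and is
/// released by the kernel when the descriptor is closed on drop.
struct DirLock {
    _file: File,
}

impl DirLock {
    fn acquire(path: &str) -> anyhow::Result<Self> {
        let file = File::open(path)?;
        // LockExclusive blocks until any concurrent holder releases the lock.
        flock(file.as_raw_fd(), FlockArg::LockExclusive)?;
        Ok(Self { _file: file })
    }
}

fn main() -> anyhow::Result<()> {
    // "/tmp/repo" is a placeholder for the repository directory.
    let _lock = DirLock::acquire("/tmp/repo")?;
    // ... mutate on-disk state safely here ...
    Ok(()) // `_lock` drops here, closing the fd and releasing the lock
}
```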
@@ -700,9 +718,14 @@ fn main() -> Result<()> {
     let cli = Cli::parse();

     // Check for 'neon init' command first.
-    let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
-        handle_init(&args).map(|env| Some(Cow::Owned(env)))
+    let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
+        (handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
     } else {
+        // This tool uses a collection of simple files to store its state, and consequently
+        // it is not generally safe to run multiple commands concurrently. Rather than expect
+        // all callers to know this, use a lock file to protect against concurrent execution.
+        let _repo_lock = RepoLock::new().unwrap();
+
         // all other commands need an existing config
         let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
         let original_env = env.clone();

@@ -728,11 +751,12 @@ fn main() -> Result<()> {
             NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
         };

-        if &original_env != env {
+        let subcommand_result = if &original_env != env {
             subcommand_result.map(|()| Some(Cow::Borrowed(env)))
         } else {
             subcommand_result.map(|()| None)
-        }
+        };
+        (subcommand_result, Some(_repo_lock))
     };

     match subcommand_result {
@@ -922,7 +946,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
     } else {
         // User (likely interactive) did not provide a description of the environment, give them the default
         NeonLocalInitConf {
-            control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
+            control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
             broker: NeonBroker {
                 listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
             },

@@ -1718,18 +1742,15 @@ async fn handle_start_all_impl(
         broker::start_broker_process(env, &retry_timeout).await
     });

-    // Only start the storage controller if the pageserver is configured to need it
-    if env.control_plane_api.is_some() {
-        js.spawn(async move {
-            let storage_controller = StorageController::from_env(env);
-            storage_controller
-                .start(NeonStorageControllerStartArgs::with_default_instance_id(
-                    retry_timeout,
-                ))
-                .await
-                .map_err(|e| e.context("start storage_controller"))
-        });
-    }
+    js.spawn(async move {
+        let storage_controller = StorageController::from_env(env);
+        storage_controller
+            .start(NeonStorageControllerStartArgs::with_default_instance_id(
+                retry_timeout,
+            ))
+            .await
+            .map_err(|e| e.context("start storage_controller"))
+    });

     for ps_conf in &env.pageservers {
         js.spawn(async move {

@@ -1774,10 +1795,6 @@ async fn neon_start_status_check(
     const RETRY_INTERVAL: Duration = Duration::from_millis(100);
     const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);

-    if env.control_plane_api.is_none() {
-        return Ok(());
-    }
-
     let storcon = StorageController::from_env(env);

     let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
@@ -316,6 +316,10 @@ impl Endpoint {
         // and can cause errors like 'no unpinned buffers available', see
         // <https://github.com/neondatabase/neon/issues/9956>
         conf.append("shared_buffers", "1MB");
+        // Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
+        // batching logic. Set this to 2 so that we exercise the code a bit without letting
+        // individual tests do a lot of concurrent work on underpowered test machines
+        conf.append("effective_io_concurrency", "2");
         conf.append("fsync", "off");
         conf.append("max_connections", "100");
         conf.append("wal_level", "logical");
@@ -76,7 +76,7 @@ pub struct LocalEnv {

     // Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
     // be propagated into each pageserver's configuration.
-    pub control_plane_api: Option<Url>,
+    pub control_plane_api: Url,

     // Control plane upcall API for storage controller. If set, this will be propagated into the
     // storage controller's configuration.

@@ -133,7 +133,7 @@ pub struct NeonLocalInitConf {
     pub storage_controller: Option<NeonStorageControllerConf>,
     pub pageservers: Vec<NeonLocalInitPageserverConf>,
     pub safekeepers: Vec<SafekeeperConf>,
-    pub control_plane_api: Option<Option<Url>>,
+    pub control_plane_api: Option<Url>,
     pub control_plane_compute_hook_api: Option<Option<Url>>,
 }

@@ -180,7 +180,7 @@ impl NeonStorageControllerConf {
     const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);

     // Very tight heartbeat interval to speed up tests
-    const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
+    const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
 }

 impl Default for NeonStorageControllerConf {

@@ -535,7 +535,7 @@ impl LocalEnv {
             storage_controller,
             pageservers,
             safekeepers,
-            control_plane_api,
+            control_plane_api: control_plane_api.unwrap(),
             control_plane_compute_hook_api,
             branch_name_mappings,
         }

@@ -638,7 +638,7 @@ impl LocalEnv {
             storage_controller: self.storage_controller.clone(),
             pageservers: vec![], // it's skip_serializing anyway
             safekeepers: self.safekeepers.clone(),
-            control_plane_api: self.control_plane_api.clone(),
+            control_plane_api: Some(self.control_plane_api.clone()),
             control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
             branch_name_mappings: self.branch_name_mappings.clone(),
         },

@@ -768,7 +768,7 @@ impl LocalEnv {
             storage_controller: storage_controller.unwrap_or_default(),
             pageservers: pageservers.iter().map(Into::into).collect(),
             safekeepers,
-            control_plane_api: control_plane_api.unwrap_or_default(),
+            control_plane_api: control_plane_api.unwrap(),
             control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
             branch_name_mappings: Default::default(),
         };
@@ -95,21 +95,19 @@ impl PageServerNode {

     let mut overrides = vec![pg_distrib_dir_param, broker_endpoint_param];

-    if let Some(control_plane_api) = &self.env.control_plane_api {
-        overrides.push(format!(
-            "control_plane_api='{}'",
-            control_plane_api.as_str()
-        ));
+    overrides.push(format!(
+        "control_plane_api='{}'",
+        self.env.control_plane_api.as_str()
+    ));

-        // Storage controller uses the same auth as pageserver: if JWT is enabled
-        // for us, we will also need it to talk to them.
-        if matches!(conf.http_auth_type, AuthType::NeonJWT) {
-            let jwt_token = self
-                .env
-                .generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
-                .unwrap();
-            overrides.push(format!("control_plane_api_token='{}'", jwt_token));
-        }
-    }
+    // Storage controller uses the same auth as pageserver: if JWT is enabled
+    // for us, we will also need it to talk to them.
+    if matches!(conf.http_auth_type, AuthType::NeonJWT) {
+        let jwt_token = self
+            .env
+            .generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
+            .unwrap();
+        overrides.push(format!("control_plane_api_token='{}'", jwt_token));
+    }

     if !conf.other.contains_key("remote_storage") {
@@ -338,7 +338,7 @@ impl StorageController {
                 .port(),
         )
     } else {
-        let listen_url = self.env.control_plane_api.clone().unwrap();
+        let listen_url = self.env.control_plane_api.clone();

         let listen = format!(
             "{}:{}",

@@ -708,7 +708,7 @@ impl StorageController {
     } else {
         // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
         // for general purpose API access.
-        let listen_url = self.env.control_plane_api.clone().unwrap();
+        let listen_url = self.env.control_plane_api.clone();
         Url::from_str(&format!(
             "http://{}:{}/{path}",
             listen_url.host_str().unwrap(),
@@ -5,7 +5,8 @@ use clap::{Parser, Subcommand};
 use pageserver_api::{
     controller_api::{
         AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
-        ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
+        SafekeeperDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
+        TenantDescribeResponse, TenantPolicyRequest,
     },
     models::{
         EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,

@@ -211,6 +212,8 @@ enum Command {
         #[arg(long)]
         timeout: humantime::Duration,
     },
+    /// List safekeepers known to the storage controller
+    Safekeepers {},
 }

 #[derive(Parser)]

@@ -1020,6 +1023,31 @@ async fn main() -> anyhow::Result<()> {
                 "Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
             );
         }
+        Command::Safekeepers {} => {
+            let mut resp = storcon_client
+                .dispatch::<(), Vec<SafekeeperDescribeResponse>>(
+                    Method::GET,
+                    "control/v1/safekeeper".to_string(),
+                    None,
+                )
+                .await?;
+
+            resp.sort_by(|a, b| a.id.cmp(&b.id));
+
+            let mut table = comfy_table::Table::new();
+            table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
+            for sk in resp {
+                table.add_row([
+                    format!("{}", sk.id),
+                    format!("{}", sk.version),
+                    sk.host,
+                    format!("{}", sk.port),
+                    format!("{}", sk.http_port),
+                    sk.availability_zone_id.to_string(),
+                ]);
+            }
+            println!("{table}");
+        }
     }

     Ok(())
@@ -372,6 +372,23 @@ pub struct MetadataHealthListOutdatedResponse {
     pub health_records: Vec<MetadataHealthRecord>,
 }

+/// Publicly exposed safekeeper description
+///
+/// The `active` flag which we have in the DB is not included on purpose: it is deprecated.
+#[derive(Serialize, Deserialize, Clone)]
+pub struct SafekeeperDescribeResponse {
+    pub id: NodeId,
+    pub region_id: String,
+    /// 1 is special, it means just created (not currently posted to storcon).
+    /// Zero or negative is not really expected.
+    /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
+    pub version: i64,
+    pub host: String,
+    pub port: i32,
+    pub http_port: i32,
+    pub availability_zone_id: String,
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
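Since the struct derives `Serialize`/`Deserialize`, list entries are plain JSON; a hedged round-trip sketch with illustrative values (assuming `NodeId`'s tuple constructor and `PartialEq` from `utils::id`):

```rust
use pageserver_api::controller_api::SafekeeperDescribeResponse;
use utils::id::NodeId;

#[test]
fn safekeeper_describe_response_roundtrip() {
    // All field values below are illustrative placeholders.
    let sk = SafekeeperDescribeResponse {
        id: NodeId(1),
        region_id: "aws-us-east-2".to_string(),
        version: 5957,
        host: "safekeeper-0.example.com".to_string(),
        port: 6500,
        http_port: 7676,
        availability_zone_id: "us-east-2b".to_string(),
    };
    let json = serde_json::to_string(&sk).unwrap();
    let back: SafekeeperDescribeResponse = serde_json::from_str(&json).unwrap();
    assert_eq!(back.id, sk.id);
}
```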
@@ -6,6 +6,7 @@ pub mod utilization;
 use camino::Utf8PathBuf;
 pub use utilization::PageserverUtilization;

+use core::ops::Range;
 use std::{
     collections::HashMap,
     fmt::Display,

@@ -28,6 +29,7 @@ use utils::{
 };

 use crate::{
+    key::Key,
     reltag::RelTag,
     shard::{ShardCount, ShardStripeSize, TenantShardId},
 };

@@ -210,6 +212,68 @@ pub enum TimelineState {
     Broken { reason: String, backtrace: String },
 }

+#[serde_with::serde_as]
+#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
+pub struct CompactLsnRange {
+    pub start: Lsn,
+    pub end: Lsn,
+}
+
+#[serde_with::serde_as]
+#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
+pub struct CompactKeyRange {
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    pub start: Key,
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    pub end: Key,
+}
+
+impl From<Range<Lsn>> for CompactLsnRange {
+    fn from(range: Range<Lsn>) -> Self {
+        Self {
+            start: range.start,
+            end: range.end,
+        }
+    }
+}
+
+impl From<Range<Key>> for CompactKeyRange {
+    fn from(range: Range<Key>) -> Self {
+        Self {
+            start: range.start,
+            end: range.end,
+        }
+    }
+}
+
+impl From<CompactLsnRange> for Range<Lsn> {
+    fn from(range: CompactLsnRange) -> Self {
+        range.start..range.end
+    }
+}
+
+impl From<CompactKeyRange> for Range<Key> {
+    fn from(range: CompactKeyRange) -> Self {
+        range.start..range.end
+    }
+}
+
+impl CompactLsnRange {
+    pub fn above(lsn: Lsn) -> Self {
+        Self {
+            start: lsn,
+            end: Lsn::MAX,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct CompactInfoResponse {
+    pub compact_key_range: Option<CompactKeyRange>,
+    pub compact_lsn_range: Option<CompactLsnRange>,
+    pub sub_compaction: bool,
+}
+
 #[derive(Serialize, Deserialize, Clone)]
 pub struct TimelineCreateRequest {
     pub new_timeline_id: TimelineId,
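A short sketch of how the conversions above compose from a caller's perspective (LSN values illustrative; assumes `Lsn`'s tuple constructor and `Ord` from `utils::lsn`):

```rust
use pageserver_api::models::CompactLsnRange;
use utils::lsn::Lsn;

fn demo() {
    // Native Range<Lsn> into the API type via the From impl above.
    let range: CompactLsnRange = (Lsn(0x10)..Lsn(0x40)).into();
    assert_eq!(range.start, Lsn(0x10));

    // `above` is shorthand for "everything from this LSN upward".
    let open_ended = CompactLsnRange::above(Lsn(0x40));
    assert_eq!(open_ended.end, Lsn::MAX);

    // And back into a std Range for use with ordinary range APIs.
    let native: std::ops::Range<Lsn> = open_ended.into();
    assert!(native.contains(&Lsn(0x50)));
}
```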
@@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
     const TIMELINE_ID: u32 = 1;

     /// Creates a new WAL generator with the given record generator.
-    pub fn new(record_generator: R) -> WalGenerator<R> {
+    pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
         Self {
             record_generator,
-            lsn: Lsn(0),
-            prev_lsn: Lsn(0),
+            lsn: start_lsn,
+            prev_lsn: start_lsn,
         }
     }
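The benchmark call sites later in this diff pass `Lsn(0)` to preserve the old behavior; a hedged sketch of starting at a non-zero LSN instead (payloads are placeholders, and it assumes the generator's `Iterator` impl yields `(Lsn, record)` pairs, which is how the benches consume it):

```rust
use utils::lsn::Lsn;

// Begin WAL generation at an arbitrary LSN rather than Lsn(0).
let start_lsn = Lsn(0x0100_0000);
let mut walgen =
    WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), start_lsn);

// Assumption: records come back in order, beginning at `start_lsn`.
if let Some((lsn, _record)) = walgen.next() {
    assert!(lsn >= start_lsn);
}
```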
@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
 use crate::DEFAULT_PG_VERSION;
 use crate::{disk_usage_eviction_task, tenant};
 use pageserver_api::models::{
-    StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
-    TimelineInfo,
+    CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
+    TimelineGcRequest, TimelineInfo,
 };
 use utils::{
     auth::SwappableJwtAuth,

@@ -2039,6 +2039,34 @@ async fn timeline_cancel_compact_handler(
     .await
 }

+// Get compact info of a timeline
+async fn timeline_compact_info_handler(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+    let state = get_state(&request);
+    async {
+        let tenant = state
+            .tenant_manager
+            .get_attached_tenant_shard(tenant_shard_id)?;
+        let res = tenant.get_scheduled_compaction_tasks(timeline_id);
+        let mut resp = Vec::new();
+        for item in res {
+            resp.push(CompactInfoResponse {
+                compact_key_range: item.compact_key_range,
+                compact_lsn_range: item.compact_lsn_range,
+                sub_compaction: item.sub_compaction,
+            });
+        }
+        json_response(StatusCode::OK, resp)
+    }
+    .instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
+    .await
+}
+
 // Run compaction immediately on given timeline.
 async fn timeline_compact_handler(
     mut request: Request<Body>,

@@ -3400,6 +3428,10 @@ pub fn make_router(
         "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
         |r| api_handler(r, timeline_gc_handler),
     )
+    .get(
+        "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
+        |r| api_handler(r, timeline_compact_info_handler),
+    )
     .put(
         "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
         |r| api_handler(r, timeline_compact_handler),
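A hedged client-side sketch of the new read-only route registered above, using `reqwest` (base URL, port, and IDs are placeholders; each array element mirrors `CompactInfoResponse`):

```rust
use anyhow::Context;

async fn fetch_compaction_queue(client: &reqwest::Client) -> anyhow::Result<serde_json::Value> {
    // Placeholder pageserver HTTP address and IDs.
    let url = "http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/timeline/<timeline_id>/compact";
    let body = client
        .get(url)
        .send()
        .await
        .context("request compact info")?
        .error_for_status()
        .context("compact info returned an error status")?
        .json()
        .await
        .context("decode compact info body")?;
    // Each element carries compact_key_range, compact_lsn_range, sub_compaction.
    Ok(body)
}
```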
@@ -3122,6 +3122,23 @@ impl Tenant {
         }
     }

+    pub(crate) fn get_scheduled_compaction_tasks(
+        &self,
+        timeline_id: TimelineId,
+    ) -> Vec<CompactOptions> {
+        use itertools::Itertools;
+        let guard = self.scheduled_compaction_tasks.lock().unwrap();
+        guard
+            .get(&timeline_id)
+            .map(|tline_pending_tasks| {
+                tline_pending_tasks
+                    .iter()
+                    .map(|x| x.options.clone())
+                    .collect_vec()
+            })
+            .unwrap_or_default()
+    }
+
     /// Schedule a compaction task for a timeline.
     pub(crate) async fn schedule_compaction(
         &self,

@@ -5759,13 +5776,13 @@ mod tests {
     use timeline::{CompactOptions, DeltaLayerTestDesc};
     use utils::id::TenantId;

+    #[cfg(feature = "testing")]
+    use models::CompactLsnRange;
     #[cfg(feature = "testing")]
     use pageserver_api::record::NeonWalRecord;
     #[cfg(feature = "testing")]
     use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
-    #[cfg(feature = "testing")]
-    use timeline::CompactLsnRange;
     #[cfg(feature = "testing")]
     use timeline::GcInfo;

     static TEST_KEY: Lazy<Key> =

@@ -9634,7 +9651,7 @@ mod tests {
     #[cfg(feature = "testing")]
     #[tokio::test]
     async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
-        use timeline::CompactLsnRange;
+        use models::CompactLsnRange;

         let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
         let (tenant, ctx) = harness.load().await;
@@ -1,12 +1,15 @@
 use std::collections::BTreeSet;

 use itertools::Itertools;
+use pageserver_compaction::helpers::overlaps_with;

 use super::storage_layer::LayerName;

 /// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
 ///
-/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
+/// The function implements a fast path check and a slow path check.
+///
+/// The fast path checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
 ///
 /// ```plain
 /// | | | |

@@ -25,31 +28,47 @@ use super::storage_layer::LayerName;
 /// | | | 4 | | |
 ///
 /// If layer 2 and 4 contain the same single key, this is also a valid layer map.
+///
+/// However, if a partial compaction is still going on, it is possible that we get a layer map not satisfying the above condition.
+/// Therefore, we fallback to simply check if any of the two delta layers overlap. (See "A slow path...")
 pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
     let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
     let mut all_delta_layers = Vec::new();
     for name in metadata {
         if let LayerName::Delta(layer) = name {
-            if layer.key_range.start.next() != layer.key_range.end {
-                all_delta_layers.push(layer.clone());
-            }
+            all_delta_layers.push(layer.clone());
         }
     }
     for layer in &all_delta_layers {
-        let lsn_range = &layer.lsn_range;
-        lsn_split_point.insert(lsn_range.start);
-        lsn_split_point.insert(lsn_range.end);
+        if layer.key_range.start.next() != layer.key_range.end {
+            let lsn_range = &layer.lsn_range;
+            lsn_split_point.insert(lsn_range.start);
+            lsn_split_point.insert(lsn_range.end);
+        }
     }
-    for layer in &all_delta_layers {
+    for (idx, layer) in all_delta_layers.iter().enumerate() {
+        if layer.key_range.start.next() == layer.key_range.end {
+            continue;
+        }
         let lsn_range = layer.lsn_range.clone();
         let intersects = lsn_split_point.range(lsn_range).collect_vec();
         if intersects.len() > 1 {
-            let err = format!(
-                "layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
-                layer,
-                intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
-            );
-            return Some(err);
+            // A slow path to check if the layer intersects with any other delta layer.
+            for (other_idx, other_layer) in all_delta_layers.iter().enumerate() {
+                if other_idx == idx {
+                    // do not check self intersects with self
+                    continue;
+                }
+                if overlaps_with(&layer.lsn_range, &other_layer.lsn_range)
+                    && overlaps_with(&layer.key_range, &other_layer.key_range)
+                {
+                    let err = format!(
+                        "layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
+                        layer, other_layer
+                    );
+                    return Some(err);
+                }
+            }
         }
     }
     None
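The slow path rests on `overlaps_with`, which is ordinary half-open range intersection; a self-contained sketch of that predicate and why touching-but-not-overlapping layers still pass (the helper below is a local stand-in, not the `pageserver_compaction` import):

```rust
use std::ops::Range;

// Local stand-in for pageserver_compaction::helpers::overlaps_with:
// two half-open ranges intersect iff each starts before the other ends.
fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    a.start < b.end && b.start < a.end
}

fn main() {
    // Ranges that merely touch at a boundary do not overlap, so a layer map
    // split exactly at an LSN boundary still passes the check.
    assert!(!overlaps_with(&(0..10), &(10..20)));
    // Any genuine intersection trips the slow-path error.
    assert!(overlaps_with(&(0..11), &(10..20)));
}
```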
@@ -31,9 +31,9 @@ use pageserver_api::{
     },
     keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
     models::{
-        CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
-        DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
-        LsnLease, TimelineState,
+        CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings,
+        DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
+        InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
     },
     reltag::BlockNumber,
     shard::{ShardIdentity, ShardNumber, TenantShardId},

@@ -788,63 +788,6 @@ pub(crate) struct CompactRequest {
     pub sub_compaction_max_job_size_mb: Option<u64>,
 }

-#[serde_with::serde_as]
-#[derive(Debug, Clone, serde::Deserialize)]
-pub(crate) struct CompactLsnRange {
-    pub start: Lsn,
-    pub end: Lsn,
-}
-
-#[serde_with::serde_as]
-#[derive(Debug, Clone, serde::Deserialize)]
-pub(crate) struct CompactKeyRange {
-    #[serde_as(as = "serde_with::DisplayFromStr")]
-    pub start: Key,
-    #[serde_as(as = "serde_with::DisplayFromStr")]
-    pub end: Key,
-}
-
-impl From<Range<Lsn>> for CompactLsnRange {
-    fn from(range: Range<Lsn>) -> Self {
-        Self {
-            start: range.start,
-            end: range.end,
-        }
-    }
-}
-
-impl From<Range<Key>> for CompactKeyRange {
-    fn from(range: Range<Key>) -> Self {
-        Self {
-            start: range.start,
-            end: range.end,
-        }
-    }
-}
-
-impl From<CompactLsnRange> for Range<Lsn> {
-    fn from(range: CompactLsnRange) -> Self {
-        range.start..range.end
-    }
-}
-
-impl From<CompactKeyRange> for Range<Key> {
-    fn from(range: CompactKeyRange) -> Self {
-        range.start..range.end
-    }
-}
-
-impl CompactLsnRange {
-    #[cfg(test)]
-    #[cfg(feature = "testing")]
-    pub fn above(lsn: Lsn) -> Self {
-        Self {
-            start: lsn,
-            end: Lsn::MAX,
-        }
-    }
-}
-
 #[derive(Debug, Clone, Default)]
 pub(crate) struct CompactOptions {
     pub flags: EnumSet<CompactFlags>,

@@ -29,6 +29,7 @@ use utils::id::TimelineId;
 use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
 use crate::page_cache;
 use crate::statvfs::Statvfs;
+use crate::tenant::checks::check_valid_layermap;
 use crate::tenant::remote_timeline_client::WaitCompletionError;
 use crate::tenant::storage_layer::batch_split_writer::{
     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,

@@ -1823,7 +1824,7 @@ impl Timeline {
     // by estimating the amount of files read for a compaction job. We should also partition on LSN.
     let ((dense_ks, sparse_ks), _) = {
         let Ok(partition) = self.partitioning.try_lock() else {
-            bail!("failed to acquire partition lock");
+            bail!("failed to acquire partition lock during gc-compaction");
         };
         partition.clone()
     };

@@ -2156,15 +2157,14 @@ impl Timeline {

     // Step 1: construct a k-merge iterator over all layers.
     // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
-    // disable the check for now because we need to adjust the check for partial compactions, will enable later.
-    // let layer_names = job_desc
-    //     .selected_layers
-    //     .iter()
-    //     .map(|layer| layer.layer_desc().layer_name())
-    //     .collect_vec();
-    // if let Some(err) = check_valid_layermap(&layer_names) {
-    //     warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
-    // }
+    let layer_names = job_desc
+        .selected_layers
+        .iter()
+        .map(|layer| layer.layer_desc().layer_name())
+        .collect_vec();
+    if let Some(err) = check_valid_layermap(&layer_names) {
+        bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
+    }
     // The maximum LSN we are processing in this compaction loop
     let end_lsn = job_desc
         .selected_layers

@@ -2546,13 +2546,48 @@ impl Timeline {
         );

         // Step 3: Place back to the layer map.
+
+        // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
+        let all_layers = {
+            let guard = self.layers.read().await;
+            let layer_map = guard.layer_map()?;
+            layer_map.iter_historic_layers().collect_vec()
+        };
+
+        let mut final_layers = all_layers
+            .iter()
+            .map(|layer| layer.layer_name())
+            .collect::<HashSet<_>>();
+        for layer in &layer_selection {
+            final_layers.remove(&layer.layer_desc().layer_name());
+        }
+        for layer in &compact_to {
+            final_layers.insert(layer.layer_desc().layer_name());
+        }
+        let final_layers = final_layers.into_iter().collect_vec();
+
+        // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
+        // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
+        // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
+        if let Some(err) = check_valid_layermap(&final_layers) {
+            bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
+        }
+
+        // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
+        // operate on L1 layers.
         {
-            // TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
             let mut guard = self.layers.write().await;
             guard
                 .open_mut()?
                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
         };

         // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
         // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
         // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
         // be batched into `schedule_compaction_update`.
         let disk_consistent_lsn = self.disk_consistent_lsn.load();
         self.schedule_uploads(disk_consistent_lsn, None)?;
         self.remote_client
             .schedule_compaction_update(&layer_selection, &compact_to)?;
@@ -827,7 +827,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
	{
		while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
		{
			HandleMainLoopInterrupts();
			shard->n_reconnect_attempts += 1;
		}
		shard->n_reconnect_attempts = 0;
@@ -678,6 +678,9 @@ mod tests {
         .await
         .unwrap();

+        // flush the final server message
+        stream.flush().await.unwrap();
+
         handle.await.unwrap();
     }
@@ -271,7 +271,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
     Ok(Box::leak(Box::new(ProxyConfig {
         tls_config: None,
         metric_collection: None,
-        allow_self_signed_compute: false,
         http_config,
         authentication_config: AuthenticationConfig {
             jwks_cache: JwkCache::default(),

@@ -129,9 +129,6 @@ struct ProxyCliArgs {
     /// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
     #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
     connect_compute_lock: String,
-    /// Allow self-signed certificates for compute nodes (for testing)
-    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
-    allow_self_signed_compute: bool,
     #[clap(flatten)]
     sql_over_http: SqlOverHttpArgs,
     /// timeout for scram authentication protocol

@@ -564,9 +561,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
         _ => bail!("either both or neither tls-key and tls-cert must be specified"),
     };

-    if args.allow_self_signed_compute {
-        warn!("allowing self-signed compute certificates");
-    }
     let backup_metric_collection_config = config::MetricBackupCollectionConfig {
         interval: args.metric_backup_collection_interval,
         remote_storage_config: args.metric_backup_collection_remote_storage.clone(),

@@ -641,7 +635,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
     let config = ProxyConfig {
         tls_config,
         metric_collection,
-        allow_self_signed_compute: args.allow_self_signed_compute,
         http_config,
         authentication_config,
         proxy_protocol_v2: args.proxy_protocol_v2,
@@ -4,7 +4,8 @@ use std::sync::Arc;
 use dashmap::DashMap;
 use ipnet::{IpNet, Ipv4Net, Ipv6Net};
 use once_cell::sync::OnceCell;
-use postgres_client::{tls::MakeTlsConnect, CancelToken};
+use postgres_client::tls::MakeTlsConnect;
+use postgres_client::CancelToken;
 use pq_proto::CancelKeyData;
 use rustls::crypto::ring;
 use thiserror::Error;

@@ -14,17 +15,16 @@ use tracing::{debug, info};
 use uuid::Uuid;

 use crate::auth::{check_peer_addr_is_in_list, IpPattern};
+use crate::compute::load_certs;
 use crate::error::ReportableError;
 use crate::ext::LockExt;
 use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
+use crate::postgres_rustls::MakeRustlsConnect;
 use crate::rate_limiter::LeakyBucketRateLimiter;
 use crate::redis::cancellation_publisher::{
     CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
 };
-
-use crate::compute::{load_certs, AcceptEverythingVerifier};
-use crate::postgres_rustls::MakeRustlsConnect;

 pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
 pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
 pub(crate) type CancellationHandlerMainInternal = Option<Arc<Mutex<RedisPublisherClient>>>;

@@ -240,7 +240,6 @@ pub struct CancelClosure {
     cancel_token: CancelToken,
     ip_allowlist: Vec<IpPattern>,
     hostname: String, // for pg_sni router
-    allow_self_signed_compute: bool,
 }

 impl CancelClosure {

@@ -249,45 +248,34 @@ impl CancelClosure {
         cancel_token: CancelToken,
         ip_allowlist: Vec<IpPattern>,
         hostname: String,
-        allow_self_signed_compute: bool,
     ) -> Self {
         Self {
             socket_addr,
             cancel_token,
             ip_allowlist,
             hostname,
-            allow_self_signed_compute,
         }
     }
     /// Cancels the query running on user's compute node.
     pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> {
         let socket = TcpStream::connect(self.socket_addr).await?;

-        let client_config = if self.allow_self_signed_compute {
-            // Allow all certificates for creating the connection. Used only for tests
-            let verifier = Arc::new(AcceptEverythingVerifier);
-            rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
-                .with_safe_default_protocol_versions()
-                .expect("ring should support the default protocol versions")
-                .dangerous()
-                .with_custom_certificate_verifier(verifier)
-        } else {
-            let root_store = TLS_ROOTS
-                .get_or_try_init(load_certs)
-                .map_err(|_e| {
-                    CancelError::IO(std::io::Error::new(
-                        std::io::ErrorKind::Other,
-                        "TLS root store initialization failed".to_string(),
-                    ))
-                })?
-                .clone();
+        let root_store = TLS_ROOTS
+            .get_or_try_init(load_certs)
+            .map_err(|_e| {
+                CancelError::IO(std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    "TLS root store initialization failed".to_string(),
+                ))
+            })?
+            .clone();

+        let client_config =
             rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
                 .with_safe_default_protocol_versions()
                 .expect("ring should support the default protocol versions")
                 .with_root_certificates(root_store)
-        };
-
-        let client_config = client_config.with_no_client_auth();
+                .with_no_client_auth();

         let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
         let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
@@ -10,7 +10,6 @@ use postgres_client::tls::MakeTlsConnect;
 use postgres_client::{CancelToken, RawConnection};
 use postgres_protocol::message::backend::NoticeResponseBody;
 use pq_proto::StartupMessageParams;
-use rustls::client::danger::ServerCertVerifier;
 use rustls::crypto::ring;
 use rustls::pki_types::InvalidDnsNameError;
 use thiserror::Error;

@@ -251,7 +250,6 @@ impl ConnCfg {
     pub(crate) async fn connect(
         &self,
         ctx: &RequestContext,
-        allow_self_signed_compute: bool,
         aux: MetricsAuxInfo,
         timeout: Duration,
     ) -> Result<PostgresConnection, ConnectionError> {

@@ -259,25 +257,17 @@ impl ConnCfg {
     let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
     drop(pause);

-    let client_config = if allow_self_signed_compute {
-        // Allow all certificates for creating the connection
-        let verifier = Arc::new(AcceptEverythingVerifier);
-        rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
-            .with_safe_default_protocol_versions()
-            .expect("ring should support the default protocol versions")
-            .dangerous()
-            .with_custom_certificate_verifier(verifier)
-    } else {
-        let root_store = TLS_ROOTS
-            .get_or_try_init(load_certs)
-            .map_err(ConnectionError::TlsCertificateError)?
-            .clone();
+    let root_store = TLS_ROOTS
+        .get_or_try_init(load_certs)
+        .map_err(ConnectionError::TlsCertificateError)?
+        .clone();

+    let client_config =
         rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
             .with_safe_default_protocol_versions()
             .expect("ring should support the default protocol versions")
             .with_root_certificates(root_store)
-    };
-    let client_config = client_config.with_no_client_auth();
+            .with_no_client_auth();

     let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
     let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(

@@ -320,7 +310,6 @@ impl ConnCfg {
         },
         vec![],
         host.to_string(),
-        allow_self_signed_compute,
     );

     let connection = PostgresConnection {

@@ -365,50 +354,6 @@ pub(crate) fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_nati
 }
 static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();

-#[derive(Debug)]
-pub(crate) struct AcceptEverythingVerifier;
-impl ServerCertVerifier for AcceptEverythingVerifier {
-    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
-        use rustls::SignatureScheme;
-        // The schemes for which `SignatureScheme::supported_in_tls13` returns true.
-        vec![
-            SignatureScheme::ECDSA_NISTP521_SHA512,
-            SignatureScheme::ECDSA_NISTP384_SHA384,
-            SignatureScheme::ECDSA_NISTP256_SHA256,
-            SignatureScheme::RSA_PSS_SHA512,
-            SignatureScheme::RSA_PSS_SHA384,
-            SignatureScheme::RSA_PSS_SHA256,
-            SignatureScheme::ED25519,
-        ]
-    }
-    fn verify_server_cert(
-        &self,
-        _end_entity: &rustls::pki_types::CertificateDer<'_>,
-        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
-        _server_name: &rustls::pki_types::ServerName<'_>,
-        _ocsp_response: &[u8],
-        _now: rustls::pki_types::UnixTime,
-    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
-        Ok(rustls::client::danger::ServerCertVerified::assertion())
-    }
-    fn verify_tls12_signature(
-        &self,
-        _message: &[u8],
-        _cert: &rustls::pki_types::CertificateDer<'_>,
-        _dss: &rustls::DigitallySignedStruct,
-    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
-        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
-    }
-    fn verify_tls13_signature(
-        &self,
-        _message: &[u8],
-        _cert: &rustls::pki_types::CertificateDer<'_>,
-        _dss: &rustls::DigitallySignedStruct,
-    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
-        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -25,7 +25,6 @@ use crate::types::Host;
 pub struct ProxyConfig {
     pub tls_config: Option<TlsConfig>,
     pub metric_collection: Option<MetricCollectionConfig>,
-    pub allow_self_signed_compute: bool,
     pub http_config: HttpConfig,
     pub authentication_config: AuthenticationConfig,
     pub proxy_protocol_v2: ProxyProtocolV2,

@@ -213,7 +213,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
         params_compat: true,
         params: &params,
         locks: &config.connect_compute_locks,
-        allow_self_signed_compute: config.allow_self_signed_compute,
     },
     &user_info,
     config.wake_compute_retry_config,

@@ -73,12 +73,9 @@ impl NodeInfo {
     pub(crate) async fn connect(
         &self,
         ctx: &RequestContext,
-        allow_self_signed_compute: bool,
         timeout: Duration,
     ) -> Result<compute::PostgresConnection, compute::ConnectionError> {
-        self.config
-            .connect(ctx, allow_self_signed_compute, self.aux.clone(), timeout)
-            .await
+        self.config.connect(ctx, self.aux.clone(), timeout).await
     }

     pub(crate) fn reuse_settings(&mut self, other: Self) {

@@ -73,9 +73,6 @@ pub(crate) struct TcpMechanism<'a> {

     /// connect_to_compute concurrency lock
     pub(crate) locks: &'static ApiLocks<Host>,
-
-    /// Whether we should accept self-signed certificates (for testing)
-    pub(crate) allow_self_signed_compute: bool,
 }

 #[async_trait]

@@ -93,11 +90,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
     ) -> Result<PostgresConnection, Self::Error> {
         let host = node_info.config.get_host();
         let permit = self.locks.get_permit(&host).await?;
-        permit.release_result(
-            node_info
-                .connect(ctx, self.allow_self_signed_compute, timeout)
-                .await,
-        )
+        permit.release_result(node_info.connect(ctx, timeout).await)
     }

     fn update_connect_config(&self, config: &mut compute::ConnCfg) {
@@ -348,8 +348,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
         params_compat,
         params: &params,
         locks: &config.connect_compute_locks,
-        // only used for console redirect testing.
-        allow_self_signed_compute: false,
     },
     &user_info,
     config.wake_compute_retry_config,
@@ -50,6 +50,12 @@ impl<S: AsyncWrite + Unpin> SaslStream<'_, S> {
         self.stream.write_message(&msg.to_reply()).await?;
         Ok(())
     }
+
+    // Queue a SASL message for the client.
+    fn send_noflush(&mut self, msg: &ServerMessage<&str>) -> io::Result<()> {
+        self.stream.write_message_noflush(&msg.to_reply())?;
+        Ok(())
+    }
 }

 /// SASL authentication outcome.

@@ -85,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> SaslStream<'_, S> {
                 continue;
             }
             Step::Success(result, reply) => {
-                self.send(&ServerMessage::Final(&reply)).await?;
+                self.send_noflush(&ServerMessage::Final(&reply))?;
                 Outcome::Success(result)
             }
             Step::Failure(reason) => Outcome::Failure(reason),
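The motivation for `send_noflush` is batching: the final SASL message can ride in the same flush as the next protocol message the caller writes, instead of paying an extra write (which is why the earlier test hunk adds an explicit `stream.flush()`). A generic sketch of the same write-then-flush pattern, using std's `BufWriter` as a stand-in:

```rust
use std::io::{BufWriter, Write};

// Queue several frames, then push them to the wire in one flush.
// The frame contents are illustrative placeholders.
fn send_batched(stream: impl Write) -> std::io::Result<()> {
    let mut out = BufWriter::new(stream);
    out.write_all(b"SASL-final")?;    // queued, not yet on the wire
    out.write_all(b"ReadyForQuery")?; // queued in the same buffer
    out.flush()                       // one flush emits both messages
}
```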
@@ -9,6 +9,7 @@ default = []
 # Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
 # which adds some runtime cost to run tests on outage conditions
 testing = ["fail/failpoints"]
+benchmarking = []

 [dependencies]
 async-stream.workspace = true

@@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] }
 [[bench]]
 name = "receive_wal"
 harness = false
+required-features = ["benchmarking"]
@@ -1,11 +1,7 @@
 //! WAL ingestion benchmarks.

-#[path = "benchutils.rs"]
-mod benchutils;
-
 use std::io::Write as _;

-use benchutils::Env;
 use bytes::BytesMut;
 use camino_tempfile::tempfile;
 use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};

@@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor};
 use safekeeper::safekeeper::{
     AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
 };
+use safekeeper::test_utils::Env;
 use tokio::io::AsyncWriteExt as _;
 use utils::id::{NodeId, TenantTimelineId};
 use utils::lsn::Lsn;

@@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) {
     assert!(size >= prefixlen);
     let message = vec![0; size - prefixlen];

-    let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
+    let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));

     // Set up the Safekeeper.
     let env = Env::new(fsync)?;
-    let mut safekeeper =
-        runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
+    let mut safekeeper = runtime.block_on(env.make_safekeeper(
+        NodeId(1),
+        TenantTimelineId::generate(),
+        Lsn(0),
+    ))?;

     b.iter_batched_ref(
         // Pre-construct WAL records and requests. Criterion will batch them.

@@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) {
     let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded

     let env = Env::new(fsync)?;
-    let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
+    let walgen =
+        &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));

     // Create buffered channels that can fit all requests, to avoid blocking on channels.
     let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);

@@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
     // TODO: WalAcceptor doesn't actually need a full timeline, only
     // Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
     let tli = env
-        .make_timeline(NodeId(1), TenantTimelineId::generate())
+        .make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
         .await?
         .wal_residence_guard()
         .await?;

@@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
     assert!(size >= prefixlen);
     let message = vec![0; size - prefixlen];

-    let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
+    let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));

     // Construct and spawn the WalAcceptor task.
     let env = Env::new(fsync)?;

@@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {

     runtime.block_on(async {
         let tli = env
-            .make_timeline(NodeId(1), TenantTimelineId::generate())
+            .make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
             .await?
             .wal_residence_guard()
             .await?;
@@ -564,7 +564,7 @@ pub fn make_router(
     if conf.http_auth.is_some() {
         router = router.middleware(auth_middleware(|request| {
            const ALLOWLIST_ROUTES: &[&str] =
-                &["/v1/status", "/metrics", "/profile/cpu", "profile/heap"];
+                &["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
            if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
                None
            } else {
@@ -43,6 +43,9 @@ pub mod wal_reader_stream;
 pub mod wal_service;
 pub mod wal_storage;

+#[cfg(any(test, feature = "benchmarking"))]
+pub mod test_utils;
+
 mod timelines_global_map;
 use std::sync::Arc;
 pub use timelines_global_map::GlobalTimelines;
@@ -94,9 +94,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
                 }
             }

+            let max_next_record_lsn = match max_next_record_lsn {
+                Some(lsn) => lsn,
+                None => { continue; }
+            };
+
             let batch = InterpretedWalRecords {
                 records,
-                next_record_lsn: max_next_record_lsn,
+                next_record_lsn: Some(max_next_record_lsn),
             };

             tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
@@ -1,18 +1,18 @@
 use std::sync::Arc;

-use crate::rate_limit::RateLimiter;
-use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
-use crate::state::{TimelinePersistentState, TimelineState};
-use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
-use crate::timelines_set::TimelinesSet;
-use crate::wal_backup::remote_timeline_path;
-use crate::{control_file, wal_storage, SafeKeeperConf};
 use camino_tempfile::Utf8TempDir;
+use safekeeper::rate_limit::RateLimiter;
+use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
+use safekeeper::state::{TimelinePersistentState, TimelineState};
+use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
+use safekeeper::timelines_set::TimelinesSet;
+use safekeeper::wal_backup::remote_timeline_path;
+use safekeeper::{control_file, wal_storage, SafeKeeperConf};
 use tokio::fs::create_dir_all;
 use utils::id::{NodeId, TenantTimelineId};
 use utils::lsn::Lsn;

-/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
+/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
 pub struct Env {
     /// Whether to enable fsync.
     pub fsync: bool,
@@ -21,7 +21,7 @@ pub struct Env {
 }

 impl Env {
-    /// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
+    /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
     /// enable fsyncing.
     pub fn new(fsync: bool) -> anyhow::Result<Self> {
         let tempdir = camino_tempfile::tempdir()?;
@@ -47,6 +47,7 @@ impl Env {
         &self,
         node_id: NodeId,
         ttid: TenantTimelineId,
+        start_lsn: Lsn,
     ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
         let conf = self.make_conf(node_id);

@@ -67,9 +68,9 @@ impl Env {
         safekeeper
             .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
                 term: 1,
-                start_streaming_at: Lsn(0),
-                term_history: TermHistory(vec![(1, Lsn(0)).into()]),
-                timeline_start_lsn: Lsn(0),
+                start_streaming_at: start_lsn,
+                term_history: TermHistory(vec![(1, start_lsn).into()]),
+                timeline_start_lsn: start_lsn,
             }))
             .await?;

@@ -82,12 +83,13 @@ impl Env {
         &self,
         node_id: NodeId,
         ttid: TenantTimelineId,
+        start_lsn: Lsn,
     ) -> anyhow::Result<Arc<Timeline>> {
         let conf = Arc::new(self.make_conf(node_id));
         let timeline_dir = get_timeline_dir(&conf, &ttid);
         let remote_path = remote_timeline_path(&ttid)?;

-        let safekeeper = self.make_safekeeper(node_id, ttid).await?;
+        let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));

         let timeline = Timeline::new(
@@ -18,7 +18,7 @@ impl DiskWalProposer {
             internal_available_lsn: Lsn(0),
             prev_lsn: Lsn(0),
             disk: BlockStorage::new(),
-            wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
+            wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
         }),
     })
 }
@@ -11,6 +11,7 @@ use diesel::Connection;
 use itertools::Itertools;
 use pageserver_api::controller_api::AvailabilityZone;
 use pageserver_api::controller_api::MetadataHealthRecord;
+use pageserver_api::controller_api::SafekeeperDescribeResponse;
 use pageserver_api::controller_api::ShardSchedulingPolicy;
 use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
 use pageserver_api::models::TenantConfig;
@@ -1241,6 +1242,18 @@ impl SafekeeperPersistence {
             availability_zone_id: &self.availability_zone_id,
         }
     }
+    pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse {
+        // omit the `active` flag on purpose: it is deprecated.
+        SafekeeperDescribeResponse {
+            id: NodeId(self.id as u64),
+            region_id: self.region_id.clone(),
+            version: self.version,
+            host: self.host.clone(),
+            port: self.port,
+            http_port: self.http_port,
+            availability_zone_id: self.availability_zone_id.clone(),
+        }
+    }
 }

 #[derive(Insertable, AsChangeset)]
@@ -46,10 +46,11 @@ use pageserver_api::{
     controller_api::{
         AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
         NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
-        ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
-        TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
-        TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
-        TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
+        SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
+        ShardsPreferredAzsResponse, TenantCreateRequest, TenantCreateResponse,
+        TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
+        TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
+        TenantShardMigrateResponse,
     },
     models::{
         SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
@@ -7169,15 +7170,24 @@ impl Service {

     pub(crate) async fn safekeepers_list(
         &self,
-    ) -> Result<Vec<crate::persistence::SafekeeperPersistence>, DatabaseError> {
-        self.persistence.list_safekeepers().await
+    ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
+        Ok(self
+            .persistence
+            .list_safekeepers()
+            .await?
+            .into_iter()
+            .map(|v| v.as_describe_response())
+            .collect::<Vec<_>>())
     }

     pub(crate) async fn get_safekeeper(
         &self,
         id: i64,
-    ) -> Result<crate::persistence::SafekeeperPersistence, DatabaseError> {
-        self.persistence.safekeeper_get(id).await
+    ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
+        self.persistence
+            .safekeeper_get(id)
+            .await
+            .map(|v| v.as_describe_response())
     }

     pub(crate) async fn upsert_safekeeper(
@@ -310,7 +310,7 @@ pub(crate) enum BlobDataParseResult {
         index_part_generation: Generation,
         s3_layers: HashSet<(LayerName, Generation)>,
     },
-    /// The remains of a deleted Timeline (i.e. an initdb archive only)
+    /// The remains of an uncleanly deleted Timeline or aborted timeline creation (e.g. an initdb archive only, or some layer without an index)
     Relic,
     Incorrect {
         errors: Vec<String>,
@@ -346,7 +346,7 @@ pub(crate) async fn list_timeline_blobs(
     match res {
         ListTimelineBlobsResult::Ready(data) => Ok(data),
         ListTimelineBlobsResult::MissingIndexPart(_) => {
-            // Retry if index is missing.
+            // Retry if listing raced with removal of an index
             let data = list_timeline_blobs_impl(remote_client, id, root_target)
                 .await?
                 .into_data();
@@ -358,7 +358,7 @@ pub(crate) async fn list_timeline_blobs(
 enum ListTimelineBlobsResult {
     /// Blob data is ready to be interpreted.
     Ready(RemoteTimelineBlobData),
-    /// List timeline blobs has layer files but is missing [`IndexPart`].
+    /// The listing contained an index but when we tried to fetch it, we couldn't
     MissingIndexPart(RemoteTimelineBlobData),
 }
@@ -467,19 +467,19 @@ async fn list_timeline_blobs_impl(
     match index_part_object.as_ref() {
         Some(selected) => index_part_keys.retain(|k| k != selected),
         None => {
-            // It is possible that the branch gets deleted after we got some layer files listed
-            // and we no longer have the index file in the listing.
-            errors.push(
+            // This case does not indicate corruption, but it should be very unusual. It can
+            // happen if:
+            // - timeline creation is in progress (first layer is written before index is written)
+            // - timeline deletion happened while a stale pageserver was still attached, it might upload
+            //   a layer after the deletion is done.
+            tracing::info!(
                 "S3 list response got no index_part.json file but still has layer files"
-                    .to_string(),
             );
-            return Ok(ListTimelineBlobsResult::MissingIndexPart(
-                RemoteTimelineBlobData {
-                    blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
-                    unused_index_keys: index_part_keys,
-                    unknown_keys,
-                },
-            ));
+            return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
+                blob_data: BlobDataParseResult::Relic,
+                unused_index_keys: index_part_keys,
+                unknown_keys,
+            }));
         }
     }

@@ -3222,7 +3222,6 @@ class NeonProxy(PgProtocol):
             # Link auth backend params
             *["--auth-backend", "link"],
             *["--uri", NeonProxy.link_auth_uri],
-            *["--allow-self-signed-compute", "true"],
         ]

 class ProxyV1(AuthBackend):
@@ -738,6 +738,18 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         res_json = res.json()
         assert res_json is None

+    def timeline_compact_info(
+        self,
+        tenant_id: TenantId | TenantShardId,
+        timeline_id: TimelineId,
+    ) -> Any:
+        res = self.get(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
+        )
+        self.verbose_error(res)
+        res_json = res.json()
+        return res_json
+
     def timeline_compact(
         self,
         tenant_id: TenantId | TenantShardId,
@@ -749,7 +761,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         enhanced_gc_bottom_most_compaction=False,
         body: dict[str, Any] | None = None,
     ):
-        self.is_testing_enabled_or_skip()
         query = {}
         if force_repartition:
             query["force_repartition"] = "true"
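The new timeline_compact_info wrapper returns the timeline's pending compaction queue as JSON, so tests can poll it until scheduled compaction has drained. A minimal usage sketch, assuming the ps_http, tenant_id, timeline_id and wait_until names used by the smoke test later in this diff:

    def compaction_finished():
        # an empty queue means no scheduled compaction is pending or running
        assert len(ps_http.timeline_compact_info(tenant_id, timeline_id)) == 0

    wait_until(compaction_finished, timeout=60)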
test_runner/performance/many_relations/create_many_relations.sql (new file, 199 lines)
@@ -0,0 +1,199 @@
-- create a schema that simulates Neon control plane operations table
-- however use partitioned operations tables with many (e.g. 500) child partition tables per table
-- in summary we create multiple of these partitioned operations tables (with 500 children each) - until we reach the requested number of tables

-- first we need some other tables that can be referenced by the operations table

-- Table for branches
CREATE TABLE public.branches (
    id text PRIMARY KEY
);

-- Table for endpoints
CREATE TABLE public.endpoints (
    id text PRIMARY KEY
);

-- Table for projects
CREATE TABLE public.projects (
    id text PRIMARY KEY
);

-- Insert one row into branches
INSERT INTO public.branches (id)
VALUES ('branch_1');

-- Insert one row into endpoints
INSERT INTO public.endpoints (id)
VALUES ('endpoint_1');

-- Insert one row into projects
INSERT INTO public.projects (id)
VALUES ('project_1');

-- now we create a procedure that can create n operations tables
-- we do that in a procedure to save roundtrip latency when scaling the test to many tables
-- prefix is the base table name, e.g. 'operations_scale_1000' if we create 1000 tables
CREATE OR REPLACE PROCEDURE create_partitioned_tables(prefix text, n INT)
LANGUAGE plpgsql AS $$
DECLARE
    table_name TEXT; -- Variable to hold table names dynamically
    i INT;           -- Counter for the loop
BEGIN
    -- Loop to create n partitioned tables
    FOR i IN 1..n LOOP
        table_name := format('%s_%s', prefix, i);

        -- Create the partitioned table
        EXECUTE format(
            'CREATE TABLE public.%s (
                project_id character varying NOT NULL,
                id uuid NOT NULL,
                status integer,
                action character varying NOT NULL,
                error character varying,
                created_at timestamp with time zone NOT NULL DEFAULT now(),
                updated_at timestamp with time zone NOT NULL DEFAULT now(),
                spec jsonb,
                retry_at timestamp with time zone,
                failures_count integer DEFAULT 0,
                metadata jsonb NOT NULL DEFAULT ''{}''::jsonb,
                executor_id text NOT NULL,
                attempt_duration_ms integer,
                metrics jsonb DEFAULT ''{}''::jsonb,
                branch_id text,
                endpoint_id text,
                next_operation_id uuid,
                compute_id text,
                connection_attempt_at timestamp with time zone,
                concurrency_key text,
                queue_id text,
                CONSTRAINT %s_pkey PRIMARY KEY (id, created_at),
                CONSTRAINT %s_branch_id_fk FOREIGN KEY (branch_id) REFERENCES branches(id) ON DELETE CASCADE,
                CONSTRAINT %s_endpoint_id_fk FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE,
                CONSTRAINT %s_next_operation_id_fk FOREIGN KEY (next_operation_id, created_at) REFERENCES %s(id, created_at),
                CONSTRAINT %s_project_id_fk FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
            ) PARTITION BY RANGE (created_at)',
            table_name, table_name, table_name, table_name, table_name, table_name, table_name
        );

        -- Add indexes for the partitioned table
        EXECUTE format('CREATE INDEX index_%s_on_next_operation_id ON public.%s (next_operation_id)', table_name, table_name);
        EXECUTE format('CREATE INDEX index_%s_on_project_id ON public.%s (project_id)', table_name, table_name);
        EXECUTE format('CREATE INDEX %s_branch_id ON public.%s (branch_id)', table_name, table_name);
        EXECUTE format('CREATE INDEX %s_branch_id_created_idx ON public.%s (branch_id, created_at)', table_name, table_name);
        EXECUTE format('CREATE INDEX %s_created_at_idx ON public.%s (created_at)', table_name, table_name);
        EXECUTE format('CREATE INDEX %s_created_at_project_id_id_cond_idx ON public.%s (created_at, project_id, id)', table_name, table_name);
        EXECUTE format('CREATE INDEX %s_endpoint_id ON public.%s (endpoint_id)', table_name, table_name);
        EXECUTE format(
            'CREATE INDEX %s_for_redo_worker_idx ON public.%s (executor_id) WHERE status <> 1',
            table_name, table_name
        );
        EXECUTE format(
            'CREATE INDEX %s_project_id_status_index ON public.%s ((project_id::text), status)',
            table_name, table_name
        );
        EXECUTE format(
            'CREATE INDEX %s_status_not_finished ON public.%s (status) WHERE status <> 1',
            table_name, table_name
        );
        EXECUTE format('CREATE INDEX %s_updated_at_desc_idx ON public.%s (updated_at DESC)', table_name, table_name);
        EXECUTE format(
            'CREATE INDEX %s_with_failures ON public.%s (failures_count) WHERE failures_count > 0',
            table_name, table_name
        );
    END LOOP;
END;
$$;

-- next we create a procedure that can add the child partitions (one per day) to each of the operations tables
CREATE OR REPLACE PROCEDURE create_operations_partitions(
    table_name TEXT,
    start_date DATE,
    end_date DATE
)
LANGUAGE plpgsql AS $$
DECLARE
    partition_date DATE;
    partition_name TEXT;
    counter INT := 0; -- Counter to track the number of tables created in the current transaction
BEGIN
    partition_date := start_date;

    -- Create partitions in batches
    WHILE partition_date < end_date LOOP
        partition_name := format('%s_%s', table_name, to_char(partition_date, 'YYYY_MM_DD'));

        EXECUTE format(
            'CREATE TABLE IF NOT EXISTS public.%s PARTITION OF public.%s
             FOR VALUES FROM (''%s'') TO (''%s'')',
            partition_name,
            table_name,
            partition_date,
            partition_date + INTERVAL '1 day'
        );

        counter := counter + 1;

        -- Commit and reset counter after every 100 partitions
        IF counter >= 100 THEN
            COMMIT;
            counter := 0; -- Reset the counter
        END IF;

        -- Advance to the next day
        partition_date := partition_date + INTERVAL '1 day';
    END LOOP;

    -- Final commit for remaining partitions
    IF counter > 0 THEN
        COMMIT;
    END IF;

    -- Insert synthetic rows into each partition
    EXECUTE format(
        'INSERT INTO %I (
            project_id,
            branch_id,
            endpoint_id,
            id,
            status,
            action,
            created_at,
            updated_at,
            spec,
            metadata,
            executor_id,
            failures_count
        )
        SELECT
            ''project_1'',                            -- project_id
            ''branch_1'',                             -- branch_id
            ''endpoint_1'',                           -- endpoint_id
            ''e8bba687-0df9-4291-bfcd-7d5f6aa7c158'', -- unique id
            1,                                        -- status
            ''SYNTHETIC_ACTION'',                     -- action
            gs::timestamp + interval ''0 ms'',        -- created_at
            gs::timestamp + interval ''1 minute'',    -- updated_at
            ''{"key": "value"}'',                     -- spec (JSONB)
            ''{"metadata_key": "metadata_value"}'',   -- metadata (JSONB)
            ''executor_1'',                           -- executor_id
            0                                         -- failures_count
        FROM generate_series(%L, %L::DATE - INTERVAL ''1 day'', INTERVAL ''1 day'') AS gs',
        table_name, start_date, end_date
    );

    -- Commit the inserted rows
    COMMIT;
END;
$$;

-- we can now create partitioned tables using something like
-- CALL create_partitioned_tables('operations_scale_1000', 10);

-- and we can create the child partitions for a table using something like
-- CALL create_operations_partitions(
--     'operations_scale_1000_1',
--     '2000-01-01', -- Start date
--     ('2000-01-01'::DATE + INTERVAL '1 day' * 500)::DATE -- End date (start date + number of days)
-- );
@@ -22,7 +22,7 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
         "checkpoint_distance": f"{1024 ** 2}",
         "compaction_target_size": f"{1024 ** 2}",
         # set PITR interval to be small, so we can do GC
-        "pitr_interval": "60 s",
+        "pitr_interval": "10 s",
         # "compaction_threshold": "3",
         # "image_creation_threshold": "2",
     }
@@ -32,6 +32,7 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
     n_steps = 10
     n_update_iters = 100
     step_size = 10000
+    branch_created = 0
     with endpoint.cursor() as cur:
         cur.execute("SET statement_timeout='1000s'")
         cur.execute(
@@ -66,6 +67,7 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
         if mode == "with_snapshots":
             if step == n_steps / 2:
                 env.create_branch("child")
+                branch_created += 1

     max_num_of_deltas_above_image = 0
     max_total_num_of_deltas = 0
@@ -142,6 +144,15 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
     with layer_map_path.open("w") as f:
         f.write(json.dumps(client.timeline_layer_map_info(tenant_id, timeline_id)))

     # We should have collected all garbage
+    if mode == "normal":
+        # in theory we should get physical size ~= logical size, but given that gc interval is 10s,
+        # and the layer has indexes that might contribute to the fluctuation, we allow a small margin
+        # of 1 here, and the end ratio we are asserting is 1 (margin) + 1 (expected) = 2.
+        assert physical_size / logical_size < 2
+    elif mode == "with_snapshots":
+        assert physical_size / logical_size < (2 + branch_created)


 @pytest.mark.timeout(10000)
 def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
test_runner/performance/test_perf_many_relations.py (new file, 66 lines)
@@ -0,0 +1,66 @@
import os
from pathlib import Path

import pytest
from fixtures.compare_fixtures import RemoteCompare
from fixtures.log_helper import log


def get_num_relations(default: int = 1000) -> list[int]:
    # We parametrize each run with a scale specifying the number of wanted child partitions.
    # Databases are pre-created and passed through the BENCHMARK_CONNSTR env variable.
    scales = os.getenv("TEST_NUM_RELATIONS", default=str(default))
    rv = []
    for s in scales.split(","):
        scale = int(s)
        rv.append(scale)
    return rv


@pytest.mark.parametrize("num_relations", get_num_relations())
@pytest.mark.remote_cluster
def test_perf_many_relations(remote_compare: RemoteCompare, num_relations: int):
    """
    Test creating many relations in a single database.
    We use partitioned tables with child tables, indexes and constraints to have a realistic schema.
    We also include some common data types like text, uuid, timestamp, JSONB, etc.

    See many_relations/create_many_relations.sql
    """
    env = remote_compare

    # prepare some base tables and the plpgsql procedures that we use to create the tables
    sql_file = Path(__file__).parent / "many_relations" / "create_many_relations.sql"
    env.pg_bin.run_capture(["psql", env.pg.connstr(), "-f", str(sql_file)])

    num_parent_tables = num_relations // 500 + 1
    log.info(f"Creating {num_relations} relations in {num_parent_tables} parent tables")

    log.info(f"Creating {num_parent_tables} parent tables")
    sql = f"CALL create_partitioned_tables('operations_scale_{num_relations}', {num_parent_tables})"
    log.info(sql)
    env.pg_bin.run_capture(["psql", env.pg.connstr(), "-c", sql])

    current_table = 0
    num_relations_remaining = num_relations

    # now run and measure the actual relation creation
    while num_relations_remaining > 0:
        current_table += 1
        parent_table_name = f"operations_scale_{num_relations}_{current_table}"
        if num_relations_remaining > 500:
            num_relations_to_create = 500
        else:
            num_relations_to_create = num_relations_remaining
        num_relations_remaining -= num_relations_to_create
        log.info(
            f"Creating {num_relations_to_create} child tables in partitioned parent table '{parent_table_name}'"
        )
        sql = f"CALL create_operations_partitions('{parent_table_name}', '2000-01-01', ('2000-01-01'::DATE + INTERVAL '1 day' * {num_relations_to_create})::DATE)"
        log.info(sql)
        with env.zenbenchmark.record_duration(
            f"CREATE_TABLE/{current_table}/{num_relations_to_create}"
        ):
            env.pg_bin.run_capture(
                ["psql", env.pg.connstr(options="-cstatement_timeout=1000s "), "-c", sql]
            )
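For example, the scale list can be driven from the environment when invoking the test; the values below are illustrative:

    os.environ["TEST_NUM_RELATIONS"] = "100,1000"
    assert get_num_relations() == [100, 1000]  # pytest then runs once per scale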
@@ -134,6 +134,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
     }

     env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
+    env.pageserver.allowed_errors.append(
+        r".*failed to acquire partition lock during gc-compaction.*"
+    )
+    env.pageserver.allowed_errors.append(r".*repartition() called concurrently.*")

     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline
@@ -172,6 +176,12 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):

         workload.churn_rows(row_count, env.pageserver.id)

+    def compaction_finished():
+        queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
+        assert queue_depth == 0
+
+    wait_until(compaction_finished, timeout=60)
+
     # ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
     env.pageserver.assert_log_contains(
         "scheduled_compact_timeline.*picked .* layers for compaction"
@@ -84,6 +84,8 @@ def test_pgdata_import_smoke(
     elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
         target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2
     elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS:
+        # Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data
+        # to exercise multiple segments.
         target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192)
     else:
         raise ValueError
@@ -111,9 +113,15 @@ def test_pgdata_import_smoke(

     def validate_vanilla_equivalence(ep):
         # TODO: would be nicer to just compare pgdump
-        assert ep.safe_psql("select count(*), sum(data::bigint)::bigint from t") == [
-            (expect_nrows, expect_sum)
-        ]
+        # Enable IO concurrency for batching on large sequential scan, to avoid making
+        # this test unnecessarily onerous on CPU
+        assert ep.safe_psql_many(
+            [
+                "set effective_io_concurrency=32;",
+                "select count(*), sum(data::bigint)::bigint from t",
+            ]
+        ) == [[], [(expect_nrows, expect_sum)]]

     validate_vanilla_equivalence(vanilla_pg)

@@ -22,7 +22,10 @@ CHECKPOINT_TIMEOUT_SECONDS = 60


 async def run_worker_for_tenant(
-    env: NeonEnv, entries: int, tenant: TenantId, offset: int | None = None
+    env: NeonEnv,
+    entries: int,
+    tenant: TenantId,
+    offset: int | None = None,
 ) -> Lsn:
     if offset is None:
         offset = 0
@@ -37,12 +40,20 @@ async def run_worker_for_tenant(
     finally:
         await conn.close(timeout=10)

-    last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+    loop = asyncio.get_running_loop()
+    sql = await loop.run_in_executor(
+        None, lambda ep: ep.safe_psql("SELECT pg_current_wal_flush_lsn()"), ep
+    )
+    last_flush_lsn = Lsn(sql[0][0])
    return last_flush_lsn


 async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> tuple[TenantId, TimelineId, Lsn]:
-    tenant, timeline = env.create_tenant(conf=tenant_conf)
+    loop = asyncio.get_running_loop()
+    # capture tenant_conf by passing it as an explicit argument; a closure over the
+    # loop variable could otherwise see a different value by the time it runs
+    tenant, timeline = await loop.run_in_executor(
+        None, lambda tenant_conf, env: env.create_tenant(conf=tenant_conf), tenant_conf, env
+    )
     last_flush_lsn = await run_worker_for_tenant(env, entries, tenant)
     return tenant, timeline, last_flush_lsn
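The hunks above move blocking psycopg calls off the asyncio event loop via run_in_executor. A self-contained sketch of the same pattern; blocking_call is an illustrative stand-in for a synchronous call such as ep.safe_psql(...):

    import asyncio

    def blocking_call(query: str) -> str:
        # pretend this blocks on I/O, like a synchronous database query
        return f"result of {query}"

    async def main():
        loop = asyncio.get_running_loop()
        # arguments are passed explicitly to run_in_executor rather than captured
        # by a closure, so the callable sees exactly the values it was scheduled with
        result = await loop.run_in_executor(None, blocking_call, "SELECT 1")
        print(result)

    asyncio.run(main())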
@@ -2,7 +2,7 @@ from __future__ import annotations

 import time

-from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
+from fixtures.neon_fixtures import NeonEnv, logical_replication_sync, wait_replica_caughtup


 def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonEnv, vanilla_pg):
@@ -38,6 +38,8 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE
     for pk in range(n_records):
         p_cur.execute("insert into t (pk) values (%s)", (pk,))

+    wait_replica_caughtup(primary, secondary)
+
     s_cur.execute("select count(*) from t")
     assert s_cur.fetchall()[0][0] == n_records
@@ -3009,7 +3009,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
 def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
     compared = [dict(a), dict(b)]

-    masked_keys = ["created_at", "updated_at"]
+    masked_keys = ["created_at", "updated_at", "active"]

     for d in compared:
         # keep deleting these in case we are comparing the body as it will be uploaded by real scripts
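For context, a self-contained sketch of the masked comparison this helper performs; the name eq_masked and the standalone form are illustrative:

    from typing import Any

    def eq_masked(a: dict[str, Any], b: dict[str, Any]) -> bool:
        # ignore volatile or deprecated fields (timestamps, the old `active` flag)
        masked_keys = ("created_at", "updated_at", "active")
        a, b = dict(a), dict(b)
        for d in (a, b):
            for key in masked_keys:
                d.pop(key, None)
        return a == b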
vendor/postgres-v17 (vendored submodule)
Submodule vendor/postgres-v17 updated: 65c4e46baf...7e3f3974bc

vendor/revisions.json (vendored)
@@ -1,7 +1,7 @@
 {
     "v17": [
         "17.2",
-        "65c4e46baf56ec05412c7dd63d62faff0b33dcfb"
+        "7e3f3974bc8895938308f94d0e96879ffae638cd"
     ],
     "v16": [
         "16.6",