Compare commits

..

6 Commits

Author SHA1 Message Date
Joonas Koivunen
767c8bb95f implement without closing the semaphore
not closing the semaphore requires an additional atomic boolean of state
which is used to describe if the permit received is unique or not. this
does not however change the problem of progress for the
losing/descheduled initializer future.

take_and_deinit does become async in this version, because all pending
initializers are waited out before actually taking anything. in
practice, there should be no-one to await for.

commit also adds DEBUG level span, debug and trace level logging.
2024-02-07 08:51:47 +00:00
Joonas Koivunen
f5b0e723cb test: rewrite back from macro_rules 2024-02-07 06:50:43 +00:00
Joonas Koivunen
9d11fcca02 add comforting debug_assert_eq 2024-02-06 18:34:30 +00:00
Joonas Koivunen
013496c42b cleanup 2024-02-06 18:34:30 +00:00
Joonas Koivunen
0d417d7c0f fix without additional arc clones 2024-02-06 18:34:30 +00:00
Joonas Koivunen
fb5420a959 test: reproduce the race 2024-02-06 18:34:30 +00:00
25 changed files with 512 additions and 492 deletions

View File

@@ -179,12 +179,6 @@ runs:
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
fi
- name: Cache poetry deps
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Store Allure test stat in the DB (new)
if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
shell: bash -euxo pipefail {0}

View File

@@ -44,10 +44,6 @@ inputs:
description: 'Postgres version to use for tests'
required: false
default: 'v14'
benchmark_durations:
description: 'benchmark durations JSON'
required: false
default: '{}'
runs:
using: "composite"
@@ -86,10 +82,11 @@ runs:
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}
@@ -163,7 +160,7 @@ runs:
# We use pytest-split plugin to run benchmarks in parallel on different CI runners
if [ "${TEST_SELECTION}" = "test_runner/performance" ] && [ "${{ inputs.build_type }}" != "remote" ]; then
mkdir -p $TEST_OUTPUT
echo '${{ inputs.benchmark_durations || '{}' }}' > $TEST_OUTPUT/benchmark_durations.json
poetry run ./scripts/benchmark_durations.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/benchmark_durations.json"
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
fi

View File

@@ -112,10 +112,11 @@ jobs:
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
@@ -131,7 +132,7 @@ jobs:
check-codestyle-rust:
needs: [ check-permissions, build-buildtools-image ]
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
options: --init
@@ -477,40 +478,8 @@ jobs:
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
uses: ./.github/actions/save-coverage-data
get-benchmarks-durations:
outputs:
json: ${{ steps.get-benchmark-durations.outputs.json }}
needs: [ check-permissions, build-buildtools-image ]
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
options: --init
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Cache poetry deps
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
- name: get benchmark durations
id: get-benchmark-durations
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
run: |
poetry run ./scripts/benchmark_durations.py "${TEST_RESULT_CONNSTR}" \
--days 10 \
--output /tmp/benchmark_durations.json
echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
benchmarks:
needs: [ check-permissions, build-neon, build-buildtools-image, get-benchmarks-durations ]
needs: [ check-permissions, build-neon, build-buildtools-image ]
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
@@ -521,7 +490,7 @@ jobs:
fail-fast: false
matrix:
# the amount of groups (N) should be reflected in `extra_params: --splits N ...`
pytest_split_group: [ 1, 2, 3, 4, 5 ]
pytest_split_group: [ 1, 2, 3, 4 ]
build_type: [ release ]
steps:
- name: Checkout
@@ -534,8 +503,7 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ github.ref_name == 'main' }}
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
extra_params: --splits 4 --group ${{ matrix.pytest_split_group }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"

View File

@@ -38,10 +38,11 @@ jobs:
uses: snok/install-poetry@v1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-python-deps-ubunutu-latest-${{ hashFiles('poetry.lock') }}
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}

23
Cargo.lock generated
View File

@@ -289,7 +289,6 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"r2d2",
"reqwest",
"serde",
"serde_json",
@@ -1652,7 +1651,6 @@ dependencies = [
"diesel_derives",
"itoa",
"pq-sys",
"r2d2",
"serde_json",
]
@@ -4168,17 +4166,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.7.3"
@@ -4892,15 +4879,6 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -6829,7 +6807,6 @@ dependencies = [
"clap_builder",
"crossbeam-utils",
"diesel",
"diesel_derives",
"either",
"fail",
"futures-channel",

View File

@@ -111,7 +111,7 @@ USER nonroot:nonroot
WORKDIR /home/nonroot
# Python
ENV PYTHON_VERSION=3.9.18 \
ENV PYTHON_VERSION=3.9.2 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \

View File

@@ -207,7 +207,6 @@ fn maybe_cgexec(cmd: &str) -> Command {
/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
/// that we give to customers
#[instrument(skip_all)]
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let roles = spec
.cluster

View File

@@ -24,9 +24,8 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
utils = { path = "../../libs/utils/" }
metrics = { path = "../../libs/metrics/" }

View File

@@ -170,14 +170,14 @@ impl ComputeHook {
reconfigure_request: &ComputeHookNotifyRequest,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let req = client.request(Method::PUT, url);
let req = client.request(Method::POST, url);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
};
tracing::info!(
tracing::debug!(
"Sending notify request to {} ({:?})",
url,
reconfigure_request

View File

@@ -34,9 +34,9 @@ struct Cli {
#[arg(short, long)]
listen: std::net::SocketAddr,
/// Public key for JWT authentication of clients
/// Path to public key for JWT authentication of clients
#[arg(long)]
public_key: Option<String>,
public_key: Option<camino::Utf8PathBuf>,
/// Token for authenticating this service with the pageservers it controls
#[arg(long)]
@@ -159,7 +159,7 @@ impl Secrets {
fn load_cli(database_url: &str, args: &Cli) -> anyhow::Result<Self> {
let public_key = match &args.public_key {
None => None,
Some(key) => Some(JwtAuth::from_key(key.clone()).context("Loading public key")?),
Some(key_path) => Some(JwtAuth::from_key_path(key_path)?),
};
Ok(Self {
database_url: database_url.to_owned(),
@@ -170,7 +170,6 @@ impl Secrets {
}
}
/// Execute the diesel migrations that are built into this binary
async fn migration_run(database_url: &str) -> anyhow::Result<()> {
use diesel::PgConnection;
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
@@ -184,18 +183,8 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
Ok(())
}
fn main() -> anyhow::Result<()> {
tokio::runtime::Builder::new_current_thread()
// We use spawn_blocking for database operations, so require approximately
// as many blocking threads as we will open database connections.
.max_blocking_threads(Persistence::MAX_CONNECTIONS as usize)
.enable_all()
.build()
.unwrap()
.block_on(async_main())
}
async fn async_main() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
logging::init(

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use camino::Utf8Path;
use camino::Utf8PathBuf;
@@ -45,7 +44,7 @@ use crate::PlacementPolicy;
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
database_url: String,
// In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
// test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
@@ -65,8 +64,6 @@ pub(crate) enum DatabaseError {
Query(#[from] diesel::result::Error),
#[error(transparent)]
Connection(#[from] diesel::result::ConnectionError),
#[error(transparent)]
ConnectionPool(#[from] r2d2::Error),
#[error("Logical error: {0}")]
Logical(String),
}
@@ -74,31 +71,9 @@ pub(crate) enum DatabaseError {
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
pub const MAX_CONNECTIONS: u32 = 99;
// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
// to execute queries (database queries are not generally on latency-sensitive paths).
let connection_pool = diesel::r2d2::Pool::builder()
.max_size(Self::MAX_CONNECTIONS)
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
// Always keep at least one connection ready to go
.min_idle(Some(1))
.test_on_check_out(true)
.build(manager)
.expect("Could not build connection pool");
Self {
connection_pool,
database_url,
json_path,
}
}
@@ -109,10 +84,14 @@ impl Persistence {
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let mut conn = self.connection_pool.get()?;
tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
.await
.expect("Task panic")
let database_url = self.database_url.clone();
tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
// TODO: connection pooling, such as via diesel::r2d2
let mut conn = PgConnection::establish(&database_url)?;
func(&mut conn)
})
.await
.expect("Task panic")
}
/// When a node is first registered, persist it before using it for anything

View File

@@ -103,9 +103,7 @@ impl From<DatabaseError> for ApiError {
match err {
DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
// FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
ApiError::ShuttingDown
}
DatabaseError::Connection(_e) => ApiError::ShuttingDown,
DatabaseError::Logical(reason) => {
ApiError::InternalServerError(anyhow::anyhow!(reason))
}

View File

@@ -28,7 +28,7 @@ pub struct AttachmentService {
listen: String,
path: Utf8PathBuf,
jwt_token: Option<String>,
public_key: Option<String>,
public_key_path: Option<Utf8PathBuf>,
postgres_port: u16,
client: reqwest::Client,
}
@@ -207,7 +207,7 @@ impl AttachmentService {
.pageservers
.first()
.expect("Config is validated to contain at least one pageserver");
let (jwt_token, public_key) = match ps_conf.http_auth_type {
let (jwt_token, public_key_path) = match ps_conf.http_auth_type {
AuthType::Trust => (None, None),
AuthType::NeonJWT => {
let jwt_token = env
@@ -219,26 +219,7 @@ impl AttachmentService {
let public_key_path =
camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
.unwrap();
// This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
let public_key = if std::fs::metadata(&public_key_path)
.expect("Can't stat public key")
.is_dir()
{
// Our config may specify a directory: this is for the pageserver's ability to handle multiple
// keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
let mut dir =
std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
let dent = dir
.next()
.expect("Empty key dir")
.expect("Error reading key dir");
std::fs::read_to_string(dent.path()).expect("Can't read public key")
} else {
std::fs::read_to_string(&public_key_path).expect("Can't read public key")
};
(Some(jwt_token), Some(public_key))
(Some(jwt_token), Some(public_key_path))
}
};
@@ -247,7 +228,7 @@ impl AttachmentService {
path,
listen,
jwt_token,
public_key,
public_key_path,
postgres_port,
client: reqwest::ClientBuilder::new()
.build()
@@ -472,8 +453,8 @@ impl AttachmentService {
args.push(format!("--jwt-token={jwt_token}"));
}
if let Some(public_key) = &self.public_key {
args.push(format!("--public-key=\"{public_key}\""));
if let Some(public_key_path) = &self.public_key_path {
args.push(format!("--public-key={public_key_path}"));
}
if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {

View File

@@ -1,5 +1,5 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};
use tokio::sync::Semaphore;
@@ -14,6 +14,18 @@ use tokio::sync::Semaphore;
pub struct OnceCell<T> {
inner: tokio::sync::RwLock<Inner<T>>,
initializers: AtomicUsize,
/// Do we have one permit or `u32::MAX` permits?
///
/// Having one permit means the cell is not initialized, and one winning future could
/// initialize it. The act of initializing the cell adds `u32::MAX` permits and set this to
/// `false`.
///
/// Deinitializing an initialized cell will first take `u32::MAX` permits handing one of them
/// out, then set this back to `true`.
///
/// Because we need to see all changes to this variable, always use Acquire to read, AcqRel to
/// compare_exchange.
has_one_permit: AtomicBool,
}
impl<T> Default for OnceCell<T> {
@@ -22,6 +34,7 @@ impl<T> Default for OnceCell<T> {
Self {
inner: Default::default(),
initializers: AtomicUsize::new(0),
has_one_permit: AtomicBool::new(true),
}
}
}
@@ -47,14 +60,14 @@ impl<T> Default for Inner<T> {
impl<T> OnceCell<T> {
/// Creates an already initialized `OnceCell` with the given value.
pub fn new(value: T) -> Self {
let sem = Semaphore::new(1);
sem.close();
let sem = Semaphore::new(u32::MAX as usize);
Self {
inner: tokio::sync::RwLock::new(Inner {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
initializers: AtomicUsize::new(0),
has_one_permit: AtomicBool::new(false),
}
}
@@ -64,42 +77,47 @@ impl<T> OnceCell<T> {
/// Initializing might wait on any existing [`GuardMut::take_and_deinit`] deinitialization.
///
/// Initialization is panic-safe and cancellation-safe.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn get_mut_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardMut<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
loop {
let sem = {
let guard = self.inner.write().await;
if guard.value.is_some() {
tracing::debug!("returning GuardMut over existing value");
return Ok(GuardMut(guard));
}
guard.init_semaphore.clone()
};
{
let permit = {
let _guard = CountWaitingInitializers::start(self);
sem.acquire().await
};
let permit = permit.expect("semaphore is never closed");
if !self.has_one_permit.load(Ordering::Acquire) {
// it is important that the permit is dropped here otherwise there would be a
// deadlock with `take_and_deinit` happening at the same time.
tracing::trace!("seems initialization happened already, trying again");
continue;
}
permit.forget();
}
tracing::trace!("calling factory");
let permit = InitPermit::from(sem);
let (value, permit) = factory(permit).await?;
let guard = self.inner.write().await;
if guard.value.is_some() {
return Ok(GuardMut(guard));
}
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.write().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardMut(guard));
}
return Ok(self.set0(value, guard, permit));
}
}
@@ -107,42 +125,48 @@ impl<T> OnceCell<T> {
/// returning the guard.
///
/// Initialization is panic-safe and cancellation-safe.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardRef<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
let guard = self.inner.read().await;
if guard.value.is_some() {
return Ok(GuardRef(guard));
}
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
Ok(Self::set0(value, guard).downgrade())
}
Err(_closed) => {
loop {
let sem = {
let guard = self.inner.read().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardRef(guard));
if guard.value.is_some() {
tracing::debug!("returning GuardRef over existing value");
return Ok(GuardRef(guard));
}
guard.init_semaphore.clone()
};
{
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire().await
};
let permit = permit.expect("semaphore is never closed");
if !self.has_one_permit.load(Ordering::Acquire) {
tracing::trace!("seems initialization happened already, trying again");
continue;
} else {
// it is our turn to initialize for sure
}
permit.forget();
}
tracing::trace!("calling factory");
let permit = InitPermit::from(sem);
let (value, permit) = factory(permit).await?;
let guard = self.inner.write().await;
return Ok(self.set0(value, guard, permit).downgrade());
}
}
@@ -152,26 +176,47 @@ impl<T> OnceCell<T> {
/// # Panics
///
/// If the inner has already been initialized.
pub async fn set(&self, value: T, _permit: InitPermit) -> GuardMut<'_, T> {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn set(&self, value: T, permit: InitPermit) -> GuardMut<'_, T> {
let guard = self.inner.write().await;
assert!(
self.has_one_permit.load(Ordering::Acquire),
"cannot set when there are multiple permits"
);
// cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot
// give more permits right now.
if guard.init_semaphore.try_acquire().is_ok() {
let available = guard.init_semaphore.available_permits();
drop(guard);
panic!("permit is of wrong origin");
panic!("permit is of wrong origin: {available}");
}
Self::set0(value, guard)
self.set0(value, guard, permit)
}
fn set0(value: T, mut guard: tokio::sync::RwLockWriteGuard<'_, Inner<T>>) -> GuardMut<'_, T> {
fn set0<'a>(
&'a self,
value: T,
mut guard: tokio::sync::RwLockWriteGuard<'a, Inner<T>>,
permit: InitPermit,
) -> GuardMut<'a, T> {
if guard.value.is_some() {
drop(guard);
unreachable!("we won permit, must not be initialized");
}
guard.value = Some(value);
guard.init_semaphore.close();
assert!(
self.has_one_permit
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok(),
"should had only had one permit"
);
permit.forget();
guard.init_semaphore.add_permits(u32::MAX as usize);
tracing::debug!("value initialized");
GuardMut(guard)
}
@@ -199,6 +244,48 @@ impl<T> OnceCell<T> {
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
}
/// Take the current value, and a new permit for it's deinitialization.
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn take_and_deinit(&self, mut guard: GuardMut<'_, T>) -> (T, InitPermit) {
// guard exists => we have been initialized
assert!(
!self.has_one_permit.load(Ordering::Acquire),
"has to have all permits after initializing"
);
assert!(guard.0.value.is_some(), "guard exists => initialized");
// we must first drain out all "waiting to initialize" stragglers
tracing::trace!("draining other initializers");
let all_permits = guard
.0
.init_semaphore
.acquire_many(u32::MAX)
.await
.expect("never closed");
all_permits.forget();
tracing::debug!("other initializers drained");
assert_eq!(guard.0.init_semaphore.available_permits(), 0);
// now that the permits have been drained, switch the state
assert!(
self.has_one_permit
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok(),
"there should be only one GuardMut attempting take_and_deinit"
);
let value = guard.0.value.take().unwrap();
// act of creating an init_permit is the same as "adding back one when this is dropped"
let init_permit = InitPermit::from(guard.0.init_semaphore.clone());
(value, init_permit)
}
}
/// DropGuard counter for queued tasks waiting to initialize, mainly accessible for the
@@ -244,24 +331,6 @@ impl<T> std::ops::DerefMut for GuardMut<'_, T> {
}
impl<'a, T> GuardMut<'a, T> {
/// Take the current value, and a new permit for it's deinitialization.
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
.clone()
.try_acquire_owned()
.expect("we just created this");
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(permit)))
.expect("guard is not created unless value has been initialized")
}
pub fn downgrade(self) -> GuardRef<'a, T> {
GuardRef(self.0.downgrade())
}
@@ -282,13 +351,39 @@ impl<T> std::ops::Deref for GuardRef<'_, T> {
}
/// Type held by OnceCell (de)initializing task.
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
pub struct InitPermit(Option<Arc<tokio::sync::Semaphore>>);
impl From<Arc<tokio::sync::Semaphore>> for InitPermit {
fn from(value: Arc<tokio::sync::Semaphore>) -> Self {
InitPermit(Some(value))
}
}
impl InitPermit {
fn forget(mut self) {
self.0
.take()
.expect("unable to forget twice, created with None?");
}
}
impl Drop for InitPermit {
fn drop(&mut self) {
if let Some(sem) = self.0.take() {
debug_assert_eq!(sem.available_permits(), 0);
sem.add_permits(1);
}
}
}
#[cfg(test)]
mod tests {
use futures::Future;
use super::*;
use std::{
convert::Infallible,
pin::{pin, Pin},
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
@@ -366,11 +461,8 @@ mod tests {
let cell = cell.clone();
let deinitialization_started = deinitialization_started.clone();
async move {
let (answer, _permit) = cell
.get_mut()
.await
.expect("initialized to value")
.take_and_deinit();
let guard = cell.get_mut().await.unwrap();
let (answer, _permit) = cell.take_and_deinit(guard).await;
assert_eq!(answer, initial);
deinitialization_started.wait().await;
@@ -399,12 +491,31 @@ mod tests {
#[tokio::test]
async fn reinit_with_deinit_permit() {
let cell = Arc::new(OnceCell::new(42));
assert!(!cell.has_one_permit.load(Ordering::Acquire));
assert_eq!(
cell.inner.read().await.init_semaphore.available_permits(),
u32::MAX as usize
);
let guard = cell.get_mut().await.unwrap();
assert!(!cell.has_one_permit.load(Ordering::Acquire));
assert_eq!(
guard.0.init_semaphore.available_permits(),
u32::MAX as usize
);
let (mol, permit) = cell.take_and_deinit(guard).await;
assert!(cell.has_one_permit.load(Ordering::Acquire));
assert_eq!(
cell.inner.read().await.init_semaphore.available_permits(),
0
);
let (mol, permit) = cell.get_mut().await.unwrap().take_and_deinit();
cell.set(5, permit).await;
assert_eq!(*cell.get_mut().await.unwrap(), 5);
let (five, permit) = cell.get_mut().await.unwrap().take_and_deinit();
let guard = cell.get_mut().await.unwrap();
let (five, permit) = cell.take_and_deinit(guard).await;
assert_eq!(5, five);
cell.set(mol, permit).await;
assert_eq!(*cell.get_mut().await.unwrap(), 42);
@@ -455,4 +566,110 @@ mod tests {
.unwrap();
assert_eq!(*g, "now initialized");
}
#[tokio::test(start_paused = true)]
async fn reproduce_init_take_deinit_race_ref() {
init_take_deinit_scenario(|cell, factory| {
Box::pin(async {
cell.get_or_init(factory).await.unwrap();
})
})
.await;
}
#[tokio::test(start_paused = true)]
async fn reproduce_init_take_deinit_race_mut() {
init_take_deinit_scenario(|cell, factory| {
Box::pin(async {
cell.get_mut_or_init(factory).await.unwrap();
})
})
.await;
}
type BoxedInitFuture<T, E> = Pin<Box<dyn Future<Output = Result<(T, InitPermit), E>>>>;
type BoxedInitFunction<T, E> = Box<dyn Fn(InitPermit) -> BoxedInitFuture<T, E>>;
/// Reproduce an assertion failure with both initialization methods.
///
/// This has interesting generics to be generic between `get_or_init` and `get_mut_or_init`.
/// Alternative would be a macro_rules! but that is the last resort.
async fn init_take_deinit_scenario<F>(init_way: F)
where
F: for<'a> Fn(
&'a OnceCell<&'static str>,
BoxedInitFunction<&'static str, Infallible>,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>,
{
use tracing::Instrument;
let cell = OnceCell::default();
// acquire the init_semaphore only permit to drive initializing tasks in order to waiting
// on the same semaphore.
let permit = cell
.inner
.read()
.await
.init_semaphore
.clone()
.try_acquire_owned()
.unwrap();
let mut t1 = pin!(init_way(
&cell,
Box::new(|permit| Box::pin(async move { Ok(("t1", permit)) })),
)
.instrument(tracing::info_span!("t1")));
let mut t2 = pin!(init_way(
&cell,
Box::new(|permit| Box::pin(async move { Ok(("t2", permit)) })),
)
.instrument(tracing::info_span!("t2")));
// drive t2 first to the init_semaphore
tokio::select! {
_ = &mut t2 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// followed by t1 in the init_semaphore
tokio::select! {
_ = &mut t1 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// now let t2 proceed and initialize
drop(permit);
t2.await;
// in original implementation which did closing and re-creation of the semaphore, t1 was
// still stuck on the first semaphore, but now that t1 and deinit are using the same
// semaphore, deinit will have to wait for t1.
let mut deinit = pin!(async {
let guard = cell.get_mut().await.unwrap();
cell.take_and_deinit(guard).await
}
.instrument(tracing::info_span!("deinit")));
tokio::select! {
_ = &mut deinit => unreachable!("deinit must not make progress before t1 is complete"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// now originally t1 would see the semaphore it has as closed. it cannot yet get a permit from
// the new one.
tokio::select! {
_ = &mut t1 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
let (s, _) = deinit.await;
assert_eq!("t2", s);
t1.await;
assert_eq!("t1", *cell.get().await.unwrap());
}
}

View File

@@ -278,7 +278,7 @@ pub struct Tenant {
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: tokio::sync::Mutex<()>,
walredo_mgr: Option<Arc<WalRedoManager>>,
walredo_mgr: Arc<WalRedoManager>,
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: Option<GenericRemoteStorage>,
@@ -635,7 +635,7 @@ impl Tenant {
conf,
attached_conf,
shard_identity,
Some(wal_redo_manager),
wal_redo_manager,
tenant_shard_id,
remote_storage.clone(),
deletion_queue_client,
@@ -1195,6 +1195,10 @@ impl Tenant {
tenant_shard_id: TenantShardId,
reason: String,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
Arc::new(Tenant::new(
TenantState::Broken {
reason,
@@ -1205,7 +1209,7 @@ impl Tenant {
// Shard identity isn't meaningful for a broken tenant: it's just a placeholder
// to occupy the slot for this TenantShardId.
ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
None,
wal_redo_manager,
tenant_shard_id,
None,
DeletionQueueClient::broken(),
@@ -1974,7 +1978,7 @@ impl Tenant {
}
pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
self.walredo_mgr.status()
}
/// Changes tenant status to active, unless shutdown was already requested.
@@ -2609,7 +2613,7 @@ impl Tenant {
self.tenant_shard_id,
self.generation,
self.shard_identity,
self.walredo_mgr.as_ref().map(Arc::clone),
Arc::clone(&self.walredo_mgr),
resources,
pg_version,
state,
@@ -2627,7 +2631,7 @@ impl Tenant {
conf: &'static PageServerConf,
attached_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
walredo_mgr: Option<Arc<WalRedoManager>>,
walredo_mgr: Arc<WalRedoManager>,
tenant_shard_id: TenantShardId,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
@@ -4051,7 +4055,7 @@ pub(crate) mod harness {
.unwrap(),
// This is a legacy/test code path: sharding isn't supported here.
ShardIdentity::unsharded(),
Some(walredo_mgr),
walredo_mgr,
self.tenant_shard_id,
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),

View File

@@ -199,9 +199,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
}
tenant.walredo_mgr.maybe_quiesce(period * 10);
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())

View File

@@ -215,8 +215,8 @@ pub struct Timeline {
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
// WAL redo manager. `None` only for broken tenants.
walredo_mgr: Option<Arc<super::WalRedoManager>>,
// WAL redo manager
walredo_mgr: Arc<super::WalRedoManager>,
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
@@ -1427,7 +1427,7 @@ impl Timeline {
tenant_shard_id: TenantShardId,
generation: Generation,
shard_identity: ShardIdentity,
walredo_mgr: Option<Arc<super::WalRedoManager>>,
walredo_mgr: Arc<super::WalRedoManager>,
resources: TimelineResources,
pg_version: u32,
state: TimelineState,
@@ -4457,9 +4457,6 @@ impl Timeline {
let img = match self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await
.context("reconstruct a page image")

View File

@@ -343,23 +343,6 @@ pub(super) async fn handle_walreceiver_connection(
modification.commit(&ctx).await?;
uncommitted_records = 0;
filtered_records = 0;
//
// We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
// layer size can become much larger than `checkpoint_distance`.
// It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large
// amount of data to key-value storage. So performing this check only after processing
// all WAL records in the chunk, can cause huge L0 layer files.
//
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
}
}

View File

@@ -20,7 +20,7 @@ BENCHMARKS_DURATION_QUERY = """
FROM results
WHERE
started_at > CURRENT_DATE - INTERVAL '%s' day
AND starts_with(parent_suite, 'test_runner.performance')
AND parent_suite = 'test_runner.performance'
AND status = 'passed'
GROUP BY
parent_suite, suite, name
@@ -31,75 +31,68 @@ BENCHMARKS_DURATION_QUERY = """
# the total duration varies from 8 to 40 minutes.
# We use some pre-collected durations as a fallback to have a better distribution.
FALLBACK_DURATION = {
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]": 400.15,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]": 372.521,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30]": 420.017,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]": 373.769,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-13-30]": 678.742,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-6-30]": 512.135,
"test_runner/performance/test_branch_creation.py::test_branch_creation_heavy_write[20]": 58.036,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many_relations": 22.104,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many[1024]": 126.073,
"test_runner/performance/test_branching.py::test_compare_child_and_root_pgbench_perf": 25.759,
"test_runner/performance/test_branching.py::test_compare_child_and_root_read_perf": 6.885,
"test_runner/performance/test_branching.py::test_compare_child_and_root_write_perf": 8.758,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]": 18.275,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]": 9.533,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[1]": 12.09,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[10]": 35.145,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[5]": 22.28,
"test_runner/performance/test_bulk_update.py::test_bulk_update[10]": 66.353,
"test_runner/performance/test_bulk_update.py::test_bulk_update[100]": 75.487,
"test_runner/performance/test_bulk_update.py::test_bulk_update[50]": 54.142,
"test_runner/performance/test_compaction.py::test_compaction": 110.715,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[neon-5-10-100]": 11.68,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[vanilla-5-10-100]": 16.384,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[neon-5-10-100]": 11.315,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[vanilla-5-10-100]": 18.783,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[neon-5-10-100]": 11.647,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[vanilla-5-10-100]": 17.04,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-1]": 11.01,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-10]": 11.902,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-1]": 10.077,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-10]": 10.4,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[neon-5-10-100]": 11.33,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[vanilla-5-10-100]": 16.434,
"test_runner/performance/test_copy.py::test_copy[neon]": 13.817,
"test_runner/performance/test_copy.py::test_copy[vanilla]": 11.736,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 575.735,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.868,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 14.393,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 20.588,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[vanilla-1]": 30.849,
"test_runner/performance/test_layer_map.py::test_layer_map": 39.378,
"test_runner/performance/test_lazy_startup.py::test_lazy_startup": 2848.938,
"test_runner/performance/test_logical_replication.py::test_logical_replication": 120.952,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[neon]": 35.552,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[vanilla]": 66.762,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[neon]": 85.177,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[vanilla]": 92.12,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[neon-45-10]": 107.009,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[vanilla-45-10]": 99.582,
"test_runner/performance/test_random_writes.py::test_random_writes[neon]": 4.737,
"test_runner/performance/test_random_writes.py::test_random_writes[vanilla]": 2.686,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-100000-100-0]": 3.271,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-0]": 50.719,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-4]": 15.992,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-100000-100-0]": 0.566,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-0]": 13.542,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-4]": 13.35,
"test_runner/performance/test_startup.py::test_startup_simple": 13.043,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_off-10-5-5]": 194.841,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_on-10-5-5]": 286.667,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[vanilla-10-5-5]": 85.577,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_off-1000]": 297.626,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_on-1000]": 646.187,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[vanilla-1000]": 989.776,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_off-45-100]": 125.638,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_on-45-100]": 123.554,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[vanilla-45-100]": 190.083,
"test_runner/performance/test_write_amplification.py::test_write_amplification[neon]": 21.016,
"test_runner/performance/test_write_amplification.py::test_write_amplification[vanilla]": 23.028,
"test_runner/performance/test_branch_creation.py::test_branch_creation_heavy_write[20]": 62.144,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many[1024]": 90.941,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many_relations": 26.053,
"test_runner/performance/test_branching.py::test_compare_child_and_root_pgbench_perf": 25.67,
"test_runner/performance/test_branching.py::test_compare_child_and_root_read_perf": 14.497,
"test_runner/performance/test_branching.py::test_compare_child_and_root_write_perf": 18.852,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]": 26.572,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]": 6.259,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[10]": 21.206,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[1]": 3.474,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[5]": 11.262,
"test_runner/performance/test_bulk_update.py::test_bulk_update[100]": 94.225,
"test_runner/performance/test_bulk_update.py::test_bulk_update[10]": 68.159,
"test_runner/performance/test_bulk_update.py::test_bulk_update[50]": 76.719,
"test_runner/performance/test_compaction.py::test_compaction": 110.222,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[neon-5-10-100]": 10.743,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[vanilla-5-10-100]": 16.541,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[neon-5-10-100]": 11.109,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[vanilla-5-10-100]": 18.121,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[neon-5-10-100]": 11.3,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[vanilla-5-10-100]": 16.086,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-10]": 12.024,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-1]": 11.14,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-10]": 10.375,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-1]": 10.075,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[neon-5-10-100]": 11.147,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[vanilla-5-10-100]": 16.321,
"test_runner/performance/test_copy.py::test_copy[neon]": 16.579,
"test_runner/performance/test_copy.py::test_copy[vanilla]": 10.094,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 590.157,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.102,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 8.677,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 31.079,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[vanilla-1]": 38.119,
"test_runner/performance/test_layer_map.py::test_layer_map": 24.784,
"test_runner/performance/test_logical_replication.py::test_logical_replication": 117.707,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[neon]": 21.194,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[vanilla]": 59.068,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[neon]": 73.235,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[vanilla]": 82.586,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[neon-45-10]": 106.536,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[vanilla-45-10]": 98.753,
"test_runner/performance/test_random_writes.py::test_random_writes[neon]": 6.975,
"test_runner/performance/test_random_writes.py::test_random_writes[vanilla]": 3.69,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-100000-100-0]": 3.529,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-0]": 64.522,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-4]": 40.964,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-100000-100-0]": 0.55,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-0]": 12.189,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-4]": 13.899,
"test_runner/performance/test_startup.py::test_startup_simple": 2.51,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_off-10-5-5]": 527.245,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_on-10-5-5]": 583.46,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[vanilla-10-5-5]": 113.653,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_off-1000]": 233.728,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_on-1000]": 419.093,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[vanilla-1000]": 982.461,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_off-45-100]": 116.522,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_on-45-100]": 115.583,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[vanilla-45-100]": 155.282,
"test_runner/performance/test_write_amplification.py::test_write_amplification[neon]": 26.704,
"test_runner/performance/test_write_amplification.py::test_write_amplification[vanilla]": 16.088,
}

View File

@@ -563,13 +563,13 @@ class PageserverHttpClient(requests.Session):
self,
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
timestamp: datetime,
timestamp,
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()

View File

@@ -26,81 +26,86 @@ from fixtures.neon_fixtures import NeonEnvBuilder
# apply during config step, like more users, databases, or extensions. By default
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
# test we only load neon.
@pytest.mark.timeout(1800)
@pytest.mark.parametrize("slru", ["lazy", "eager"])
def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
@pytest.mark.timeout(1000)
def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
lazy_slru_download = "true" if slru == "lazy" else "false"
tenant, _ = env.neon_cli.create_tenant(
lazy_tenant, _ = env.neon_cli.create_tenant(
conf={
"lazy_slru_download": lazy_slru_download,
"lazy_slru_download": "true",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
cur.execute("ALTER TABLE t SET (autovacuum_enabled = false)")
cur.execute("INSERT INTO t VALUES (1, 0)")
cur.execute(
"""
CREATE PROCEDURE updating() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..1000000 LOOP
UPDATE t SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
"""
)
cur.execute("SET statement_timeout=0")
cur.execute("call updating()")
endpoint.stop()
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
endpoint.start()
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
assert sum == 1000000
# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
durations = {
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
"basebackup_ms": f"{slru}_{i}_basebackup",
"start_postgres_ms": f"{slru}_{i}_start_postgres",
"config_ms": f"{slru}_{i}_config",
"total_startup_ms": f"{slru}_{i}_total_startup",
eager_tenant, _ = env.neon_cli.create_tenant(
conf={
"lazy_slru_download": "false",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
basebackup_bytes = metrics["basebackup_bytes"]
zenbenchmark.record(
f"{slru}_{i}_basebackup_bytes",
basebackup_bytes,
"bytes",
report=MetricReport.LOWER_IS_BETTER,
)
tenants = [lazy_tenant, eager_tenant]
slru = "lazy"
for tenant in tenants:
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)")
endpoint.safe_psql("INSERT INTO t VALUES (1, 0)")
endpoint.safe_psql(
"""
CREATE PROCEDURE updating() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..10000000 LOOP
UPDATE t SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
"""
)
endpoint.safe_psql("SET statement_timeout=0")
endpoint.safe_psql("call updating()")
# Stop so we can restart
endpoint.stop()
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
endpoint.start()
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
assert sum == 10000000
# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
durations = {
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
"basebackup_ms": f"{slru}_{i}_basebackup",
"start_postgres_ms": f"{slru}_{i}_start_postgres",
"config_ms": f"{slru}_{i}_config",
"total_startup_ms": f"{slru}_{i}_total_startup",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
basebackup_bytes = metrics["basebackup_bytes"]
zenbenchmark.record(
f"{slru}_{i}_basebackup_bytes",
basebackup_bytes,
"bytes",
report=MetricReport.LOWER_IS_BETTER,
)
# Stop so we can restart
endpoint.stop()
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)
slru = "eager"

View File

@@ -1,66 +0,0 @@
import os
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
logical_replication_sync,
)
from fixtures.pg_version import PgVersion
def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
if env.pg_version != PgVersion.V16:
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
endpoint = env.endpoints.create_start(
"test_logical_replication", config_lines=["log_statement=all"]
)
log.info("postgres is running on 'test_logical_replication' branch")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# create table...
cur.execute("create table t(pk integer primary key)")
cur.execute("create publication pub1 for table t")
# Create slot to hold WAL
cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
# now start subscriber
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key)")
connstr = endpoint.connstr().replace("'", "''")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
cur.execute(
"""create or replace function create_snapshots(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
perform pg_log_standby_snapshot();
end loop;
end; $$ language plpgsql"""
)
cur.execute("set statement_timeout=0")
cur.execute("select create_snapshots(10000)")
# Wait logical replication to sync
logical_replication_sync(vanilla_pg, endpoint)
time.sleep(10)
# Check layer file sizes
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, env.initial_tenant, timeline
)
log.info(f"Check {timeline_path}")
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
assert os.path.getsize(timeline_path + filename) < 512_000_000

View File

@@ -64,14 +64,18 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Check edge cases
# Timestamp is in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] == "future"
# make sure that we return a well advanced lsn here
assert Lsn(result["lsn"]) > start_lsn
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) < start_lsn
@@ -79,7 +83,9 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] not in ["past", "nodata"]
lsn = result["lsn"]
# Call get_lsn_by_timestamp to get the LSN
@@ -102,7 +108,9 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) >= last_flush_lsn

View File

@@ -310,7 +310,7 @@ def test_sharding_service_compute_hook(
notifications.append(request.json)
return Response(status=200)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start()

View File

@@ -29,7 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
diesel = { version = "2", features = ["postgres", "r2d2", "serde_json"] }
diesel = { version = "2", features = ["postgres", "serde_json"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["sink"] }
@@ -90,7 +90,6 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
diesel_derives = { version = "2", features = ["32-column-tables", "postgres", "r2d2", "with-deprecated"] }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }