mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 14:10:37 +00:00
Compare commits
16 Commits
testing_ou
...
problame/l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9ac1efbccd | ||
|
|
6bfc0492ac | ||
|
|
3adaec3ab2 | ||
|
|
79c577c2eb | ||
|
|
74601238ee | ||
|
|
edf24e7afc | ||
|
|
c5f24bab55 | ||
|
|
6124ad694a | ||
|
|
49bf66a467 | ||
|
|
d0497786d9 | ||
|
|
926d53de2d | ||
|
|
d598481894 | ||
|
|
b1fd8db8b3 | ||
|
|
219bc223f4 | ||
|
|
b22675c6ac | ||
|
|
356a18fa4c |
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -158,6 +158,17 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.0"
|
||||
@@ -1015,6 +1026,15 @@ dependencies = [
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const_format"
|
||||
version = "0.2.30"
|
||||
@@ -1435,6 +1455,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "fail"
|
||||
version = "0.5.1"
|
||||
@@ -2656,6 +2682,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
|
||||
@@ -612,13 +612,51 @@ RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/1.1.0/postgre
|
||||
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
|
||||
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/anon.tar.zst -T -
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions" for older extension which hasn't been updated to `pgrx` yet
|
||||
# This layer is used to build `pgx` deps
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS rust-extensions-build-pgx
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y curl libclang-dev cmake && \
|
||||
useradd -ms /bin/bash nonroot -b /home
|
||||
|
||||
ENV HOME=/home/nonroot
|
||||
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
|
||||
USER nonroot
|
||||
WORKDIR /home/nonroot
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15") \
|
||||
;; \
|
||||
"v16") \
|
||||
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version ${PG_VERSION}" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
|
||||
chmod +x rustup-init && \
|
||||
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
|
||||
rm rustup-init && \
|
||||
cargo install --locked --version 0.7.3 cargo-pgx && \
|
||||
/bin/bash -c 'cargo pgx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
# This layer is used to build `pgrx` deps
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS rust-extensions-build
|
||||
FROM build-deps AS rust-extensions-build-pgrx
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN apt-get update && \
|
||||
@@ -647,14 +685,26 @@ USER root
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build AS pg-jsonschema-pg-build
|
||||
FROM rust-extensions-build-pgx AS pg-jsonschema-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.2.0.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "9118fc508a6e231e7a39acaa6f066fcd79af17a5db757b47d2eefbe14f7794f0 pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
# caeab60d70b2fd3ae421ec66466a3abbb37b7ee6 made on 06/03/2023
|
||||
# there is no release tag yet, but we need it due to the superuser fix in the control file, switch to git tag after release >= 0.1.5
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15") \
|
||||
;; \
|
||||
"v16") \
|
||||
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version \"${PG_VERSION}\"" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421ec66466a3abbb37b7ee6.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "54129ce2e7ee7a585648dbb4cef6d73f795d94fe72f248ac01119992518469a4 pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xvzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.10.2"/pgrx = { version = "0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
sed -i 's/pgx = "0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgx install --release && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
|
||||
|
||||
#########################################################################################
|
||||
@@ -664,14 +714,29 @@ RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.2.0.tar.
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build AS pg-graphql-pg-build
|
||||
FROM rust-extensions-build-pgx AS pg-graphql-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.4.0.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "bd8dc7230282b3efa9ae5baf053a54151ed0e66881c7c53750e2d0c765776edc pg_graphql.tar.gz" | sha256sum --check && \
|
||||
# b4988843647450a153439be367168ed09971af85 made on 22/02/2023 (from remove-pgx-contrib-spiext branch)
|
||||
# Currently pgx version bump to >= 0.7.2 causes "call to unsafe function" compliation errors in
|
||||
# pgx-contrib-spiext. There is a branch that removes that dependency, so use it. It is on the
|
||||
# same 1.1 version we've used before.
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15") \
|
||||
;; \
|
||||
"v16") \
|
||||
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/yrashk/pg_graphql/archive/b4988843647450a153439be367168ed09971af85.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "0c7b0e746441b2ec24187d0e03555faf935c2159e2839bddd14df6dafbc8c9bd pg_graphql.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_graphql-src && cd pg_graphql-src && tar xvzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "=0.10.2"/pgrx = { version = "0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
sed -i 's/pgx = "~0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgx-tests = "~0.7.1"/pgx-tests = "0.7.3"/g' Cargo.toml && \
|
||||
cargo pgx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control
|
||||
@@ -683,7 +748,7 @@ RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.4.0.tar.gz
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build AS pg-tiktoken-pg-build
|
||||
FROM rust-extensions-build-pgrx AS pg-tiktoken-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
|
||||
@@ -700,7 +765,7 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build AS pg-pgx-ulid-build
|
||||
FROM rust-extensions-build-pgrx AS pg-pgx-ulid-build
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -O pgx_ulid.tar.gz && \
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use std::{thread, time::Duration};
|
||||
use std::{thread, time};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use postgres::{Client, NoTls};
|
||||
@@ -7,7 +7,7 @@ use tracing::{debug, info};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
|
||||
const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds
|
||||
|
||||
// Spin in a loop and figure out the last activity time in the Postgres.
|
||||
// Then update it in the shared state. This function never errors out.
|
||||
@@ -17,12 +17,13 @@ fn watch_compute_activity(compute: &ComputeNode) {
|
||||
let connstr = compute.connstr.as_str();
|
||||
// Define `client` outside of the loop to reuse existing connection if it's active.
|
||||
let mut client = Client::connect(connstr, NoTls);
|
||||
let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL);
|
||||
|
||||
info!("watching Postgres activity at {}", connstr);
|
||||
|
||||
loop {
|
||||
// Should be outside of the write lock to allow others to read while we sleep.
|
||||
thread::sleep(MONITOR_CHECK_INTERVAL);
|
||||
thread::sleep(timeout);
|
||||
|
||||
match &mut client {
|
||||
Ok(cli) => {
|
||||
|
||||
@@ -47,47 +47,10 @@ pub struct S3Bucket {
|
||||
bucket_name: String,
|
||||
prefix_in_bucket: Option<String>,
|
||||
max_keys_per_list_response: Option<i32>,
|
||||
concurrency_limiter: ConcurrencyLimiter,
|
||||
}
|
||||
|
||||
struct ConcurrencyLimiter {
|
||||
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
|
||||
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
|
||||
// The helps to ensure we don't exceed the thresholds.
|
||||
write: Arc<Semaphore>,
|
||||
read: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl ConcurrencyLimiter {
|
||||
fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
|
||||
match kind {
|
||||
RequestKind::Get => &self.read,
|
||||
RequestKind::Put => &self.write,
|
||||
RequestKind::List => &self.read,
|
||||
RequestKind::Delete => &self.write,
|
||||
}
|
||||
}
|
||||
|
||||
async fn acquire(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
|
||||
self.for_kind(kind).acquire().await
|
||||
}
|
||||
|
||||
async fn acquire_owned(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
|
||||
Arc::clone(self.for_kind(kind)).acquire_owned().await
|
||||
}
|
||||
|
||||
fn new(limit: usize) -> ConcurrencyLimiter {
|
||||
Self {
|
||||
read: Arc::new(Semaphore::new(limit)),
|
||||
write: Arc::new(Semaphore::new(limit)),
|
||||
}
|
||||
}
|
||||
concurrency_limiter: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -154,7 +117,7 @@ impl S3Bucket {
|
||||
bucket_name: aws_config.bucket_name.clone(),
|
||||
max_keys_per_list_response: aws_config.max_keys_per_list_response,
|
||||
prefix_in_bucket,
|
||||
concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
|
||||
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -193,7 +156,7 @@ impl S3Bucket {
|
||||
let started_at = start_counting_cancelled_wait(kind);
|
||||
let permit = self
|
||||
.concurrency_limiter
|
||||
.acquire(kind)
|
||||
.acquire()
|
||||
.await
|
||||
.expect("semaphore is never closed");
|
||||
|
||||
@@ -209,7 +172,8 @@ impl S3Bucket {
|
||||
let started_at = start_counting_cancelled_wait(kind);
|
||||
let permit = self
|
||||
.concurrency_limiter
|
||||
.acquire_owned(kind)
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("semaphore is never closed");
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ pub enum ApiError {
|
||||
#[error("Precondition failed: {0}")]
|
||||
PreconditionFailed(Box<str>),
|
||||
|
||||
#[error("Resource temporarily unavailable: {0}")]
|
||||
ResourceUnavailable(String),
|
||||
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
|
||||
@@ -62,10 +59,6 @@ impl ApiError {
|
||||
"Shutting down".to_string(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
),
|
||||
ApiError::ResourceUnavailable(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
),
|
||||
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
|
||||
@@ -81,6 +81,7 @@ enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
tempfile.workspace = true
|
||||
async-channel = "1.9.0"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -605,6 +605,31 @@ fn start_pageserver(
|
||||
);
|
||||
}
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::BackgroundRuntimeTurnaroundMeasure,
|
||||
None,
|
||||
None,
|
||||
"background runtime turnaround measure",
|
||||
true,
|
||||
async move {
|
||||
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
|
||||
let server = server
|
||||
.serve(hyper::service::make_service_fn(|_| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
|
||||
move |_: hyper::Request<hyper::Body>| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::Response::new(
|
||||
hyper::Body::from(format!("alive")),
|
||||
))
|
||||
},
|
||||
))
|
||||
}))
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
server.await?;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
|
||||
@@ -132,7 +132,7 @@ impl From<PageReconstructError> for ApiError {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
|
||||
}
|
||||
PageReconstructError::AncestorStopping(_) => {
|
||||
ApiError::ResourceUnavailable(format!("{pre}"))
|
||||
ApiError::InternalServerError(anyhow::Error::new(pre))
|
||||
}
|
||||
PageReconstructError::WalRedo(pre) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(pre))
|
||||
@@ -145,7 +145,7 @@ impl From<TenantMapInsertError> for ApiError {
|
||||
fn from(tmie: TenantMapInsertError) -> ApiError {
|
||||
match tmie {
|
||||
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
|
||||
ApiError::ResourceUnavailable(format!("{tmie}"))
|
||||
ApiError::InternalServerError(anyhow::Error::new(tmie))
|
||||
}
|
||||
TenantMapInsertError::TenantAlreadyExists(id, state) => {
|
||||
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
|
||||
@@ -159,12 +159,6 @@ impl From<TenantStateError> for ApiError {
|
||||
fn from(tse: TenantStateError) -> ApiError {
|
||||
match tse {
|
||||
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
|
||||
TenantStateError::NotActive(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant not yet active".into())
|
||||
}
|
||||
TenantStateError::IsStopping(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant is stopping".into())
|
||||
}
|
||||
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
|
||||
}
|
||||
}
|
||||
@@ -174,17 +168,14 @@ impl From<GetTenantError> for ApiError {
|
||||
fn from(tse: GetTenantError) -> ApiError {
|
||||
match tse {
|
||||
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
|
||||
GetTenantError::Broken(reason) => {
|
||||
ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
|
||||
}
|
||||
GetTenantError::NotActive(_) => {
|
||||
e @ GetTenantError::NotActive(_) => {
|
||||
// Why is this not `ApiError::NotFound`?
|
||||
// Because we must be careful to never return 404 for a tenant if it does
|
||||
// in fact exist locally. If we did, the caller could draw the conclusion
|
||||
// that it can attach the tenant to another PS and we'd be in split-brain.
|
||||
//
|
||||
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
|
||||
ApiError::ResourceUnavailable("Tenant not yet active".into())
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -631,9 +622,8 @@ async fn tenant_list_handler(
|
||||
let response_data = mgr::list_tenants()
|
||||
.instrument(info_span!("tenant_list"))
|
||||
.await
|
||||
.map_err(|_| {
|
||||
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".to_string())
|
||||
})?
|
||||
.map_err(anyhow::Error::new)
|
||||
.map_err(ApiError::InternalServerError)?
|
||||
.iter()
|
||||
.map(|(id, state)| TenantInfo {
|
||||
id: *id,
|
||||
|
||||
@@ -79,6 +79,7 @@ use std::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -252,6 +253,11 @@ pub struct PageCache {
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
find_victim_sender:
|
||||
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
find_victim_waiters:
|
||||
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
@@ -291,18 +297,23 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
/// to initialize.
|
||||
///
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
state: PageWriteGuardState<'i>,
|
||||
}
|
||||
|
||||
_permit: PinnedSlotsPermit,
|
||||
|
||||
// Are the page contents currently valid?
|
||||
// Used to mark pages as invalid that are assigned but not yet filled with data.
|
||||
valid: bool,
|
||||
enum PageWriteGuardState<'i> {
|
||||
Invalid {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
_permit: PinnedSlotsPermit,
|
||||
},
|
||||
Downgraded,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.inner.buf
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -310,25 +321,37 @@ impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.buf
|
||||
match &self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||
self.inner.buf
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
|
||||
PageWriteGuardState::Downgraded => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageWriteGuard<'_> {
|
||||
impl<'a> PageWriteGuard<'a> {
|
||||
/// Mark that the buffer contents are now valid.
|
||||
pub fn mark_valid(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
assert!(
|
||||
!self.valid,
|
||||
"mark_valid called on a buffer that was already valid"
|
||||
);
|
||||
self.valid = true;
|
||||
#[must_use]
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
|
||||
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
|
||||
match prev {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
assert!(inner.key.is_some());
|
||||
PageReadGuard {
|
||||
_permit: Arc::new(_permit),
|
||||
slot_guard: inner.downgrade(),
|
||||
}
|
||||
}
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,11 +362,13 @@ impl Drop for PageWriteGuard<'_> {
|
||||
/// initializing it, remove the mapping from the page cache.
|
||||
///
|
||||
fn drop(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
if !self.valid {
|
||||
let self_key = self.inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
self.inner.key = None;
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
let self_key = inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
inner.key = None;
|
||||
}
|
||||
PageWriteGuardState::Downgraded => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -356,7 +381,7 @@ pub enum ReadBufResult<'a> {
|
||||
|
||||
/// lock_for_write() return value
|
||||
pub enum WriteBufResult<'a> {
|
||||
Found(PageWriteGuard<'a>),
|
||||
Found(PageReadGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
@@ -430,7 +455,7 @@ impl PageCache {
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
&'static self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
@@ -447,15 +472,15 @@ impl PageCache {
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key).await? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
WriteBufResult::Found(read_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(*write_guard == img);
|
||||
assert!(*read_guard == img);
|
||||
}
|
||||
WriteBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(img);
|
||||
write_guard.mark_valid();
|
||||
let _ = write_guard.mark_valid();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,7 +490,7 @@ impl PageCache {
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
&'static self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
ctx: &RequestContext,
|
||||
@@ -484,26 +509,13 @@ impl PageCache {
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
|
||||
Duration::from_secs(10),
|
||||
Arc::clone(&self.pinned_slots).acquire_owned(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
Err(_timeout) => {
|
||||
timer.stop_and_discard();
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
|
||||
);
|
||||
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
|
||||
}
|
||||
}
|
||||
let _timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||
Ok(PinnedSlotsPermit(
|
||||
Arc::clone(&self.pinned_slots)
|
||||
.acquire_owned()
|
||||
.await
|
||||
.unwrap(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Look up a page in the cache.
|
||||
@@ -571,7 +583,7 @@ impl PageCache {
|
||||
/// ```
|
||||
///
|
||||
async fn lock_for_read(
|
||||
&self,
|
||||
&'static self,
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
@@ -638,41 +650,31 @@ impl PageCache {
|
||||
);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
},
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache and lock it in write mode. If it's not
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
// FIXME: the name is wrong.
|
||||
async fn try_lock_for_write(
|
||||
&self,
|
||||
cache_key: &CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageWriteGuard> {
|
||||
) -> Option<PageReadGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
let inner = slot.inner.read().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
return Some(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: true,
|
||||
return Some(PageReadGuard {
|
||||
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
|
||||
slot_guard: inner,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -683,7 +685,7 @@ impl PageCache {
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
async fn lock_for_write(&'static self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
@@ -728,9 +730,10 @@ impl PageCache {
|
||||
);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
},
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -882,10 +885,20 @@ impl PageCache {
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
async fn find_victim(
|
||||
&self,
|
||||
&'static self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
// Get in line.
|
||||
let mut receiver = self.find_victim_waiters.recv();
|
||||
// If we get cancelled at the receiver.await below, the victim slot
|
||||
// remains in the channel. Consume these first before going into
|
||||
// the loop below.
|
||||
match futures::poll!(&mut receiver) {
|
||||
Poll::Ready(Ok(res)) => return Ok(res),
|
||||
Poll::Ready(Err(_closed)) => unreachable!("we never close the channel"),
|
||||
Poll::Pending => {} // the regular case where we aren't cancelled below
|
||||
};
|
||||
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
iters += 1;
|
||||
@@ -897,41 +910,8 @@ impl PageCache {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(_err) => {
|
||||
if iters > iter_limit {
|
||||
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
|
||||
// any particular number of iterations: other threads might race ahead and acquire and
|
||||
// release pins just as we're scanning the array.
|
||||
//
|
||||
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
|
||||
// slots. There are two threads running concurrently, A and B. A has just
|
||||
// acquired the permit from the semaphore.
|
||||
//
|
||||
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2, decrement its usage_count to zero and continue the search
|
||||
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
//
|
||||
// Now we're back in the starting situation that both slots have
|
||||
// usage_count 1, but A has now been through one iteration of the
|
||||
// find_victim() loop. This can repeat indefinitely and on each
|
||||
// iteration, A's iteration count increases by one.
|
||||
//
|
||||
// So, even though the semaphore for the permits is fair, the victim search
|
||||
// itself happens in parallel and is not fair.
|
||||
// Hence even with a permit, a task can theoretically be starved.
|
||||
// To avoid this, we'd need tokio to give priority to tasks that are holding
|
||||
// permits for longer.
|
||||
// Note that just yielding to tokio during iteration without such
|
||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
||||
// for B to bump usage count, further starving A.
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::EvictIterLimit,
|
||||
);
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
|
||||
unreachable!("find_victim_waiters prevents starvation");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -942,7 +922,10 @@ impl PageCache {
|
||||
inner.key = None;
|
||||
}
|
||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||
return Ok((slot_idx, inner));
|
||||
self.find_victim_sender
|
||||
.try_send((slot_idx, inner))
|
||||
.expect("we always get in line first");
|
||||
return Ok(receiver.await.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -979,6 +962,7 @@ impl PageCache {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
@@ -986,6 +970,8 @@ impl PageCache {
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
find_victim_sender,
|
||||
find_victim_waiters,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1265,10 +1265,7 @@ async fn get_active_tenant_with_timeout(
|
||||
Ok(tenant) => tenant,
|
||||
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
|
||||
Err(GetTenantError::NotActive(_)) => {
|
||||
unreachable!("we're calling get_tenant with active_only=false")
|
||||
}
|
||||
Err(GetTenantError::Broken(_)) => {
|
||||
unreachable!("we're calling get_tenant with active_only=false")
|
||||
unreachable!("we're calling get_tenant with active=false")
|
||||
}
|
||||
};
|
||||
let wait_time = Duration::from_secs(30);
|
||||
|
||||
@@ -293,6 +293,8 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
BackgroundRuntimeTurnaroundMeasure,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
@@ -186,27 +186,22 @@ impl FileBlockReader {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => break Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => return Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
return Ok(write_guard.mark_valid().into());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,38 +70,34 @@ impl EphemeralFile {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.file.path.display(),
|
||||
e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.file.path.display(),
|
||||
e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
}
|
||||
};
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||
@@ -171,7 +167,7 @@ impl EphemeralFile {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
write_guard.mark_valid();
|
||||
let _ = write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -510,11 +510,6 @@ pub enum GetTenantError {
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
/// Broken is logically a subset of NotActive, but a distinct error is useful as
|
||||
/// NotActive is usually a retryable state for API purposes, whereas Broken
|
||||
/// is a stuck error state
|
||||
#[error("Tenant is broken: {0}")]
|
||||
Broken(String),
|
||||
}
|
||||
|
||||
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
|
||||
@@ -529,20 +524,10 @@ pub async fn get_tenant(
|
||||
let tenant = m
|
||||
.get(&tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(tenant_id))?;
|
||||
|
||||
match tenant.current_state() {
|
||||
TenantState::Broken {
|
||||
reason,
|
||||
backtrace: _,
|
||||
} if active_only => Err(GetTenantError::Broken(reason)),
|
||||
TenantState::Active => Ok(Arc::clone(tenant)),
|
||||
_ => {
|
||||
if active_only {
|
||||
Err(GetTenantError::NotActive(tenant_id))
|
||||
} else {
|
||||
Ok(Arc::clone(tenant))
|
||||
}
|
||||
}
|
||||
if active_only && !tenant.is_active() {
|
||||
Err(GetTenantError::NotActive(tenant_id))
|
||||
} else {
|
||||
Ok(Arc::clone(tenant))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -864,11 +864,11 @@ impl DeltaLayerInner {
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -457,11 +457,11 @@ impl ImageLayerInner {
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -496,36 +496,13 @@ impl Timeline {
|
||||
};
|
||||
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
|
||||
let path = self
|
||||
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
let timer = RECONSTRUCT_TIME.start_timer();
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
|
||||
timer.stop_and_record();
|
||||
|
||||
if cfg!(feature = "testing") && res.is_err() {
|
||||
// it can only be walredo issue
|
||||
use std::fmt::Write;
|
||||
|
||||
let mut msg = String::new();
|
||||
|
||||
path.into_iter().for_each(|(res, cont_lsn, layer)| {
|
||||
writeln!(
|
||||
msg,
|
||||
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
|
||||
layer(),
|
||||
)
|
||||
.expect("string grows")
|
||||
});
|
||||
|
||||
// this is to rule out or provide evidence that we could in some cases read a duplicate
|
||||
// walrecord
|
||||
tracing::info!("walredo failed, path:\n{msg}");
|
||||
}
|
||||
|
||||
res
|
||||
RECONSTRUCT_TIME
|
||||
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
@@ -678,38 +655,38 @@ impl Timeline {
|
||||
) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
// static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
// once_cell::sync::Lazy::new(|| {
|
||||
// let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
// let permits = usize::max(
|
||||
// 1,
|
||||
// // while a lot of the work is done on spawn_blocking, we still do
|
||||
// // repartitioning in the async context. this should give leave us some workers
|
||||
// // unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// // effects of restarts.
|
||||
// //
|
||||
// // 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// // spawn_blocking.
|
||||
// (total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
// );
|
||||
// assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
// assert!(
|
||||
// permits < total_threads,
|
||||
// "need threads avail for shorter work"
|
||||
// );
|
||||
// tokio::sync::Semaphore::new(permits)
|
||||
// });
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
// // this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// // compaction task goes over it's period (20s) which is quite often in production.
|
||||
// let _permit = tokio::select! {
|
||||
// permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
// permit
|
||||
// },
|
||||
// _ = cancel.cancelled() => {
|
||||
// return Ok(());
|
||||
// }
|
||||
// };
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
@@ -2247,7 +2224,7 @@ impl Timeline {
|
||||
request_lsn: Lsn,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
|
||||
) -> Result<(), PageReconstructError> {
|
||||
// Start from the current timeline.
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
@@ -2278,12 +2255,12 @@ impl Timeline {
|
||||
// The function should have updated 'state'
|
||||
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
|
||||
match result {
|
||||
ValueReconstructResult::Complete => return Ok(traversal_path),
|
||||
ValueReconstructResult::Complete => return Ok(()),
|
||||
ValueReconstructResult::Continue => {
|
||||
// If we reached an earlier cached page image, we're done.
|
||||
if cont_lsn == cached_lsn + 1 {
|
||||
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
|
||||
return Ok(traversal_path);
|
||||
return Ok(());
|
||||
}
|
||||
if prev_lsn <= cont_lsn {
|
||||
// Didn't make any progress in last iteration. Error out to avoid
|
||||
|
||||
@@ -18,7 +18,8 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
@@ -110,7 +111,7 @@ impl OpenFiles {
|
||||
///
|
||||
/// On return, we hold a lock on the slot, and its 'tag' has been updated
|
||||
/// recently_used has been set. It's all ready for reuse.
|
||||
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
//
|
||||
// Run the clock algorithm to find a slot to replace.
|
||||
//
|
||||
@@ -142,7 +143,7 @@ impl OpenFiles {
|
||||
}
|
||||
retries += 1;
|
||||
} else {
|
||||
slot_guard = slot.inner.write().unwrap();
|
||||
slot_guard = slot.inner.write().await;
|
||||
index = next;
|
||||
break;
|
||||
}
|
||||
@@ -153,7 +154,7 @@ impl OpenFiles {
|
||||
// old file.
|
||||
//
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
|
||||
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
|
||||
// distinguish the two.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::CloseByReplace)
|
||||
@@ -177,19 +178,19 @@ impl OpenFiles {
|
||||
pub enum CrashsafeOverwriteError {
|
||||
#[error("final path has no parent dir")]
|
||||
FinalPathHasNoParentDir,
|
||||
#[error("remove tempfile")]
|
||||
#[error("remove tempfile: {0}")]
|
||||
RemovePreviousTempfile(#[source] std::io::Error),
|
||||
#[error("create tempfile")]
|
||||
#[error("create tempfile: {0}")]
|
||||
CreateTempfile(#[source] std::io::Error),
|
||||
#[error("write tempfile")]
|
||||
#[error("write tempfile: {0}")]
|
||||
WriteContents(#[source] std::io::Error),
|
||||
#[error("sync tempfile")]
|
||||
#[error("sync tempfile: {0}")]
|
||||
SyncTempfile(#[source] std::io::Error),
|
||||
#[error("rename tempfile to final path")]
|
||||
#[error("rename tempfile to final path: {0}")]
|
||||
RenameTempfileToFinalPath(#[source] std::io::Error),
|
||||
#[error("open final path parent dir")]
|
||||
#[error("open final path parent dir: {0}")]
|
||||
OpenFinalPathParentDir(#[source] std::io::Error),
|
||||
#[error("sync final path parent dir")]
|
||||
#[error("sync final path parent dir: {0}")]
|
||||
SyncFinalPathParentDir(#[source] std::io::Error),
|
||||
}
|
||||
impl CrashsafeOverwriteError {
|
||||
@@ -208,6 +209,29 @@ impl CrashsafeOverwriteError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Observe duration for the given storage I/O operation
|
||||
///
|
||||
/// Unlike `observe_closure_duration`, this supports async,
|
||||
/// where "support" means that we measure wall clock time.
|
||||
macro_rules! observe_duration {
|
||||
($op:expr, $($body:tt)*) => {{
|
||||
let instant = Instant::now();
|
||||
let result = $($body)*;
|
||||
let elapsed = instant.elapsed().as_secs_f64();
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get($op)
|
||||
.observe(elapsed);
|
||||
result
|
||||
}}
|
||||
}
|
||||
|
||||
macro_rules! with_file {
|
||||
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
|
||||
let $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
|
||||
@@ -244,11 +268,9 @@ impl VirtualFile {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
|
||||
|
||||
let file = STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Open)
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
@@ -331,22 +353,24 @@ impl VirtualFile {
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
|
||||
.await?
|
||||
with_file!(self, StorageIoOperation::Fsync, |file| file
|
||||
.as_ref()
|
||||
.sync_all())
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
|
||||
.await?
|
||||
with_file!(self, StorageIoOperation::Metadata, |file| file
|
||||
.as_ref()
|
||||
.metadata())
|
||||
}
|
||||
|
||||
/// Helper function that looks up the underlying File for this VirtualFile,
|
||||
/// opening it and evicting some other File if necessary. It calls 'func'
|
||||
/// with the physical File.
|
||||
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
|
||||
where
|
||||
F: FnMut(&File) -> R,
|
||||
{
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
/// opens it and evicts some other File if necessary. The passed parameter is
|
||||
/// assumed to be a function available for the physical `File`.
|
||||
///
|
||||
/// We are doing it via a macro as Rust doesn't support async closures that
|
||||
/// take on parameters with lifetimes.
|
||||
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -356,27 +380,23 @@ impl VirtualFile {
|
||||
// We only need to hold the handle lock while we read the current handle. If
|
||||
// another thread closes the file and recycles the slot for a different file,
|
||||
// we will notice that the handle we read is no longer valid and retry.
|
||||
let mut handle = *self.handle.read().unwrap();
|
||||
let mut handle = *self.handle.read().await;
|
||||
loop {
|
||||
// Check if the slot contains our File
|
||||
{
|
||||
let slot = &open_files.slots[handle.index];
|
||||
let slot_guard = slot.inner.read().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
if let Some(file) = &slot_guard.file {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(STORAGE_IO_TIME_METRIC
|
||||
.get(op)
|
||||
.observe_closure_duration(|| func(file)));
|
||||
}
|
||||
let slot_guard = slot.inner.read().await;
|
||||
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(FileGuard { slot_guard });
|
||||
}
|
||||
}
|
||||
|
||||
// The slot didn't contain our File. We will have to open it ourselves,
|
||||
// but before that, grab a write lock on handle in the VirtualFile, so
|
||||
// that no other thread will try to concurrently open the same file.
|
||||
let handle_guard = self.handle.write().unwrap();
|
||||
let handle_guard = self.handle.write().await;
|
||||
|
||||
// If another thread changed the handle while we were not holding the lock,
|
||||
// then the handle might now be valid again. Loop back to retry.
|
||||
@@ -390,17 +410,10 @@ impl VirtualFile {
|
||||
|
||||
// We need to open the file ourselves. The handle in the VirtualFile is
|
||||
// now locked in write-mode. Find a free slot to put it in.
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot();
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
|
||||
|
||||
// Open the physical file
|
||||
let file = STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Open)
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
let result = STORAGE_IO_TIME_METRIC
|
||||
.get(op)
|
||||
.observe_closure_duration(|| func(&file));
|
||||
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
@@ -408,7 +421,9 @@ impl VirtualFile {
|
||||
|
||||
*handle_guard = handle;
|
||||
|
||||
Ok(result)
|
||||
return Ok(FileGuard {
|
||||
slot_guard: slot_guard.downgrade(),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
@@ -423,11 +438,9 @@ impl VirtualFile {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = self
|
||||
.with_file(StorageIoOperation::Seek, |mut file| {
|
||||
file.seek(SeekFrom::End(offset))
|
||||
})
|
||||
.await??
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
|
||||
.as_ref()
|
||||
.seek(SeekFrom::End(offset)))?
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
@@ -515,9 +528,9 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self
|
||||
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
|
||||
.await?;
|
||||
let result = with_file!(self, StorageIoOperation::Read, |file| file
|
||||
.as_ref()
|
||||
.read_at(buf, offset));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
@@ -527,9 +540,9 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self
|
||||
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
|
||||
.await?;
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file| file
|
||||
.as_ref()
|
||||
.write_at(buf, offset));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
|
||||
@@ -539,6 +552,18 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard<'a> {
|
||||
slot_guard: RwLockReadGuard<'a, SlotInner>,
|
||||
}
|
||||
|
||||
impl<'a> AsRef<File> for FileGuard<'a> {
|
||||
fn as_ref(&self) -> &File {
|
||||
// This unwrap is safe because we only create `FileGuard`s
|
||||
// if we know that the file is Some.
|
||||
self.slot_guard.file.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -571,20 +596,39 @@ impl VirtualFile {
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
let handle = self.handle.get_mut().unwrap();
|
||||
let handle = self.handle.get_mut();
|
||||
|
||||
// We could check with a read-lock first, to avoid waiting on an
|
||||
// unrelated I/O.
|
||||
let slot = &get_open_files().slots[handle.index];
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Close)
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
|
||||
if slot_guard.tag == tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also the `CloseByReplace` operation for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Close)
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
}
|
||||
}
|
||||
|
||||
// We don't have async drop so we cannot directly await the lock here.
|
||||
// Instead, first do a best-effort attempt at closing the underlying
|
||||
// file descriptor by using `try_write`, and if that fails, spawn
|
||||
// a tokio task to do it asynchronously: we just want it to be
|
||||
// cleaned up eventually.
|
||||
// Most of the time, the `try_lock` should succeed though,
|
||||
// as we have `&mut self` access. In other words, if the slot
|
||||
// is still occupied by our file, there should be no access from
|
||||
// other I/O operations; the only other possible place to lock
|
||||
// the slot is the lock algorithm looking for free slots.
|
||||
let slot = &get_open_files().slots[handle.index];
|
||||
if let Ok(slot_guard) = slot.inner.try_write() {
|
||||
clean_slot(slot, slot_guard, handle.tag);
|
||||
} else {
|
||||
let tag = handle.tag;
|
||||
tokio::spawn(async move {
|
||||
let slot_guard = slot.inner.write().await;
|
||||
clean_slot(slot, slot_guard, tag);
|
||||
});
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,9 +38,6 @@ use tracing::*;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use crate::metrics::{
|
||||
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
|
||||
WAL_REDO_WAIT_TIME,
|
||||
@@ -116,9 +113,6 @@ struct ProcessOutput {
|
||||
pub struct PostgresRedoManager {
|
||||
tenant_id: TenantId,
|
||||
conf: &'static PageServerConf,
|
||||
/// Counter to separate same sized walredo inputs failing at the same millisecond.
|
||||
#[cfg(feature = "testing")]
|
||||
dump_sequence: AtomicUsize,
|
||||
|
||||
stdout: Mutex<Option<ProcessOutput>>,
|
||||
stdin: Mutex<Option<ProcessInput>>,
|
||||
@@ -230,8 +224,6 @@ impl PostgresRedoManager {
|
||||
PostgresRedoManager {
|
||||
tenant_id,
|
||||
conf,
|
||||
#[cfg(feature = "testing")]
|
||||
dump_sequence: AtomicUsize::default(),
|
||||
stdin: Mutex::new(None),
|
||||
stdout: Mutex::new(None),
|
||||
stderr: Mutex::new(None),
|
||||
@@ -298,25 +290,25 @@ impl PostgresRedoManager {
|
||||
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
|
||||
|
||||
debug!(
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
|
||||
len,
|
||||
nbytes,
|
||||
duration.as_micros(),
|
||||
lsn
|
||||
);
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
|
||||
len,
|
||||
nbytes,
|
||||
duration.as_micros(),
|
||||
lsn
|
||||
);
|
||||
|
||||
// If something went wrong, don't try to reuse the process. Kill it, and
|
||||
// next request will launch a new one.
|
||||
if result.is_err() {
|
||||
error!(
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
|
||||
records.len(),
|
||||
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
nbytes,
|
||||
base_img_lsn,
|
||||
lsn
|
||||
);
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
|
||||
records.len(),
|
||||
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
nbytes,
|
||||
base_img_lsn,
|
||||
lsn
|
||||
);
|
||||
// self.stdin only holds stdin & stderr as_raw_fd().
|
||||
// Dropping it as part of take() doesn't close them.
|
||||
// The owning objects (ChildStdout and ChildStderr) are stored in
|
||||
@@ -750,7 +742,7 @@ impl PostgresRedoManager {
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
|
||||
fn apply_wal_records(
|
||||
&self,
|
||||
input: MutexGuard<Option<ProcessInput>>,
|
||||
mut input: MutexGuard<Option<ProcessInput>>,
|
||||
tag: BufferTag,
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
@@ -787,23 +779,6 @@ impl PostgresRedoManager {
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
|
||||
|
||||
if res.is_err() {
|
||||
// not all of these can be caused by this particular input, however these are so rare
|
||||
// in tests so capture all.
|
||||
self.record_and_log(&writebuf);
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
fn apply_wal_records0(
|
||||
&self,
|
||||
writebuf: &[u8],
|
||||
mut input: MutexGuard<Option<ProcessInput>>,
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
let proc = input.as_mut().unwrap();
|
||||
let mut nwrite = 0usize;
|
||||
let stdout_fd = proc.stdout_fd;
|
||||
@@ -1009,38 +984,6 @@ impl PostgresRedoManager {
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
fn record_and_log(&self, writebuf: &[u8]) {
|
||||
let millis = std::time::SystemTime::now()
|
||||
.duration_since(std::time::SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
|
||||
let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// these files will be collected to an allure report
|
||||
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
|
||||
|
||||
let path = self.conf.tenant_path(&self.tenant_id).join(&filename);
|
||||
|
||||
let res = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.read(true)
|
||||
.open(path)
|
||||
.and_then(|mut f| f.write_all(writebuf));
|
||||
|
||||
// trip up allowed_errors
|
||||
if let Err(e) = res {
|
||||
tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
|
||||
} else {
|
||||
tracing::error!(filename, "erroring walredo input saved");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "testing"))]
|
||||
fn record_and_log(&self, _: &[u8]) {}
|
||||
}
|
||||
|
||||
/// Wrapper type around `std::process::Child` which guarantees that the child
|
||||
|
||||
12
poetry.lock
generated
12
poetry.lock
generated
@@ -2415,18 +2415,18 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "urllib3"
|
||||
version = "1.26.17"
|
||||
version = "1.26.11"
|
||||
description = "HTTP library with thread-safe connection pooling, file post, and more."
|
||||
optional = false
|
||||
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
|
||||
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4"
|
||||
files = [
|
||||
{file = "urllib3-1.26.17-py2.py3-none-any.whl", hash = "sha256:94a757d178c9be92ef5539b8840d48dc9cf1b2709c9d6b588232a055c524458b"},
|
||||
{file = "urllib3-1.26.17.tar.gz", hash = "sha256:24d6a242c28d29af46c3fae832c36db3bbebcc533dd1bb549172cd739c82df21"},
|
||||
{file = "urllib3-1.26.11-py2.py3-none-any.whl", hash = "sha256:c33ccba33c819596124764c23a97d25f32b28433ba0dedeb77d873a38722c9bc"},
|
||||
{file = "urllib3-1.26.11.tar.gz", hash = "sha256:ea6e8fb210b19d950fab93b60c9009226c63a28808bc8386e05301e25883ac0a"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotlipy (>=0.6.0)"]
|
||||
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"]
|
||||
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotlipy (>=0.6.0)"]
|
||||
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)"]
|
||||
socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -37,7 +37,6 @@ from psycopg2.extensions import connection as PgConnection
|
||||
from psycopg2.extensions import cursor as PgCursor
|
||||
from psycopg2.extensions import make_dsn, parse_dsn
|
||||
from typing_extensions import Literal
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.log_helper import log
|
||||
@@ -1652,14 +1651,11 @@ class NeonPageserver(PgProtocol):
|
||||
if '"testing"' not in self.version:
|
||||
pytest.skip("pageserver was built without 'testing' feature")
|
||||
|
||||
def http_client(
|
||||
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
|
||||
) -> PageserverHttpClient:
|
||||
def http_client(self, auth_token: Optional[str] = None) -> PageserverHttpClient:
|
||||
return PageserverHttpClient(
|
||||
port=self.service_port.http,
|
||||
auth_token=auth_token,
|
||||
is_testing_enabled_or_skip=self.is_testing_enabled_or_skip,
|
||||
retries=retries,
|
||||
)
|
||||
|
||||
@property
|
||||
|
||||
@@ -7,8 +7,6 @@ from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import Metrics, parse_metrics
|
||||
@@ -115,40 +113,12 @@ class TenantConfig:
|
||||
|
||||
|
||||
class PageserverHttpClient(requests.Session):
|
||||
def __init__(
|
||||
self,
|
||||
port: int,
|
||||
is_testing_enabled_or_skip: Fn,
|
||||
auth_token: Optional[str] = None,
|
||||
retries: Optional[Retry] = None,
|
||||
):
|
||||
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
|
||||
super().__init__()
|
||||
self.port = port
|
||||
self.auth_token = auth_token
|
||||
self.is_testing_enabled_or_skip = is_testing_enabled_or_skip
|
||||
|
||||
if retries is None:
|
||||
# We apply a retry policy that is different to the default `requests` behavior,
|
||||
# because the pageserver has various transiently unavailable states that benefit
|
||||
# from a client retrying on 503
|
||||
|
||||
retries = Retry(
|
||||
# Status retries are for retrying on 503 while e.g. waiting for tenants to activate
|
||||
status=5,
|
||||
# Connection retries are for waiting for the pageserver to come up and listen
|
||||
connect=5,
|
||||
# No read retries: if a request hangs that is not expected behavior
|
||||
# (this may change in future if we do fault injection of a kind that causes
|
||||
# requests TCP flows to stick)
|
||||
read=False,
|
||||
backoff_factor=0,
|
||||
status_forcelist=[503],
|
||||
allowed_methods=None,
|
||||
remove_headers_on_redirect=[],
|
||||
)
|
||||
|
||||
self.mount("http://", HTTPAdapter(max_retries=retries))
|
||||
|
||||
if auth_token is not None:
|
||||
self.headers["Authorization"] = f"Bearer {auth_token}"
|
||||
|
||||
|
||||
@@ -222,7 +222,7 @@ def get_scale_for_db(size_mb: int) -> int:
|
||||
|
||||
|
||||
ATTACHMENT_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
|
||||
r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
|
||||
r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html)"
|
||||
)
|
||||
|
||||
|
||||
@@ -250,9 +250,6 @@ def allure_attach_from_dir(dir: Path):
|
||||
elif source.endswith(".html"):
|
||||
attachment_type = "text/html"
|
||||
extension = "html"
|
||||
elif source.endswith(".walredo"):
|
||||
attachment_type = "application/octet-stream"
|
||||
extension = "walredo"
|
||||
else:
|
||||
attachment_type = "text/plain"
|
||||
extension = attachment.suffix.removeprefix(".")
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
import queue
|
||||
import threading
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
|
||||
from fixtures.types import TenantId
|
||||
|
||||
"""
|
||||
553 sudo mkfs.ext4 /dev/nvme1n1
|
||||
555 mkdir test_output
|
||||
556 sudo mount /dev/nvme1n1 test_output
|
||||
557 htop
|
||||
559 ./scripts/pysync
|
||||
560 NEON_BIN=/home/admin/neon/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
|
||||
561 sudo chown -R admin:admin test_output
|
||||
|
||||
cargo build_testing --release
|
||||
|
||||
562 NEON_BIN=$PWD/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
|
||||
|
||||
cd test_output/test_pageserver_startup_many_tenants/repo
|
||||
|
||||
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local start
|
||||
# watch initial load complete, then background jobs start. That's the interesting part.
|
||||
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local stop
|
||||
# usually pageserver won't be responsive, kill with
|
||||
sudo pkill -9 pageserver
|
||||
"""
|
||||
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# below doesn't work because summaries contain tenant and timeline ids and we check for them
|
||||
|
||||
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
|
||||
pshttp = env.pageserver.http_client()
|
||||
ep = env.endpoints.create_start("main")
|
||||
ep.safe_psql("create table foo(b text)")
|
||||
for i in range(0, 8):
|
||||
ep.safe_psql("insert into foo(b) values ('some text')")
|
||||
# pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
|
||||
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
|
||||
pshttp.timeline_checkpoint(tenant_id, timeline_id)
|
||||
ep.stop_and_destroy()
|
||||
|
||||
env.pageserver.stop()
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
tenant_dir = env.repo_dir / "pageserver_1" / "tenants" / str(env.initial_tenant)
|
||||
|
||||
for i in range(0, 20_000):
|
||||
import shutil
|
||||
|
||||
shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))
|
||||
@@ -169,6 +169,3 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
|
||||
# Check that all the updates are visible
|
||||
num_updates = endpoint.safe_psql("SELECT sum(updates) FROM foo")[0][0]
|
||||
assert num_updates == i * 100000
|
||||
|
||||
with open(neon_env_builder.test_output_dir / "foobar.walredo", "w") as file:
|
||||
file.write("lets see if this ends in the report")
|
||||
|
||||
@@ -34,7 +34,6 @@ from fixtures.remote_storage import (
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
|
||||
def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
@@ -615,7 +614,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
child_timeline_id = env.neon_cli.create_branch("child", "main")
|
||||
|
||||
ps_http = env.pageserver.http_client(retries=Retry(0, read=False))
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
failpoint_name = "persist_deleted_index_part"
|
||||
ps_http.configure_failpoints((failpoint_name, "pause"))
|
||||
@@ -855,7 +854,7 @@ def test_timeline_delete_resumed_on_attach(
|
||||
# error from http response is also logged
|
||||
".*InternalServerError\\(Tenant is marked as deleted on remote storage.*",
|
||||
# Polling after attach may fail with this
|
||||
".*Resource temporarily unavailable.*Tenant not yet active",
|
||||
f".*InternalServerError\\(Tenant {tenant_id} is not active.*",
|
||||
'.*shutdown_pageserver{exit_code=0}: stopping left-over name="remote upload".*',
|
||||
)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user