Compare commits

..

2 Commits

Author SHA1 Message Date
Konstantin Knizhnik
d71bece102 Trace 'out of shared memory' errors 2025-01-16 21:19:38 +02:00
Konstantin Knizhnik
21725fe377 Trace 'out of shared memory' errors 2025-01-16 21:19:23 +02:00
38 changed files with 369 additions and 1354 deletions

View File

@@ -824,7 +824,7 @@ jobs:
docker compose -f ./docker-compose/docker-compose.yml down
promote-images-dev:
needs: [ check-permissions, tag, vm-compute-node-image, neon-image ]
needs: [ check-permissions, tag, vm-compute-node-image ]
runs-on: ubuntu-22.04
permissions:

1
Cargo.lock generated
View File

@@ -1312,7 +1312,6 @@ dependencies = [
"tracing-utils",
"url",
"utils",
"uuid",
"vm_monitor",
"workspace_hack",
"zstd",

View File

@@ -995,50 +995,24 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/9118dd4549b7d8c0bbc98e04
#########################################################################################
#
# Layer "pg-pgx-ulid-build"
# Compile "pgx_ulid" extension for v16 and below
# Compile "pgx_ulid" extension
#
#########################################################################################
FROM rust-extensions-build AS pg-pgx-ulid-build
ARG PG_VERSION
RUN case "${PG_VERSION}" in \
"v14" | "v15" | "v16") \
;; \
*) \
echo "skipping the version of pgx_ulid for $PG_VERSION" && exit 0 \
;; \
# doesn't support v17 yet
# https://github.com/pksunkara/pgx_ulid/pull/52
RUN case "${PG_VERSION}" in "v17") \
echo "pgx_ulid does not support pg17 as of the latest version (0.1.5)" && exit 0;; \
esac && \
wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.5.tar.gz -O pgx_ulid.tar.gz && \
echo "9d1659a2da65af0133d5451c454de31b37364e3502087dadf579f790bc8bef17 pgx_ulid.tar.gz" | sha256sum --check && \
echo "9d1659a2da65af0133d5451c454de31b37364e3502087dadf579f790bc8bef17 pgx_ulid.tar.gz" | sha256sum --check && \
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "^0.11.2"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgrx = "^0.11.2"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/ulid.control
#########################################################################################
#
# Layer "pg-pgx-ulid-pgrx12-build"
# Compile "pgx_ulid" extension for v17 and up
#
#########################################################################################
FROM rust-extensions-build-pgrx12 AS pg-pgx-ulid-pgrx12-build
ARG PG_VERSION
RUN case "${PG_VERSION}" in \
"v17") \
;; \
*) \
echo "skipping the version of pgx_ulid for $PG_VERSION" && exit 0 \
;; \
esac && \
wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.2.0.tar.gz -O pgx_ulid.tar.gz && \
echo "cef6a9a2e5e7bd1a10a18989286586ee9e6c1c06005a4055cff190de41bf3e9f pgx_ulid.tar.gz" | sha256sum --check && \
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "^0.12.7"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgx_ulid.control
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
#########################################################################################
#
@@ -1183,7 +1157,6 @@ COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-pgx-ulid-pgrx12-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-session-jwt-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/

View File

@@ -51,7 +51,6 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
thiserror.workspace = true
url.workspace = true
uuid.workspace = true
prometheus.workspace = true
postgres_initdb.workspace = true

View File

@@ -1,14 +1,15 @@
use std::{
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::Arc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
thread,
time::Duration,
};
use anyhow::Result;
use axum::{
extract::Request,
middleware::{self, Next},
response::{IntoResponse, Response},
routing::{get, post},
Router,
@@ -16,9 +17,11 @@ use axum::{
use http::StatusCode;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::{request_id::PropagateRequestIdLayer, trace::TraceLayer};
use tower_http::{
request_id::{MakeRequestId, PropagateRequestIdLayer, RequestId, SetRequestIdLayer},
trace::TraceLayer,
};
use tracing::{debug, error, info, Span};
use uuid::Uuid;
use super::routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
@@ -31,24 +34,30 @@ async fn handle_404() -> Response {
StatusCode::NOT_FOUND.into_response()
}
const X_REQUEST_ID: &str = "x-request-id";
#[derive(Clone, Default)]
struct ComputeMakeRequestId(Arc<AtomicU64>);
/// This middleware function allows compute_ctl to generate its own request ID
/// if one isn't supplied. The control plane will always send one as a UUID. The
/// neon Postgres extension on the other hand does not send one.
async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response {
let headers = request.headers_mut();
impl MakeRequestId for ComputeMakeRequestId {
fn make_request_id<B>(
&mut self,
_request: &http::Request<B>,
) -> Option<tower_http::request_id::RequestId> {
let request_id = self
.0
.fetch_add(1, Ordering::SeqCst)
.to_string()
.parse()
.unwrap();
if headers.get(X_REQUEST_ID).is_none() {
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap());
Some(RequestId::new(request_id))
}
next.run(request).await
}
/// Run the HTTP server and wait on it forever.
#[tokio::main]
async fn serve(port: u16, compute: Arc<ComputeNode>) {
const X_REQUEST_ID: &str = "x-request-id";
let mut app = Router::new()
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
@@ -73,8 +82,9 @@ async fn serve(port: u16, compute: Arc<ComputeNode>) {
.fallback(handle_404)
.layer(
ServiceBuilder::new()
// Add this middleware since we assume the request ID exists
.layer(middleware::from_fn(maybe_add_request_id_header))
.layer(SetRequestIdLayer::x_request_id(
ComputeMakeRequestId::default(),
))
.layer(
TraceLayer::new_for_http()
.on_request(|request: &http::Request<_>, _span: &Span| {

View File

@@ -33,6 +33,7 @@ use crate::{
reltag::RelTag,
shard::{ShardCount, ShardStripeSize, TenantShardId},
};
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
@@ -1399,8 +1400,6 @@ pub enum PagestreamFeMessage {
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
#[cfg(feature = "testing")]
Test(PagestreamTestRequest),
}
// Wrapped in libpq CopyData
@@ -1412,22 +1411,6 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
#[cfg(feature = "testing")]
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
/// Numeric tag identifying the kind of a frontend pagestream request.
///
/// `#[repr(u8)]` together with the explicit discriminants pins each tag to a
/// fixed `u8` value, so a variant can be cast with `as u8`.
#[repr(u8)]
enum PagestreamFeMessageTag {
// Tag value 0.
Exists = 0,
// Tag value 1.
Nblocks = 1,
// Tag value 2.
GetPage = 2,
// Tag value 3.
DbSize = 3,
// Tag value 4.
GetSlruSegment = 4,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 99,
}
// Keep in sync with `pagestore_client.h`
@@ -1439,28 +1422,7 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 199,
}
impl TryFrom<u8> for PagestreamFeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
0 => Ok(PagestreamFeMessageTag::Exists),
1 => Ok(PagestreamFeMessageTag::Nblocks),
2 => Ok(PagestreamFeMessageTag::GetPage),
3 => Ok(PagestreamFeMessageTag::DbSize),
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
99 => Ok(PagestreamFeMessageTag::Test),
_ => Err(value),
}
}
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
@@ -1471,8 +1433,6 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
199 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
@@ -1590,20 +1550,6 @@ pub struct PagestreamDbSizeResponse {
pub db_size: i64,
}
/// Test-only pagestream request; compiled only when the `testing` feature is
/// enabled.
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
/// Common request header.
pub hdr: PagestreamRequest,
/// Opaque key carried with the request.
// NOTE(review): the name suggests the server uses it to decide which test
// requests may be batched together — confirm against the handler.
pub batch_key: u64,
/// Free-form text payload.
pub message: String,
}
/// Test-only pagestream response; compiled only when the `testing` feature is
/// enabled.
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
/// The request this response belongs to.
// NOTE(review): presumably an echo of the original request — confirm
// against the code that constructs the response.
pub req: PagestreamTestRequest,
}
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
// that require pageserver-internal types. It is sufficient to get the total size.
#[derive(Serialize, Deserialize, Debug)]
@@ -1623,7 +1569,7 @@ impl PagestreamFeMessage {
match self {
Self::Exists(req) => {
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
bytes.put_u8(0);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1634,7 +1580,7 @@ impl PagestreamFeMessage {
}
Self::Nblocks(req) => {
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
bytes.put_u8(1);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1645,7 +1591,7 @@ impl PagestreamFeMessage {
}
Self::GetPage(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
bytes.put_u8(2);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1657,7 +1603,7 @@ impl PagestreamFeMessage {
}
Self::DbSize(req) => {
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
bytes.put_u8(3);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1665,24 +1611,13 @@ impl PagestreamFeMessage {
}
Self::GetSlruSegment(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
bytes.put_u8(4);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
let message = req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
bytes.into()
@@ -1710,66 +1645,56 @@ impl PagestreamFeMessage {
),
};
match PagestreamFeMessageTag::try_from(msg_tag)
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
{
PagestreamFeMessageTag::Exists => {
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::Nblocks => {
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::GetPage => {
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::DbSize => {
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
@@ -1780,21 +1705,7 @@ impl PagestreamFeMessage {
segno: body.read_u32::<BigEndian>()?,
},
)),
#[cfg(feature = "testing")]
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key: body.read_u64::<BigEndian>()?,
message: {
let len = body.read_u64::<BigEndian>()?;
let mut buf = vec![0; len as usize];
body.read_exact(&mut buf)?;
String::from_utf8(buf)?
},
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
}
@@ -1837,15 +1748,6 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
PagestreamProtocolVersion::V3 => {
@@ -1914,18 +1816,6 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
}
@@ -2068,28 +1958,6 @@ impl PagestreamBeMessage {
segment: segment.into(),
})
}
#[cfg(feature = "testing")]
Tag::Test => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let batch_key = buf.read_u64::<BigEndian>()?;
let len = buf.read_u64::<BigEndian>()?;
let mut msg = vec![0; len as usize];
buf.read_exact(&mut msg)?;
let message = String::from_utf8(msg)?;
Self::Test(PagestreamTestResponse {
req: PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key,
message,
},
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
@@ -2109,8 +1977,6 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
#[cfg(feature = "testing")]
Self::Test(_) => "Test",
}
}
}

View File

@@ -277,8 +277,3 @@ pub struct TimelineTermBumpResponse {
pub previous_term: u64,
pub current_term: u64,
}
/// Serializable utilization summary.
// NOTE(review): judging by the name this describes a safekeeper node — the
// producer and consumer are not visible here, confirm before relying on it.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SafekeeperUtilization {
/// Number of timelines counted (presumably those hosted on the safekeeper — TODO confirm).
pub timeline_count: u64,
}

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing"]
[dependencies]
anyhow.workspace = true
@@ -114,7 +114,3 @@ harness = false
[[bench]]
name = "upload_queue"
harness = false
[[bin]]
name = "test_helper_slow_client_reads"
required-features = [ "testing" ]

View File

@@ -4,9 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
testing = [ "pageserver_api/testing" ]
[dependencies]
pageserver_api.workspace = true
thiserror.workspace = true

View File

@@ -1,9 +1,6 @@
use std::sync::{Arc, Mutex};
use std::pin::Pin;
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use futures::SinkExt;
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
@@ -13,6 +10,7 @@ use pageserver_api::{
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
@@ -64,28 +62,15 @@ impl Client {
.client
.copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
.await?;
let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
let Client {
cancel_on_client_drop,
conn_task,
client: _,
} = self;
let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
ConnTaskRunning {
cancel_on_client_drop,
conn_task,
},
)));
Ok(PagestreamClient {
sink: PagestreamSender {
shared: shared.clone(),
sink,
},
stream: PagestreamReceiver {
shared: shared.clone(),
stream,
},
shared,
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop,
})
}
@@ -112,28 +97,7 @@ impl Client {
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
shared: Arc<Mutex<PagestreamShared>>,
sink: PagestreamSender,
stream: PagestreamReceiver,
}
pub struct PagestreamSender {
#[allow(dead_code)]
shared: Arc<Mutex<PagestreamShared>>,
sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
}
pub struct PagestreamReceiver {
#[allow(dead_code)]
shared: Arc<Mutex<PagestreamShared>>,
stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
}
enum PagestreamShared {
ConnTaskRunning(ConnTaskRunning),
ConnTaskCancelledJoinHandleReturnedOrDropped,
}
struct ConnTaskRunning {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
@@ -146,11 +110,11 @@ pub struct RelTagBlockNo {
impl PagestreamClient {
pub async fn shutdown(self) {
let Self {
shared,
sink,
stream,
} = { self };
// The `copy_both` split into `sink` and `stream` contains internal channel sender, the receiver of which is polled by `conn_task`.
copy_both,
cancel_on_client_drop: cancel_conn_task,
conn_task,
} = self;
// The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
// When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
// (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
//
@@ -167,77 +131,27 @@ impl PagestreamClient {
//
// NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
// => https://github.com/neondatabase/neon/issues/6390
let ConnTaskRunning {
cancel_on_client_drop,
conn_task,
} = {
let mut guard = shared.lock().unwrap();
match std::mem::replace(
&mut *guard,
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
) {
PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
}
};
let _ = cancel_on_client_drop.unwrap();
let _ = cancel_conn_task.unwrap();
conn_task.await.unwrap();
// Now drop the split copy_both.
drop(sink);
drop(stream);
}
pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
let Self {
shared: _,
sink,
stream,
} = self;
(sink, stream)
drop(copy_both);
}
pub async fn getpage(
&mut self,
req: PagestreamGetPageRequest,
) -> anyhow::Result<PagestreamGetPageResponse> {
self.getpage_send(req).await?;
self.getpage_recv().await
}
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
self.sink.getpage_send(req).await
}
self.copy_both.send_all(&mut req).await?;
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
self.stream.getpage_recv().await
}
}
impl PagestreamSender {
// TODO: maybe make this impl Sink instead for better composability?
pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let msg = msg.serialize();
self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
Ok(())
}
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
self.send(PagestreamFeMessage::GetPage(req)).await
}
}
impl PagestreamReceiver {
// TODO: maybe make this impl Stream instead for better composability?
pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
PagestreamBeMessage::deserialize(next)
}
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
let next: PagestreamBeMessage = self.recv().await?;
match next {
let msg = PagestreamBeMessage::deserialize(next)?;
match msg {
PagestreamBeMessage::GetPage(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::Exists(_)
@@ -246,14 +160,7 @@ impl PagestreamReceiver {
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
next.kind()
)
}
#[cfg(feature = "testing")]
PagestreamBeMessage::Test(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
next.kind()
msg.kind()
)
}
}

View File

@@ -1,65 +0,0 @@
use std::{
io::{stdin, stdout, Read, Write},
time::Duration,
};
use clap::Parser;
use pageserver_api::models::{PagestreamRequest, PagestreamTestRequest};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
// Command-line arguments of the helper, parsed by clap.
// Plain `//` comments are used on purpose: `///` doc comments on a clap
// derive struct would become part of the generated help output.
#[derive(clap::Parser)]
struct Args {
// Connection string handed to `pageserver_client::page_service::Client::new`.
connstr: String,
// Tenant whose pagestream is opened.
tenant_id: TenantId,
// Timeline whose pagestream is opened.
timeline_id: TimelineId,
}
/// Test helper that acts as a client which sends requests but never reads
/// the responses.
///
/// Steps:
/// 1. Open a pagestream for the given tenant/timeline and split it.
/// 2. Keep sending `Test` requests until a single send does not complete
///    within 10 seconds, which is taken to mean the pipe is full.
/// 3. Write a single `R` byte to stdout to signal readiness.
/// 4. Block until one byte arrives on stdin, then exit.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    let pagestream = pageserver_client::page_service::Client::new(args.connstr)
        .await?
        .pagestream(args.tenant_id, args.timeline_id)
        .await?;
    // The receiving half is kept alive but deliberately never polled.
    let (mut sender, _receiver) = pagestream.split();

    eprintln!("filling the pipe");
    let send_timeout = Duration::from_secs(10);
    let mut seq = 0;
    loop {
        seq += 1;
        let request = PagestreamTestRequest {
            hdr: PagestreamRequest {
                reqid: 0,
                request_lsn: Lsn(23),
                not_modified_since: Lsn(23),
            },
            batch_key: 42,
            message: format!("message {}", seq),
        };
        let pending = sender.send(pageserver_api::models::PagestreamFeMessage::Test(request));
        match tokio::time::timeout(send_timeout, pending).await {
            // The send finished in time; propagate its error, if any.
            Ok(sent) => {
                let _: () = sent?;
            }
            // The send stalled for the whole timeout: stop filling.
            Err(_elapsed) => {
                eprintln!("pipe seems full");
                break;
            }
        }
    }

    // Tell the parent process that the pipe is full.
    let written = stdout().write(b"R")?;
    assert_eq!(written, 1);
    stdout().flush()?;

    eprintln!("waiting for signal to tell us to exit");
    let mut signal = [0u8; 1];
    stdin().read_exact(&mut signal)?;

    eprintln!("termination signal received, exiting");
    anyhow::Ok(())
}

View File

@@ -100,32 +100,6 @@ pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});
/// Gauge registered under the name `pageserver_concurrent_initdb`
/// ("Number of initdb processes running").
///
/// Registration runs inside the `Lazy` initializer; the `expect` panics if
/// `register_uint_gauge!` returns an error.
pub(crate) static CONCURRENT_INITDBS: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_concurrent_initdb",
"Number of initdb processes running"
)
.expect("failed to define a metric")
});
/// Histogram registered under the name
/// `pageserver_initdb_semaphore_seconds_global` ("Time spent getting a permit
/// from the global initdb semaphore"), using `STORAGE_OP_BUCKETS` as buckets.
///
/// Registration runs inside the `Lazy` initializer; the `expect` panics if
/// `register_histogram!` returns an error.
pub(crate) static INITDB_SEMAPHORE_ACQUISITION_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_initdb_semaphore_seconds_global",
"Time spent getting a permit from the global initdb semaphore",
STORAGE_OP_BUCKETS.into()
)
.expect("failed to define metric")
});
/// Histogram registered under the name `pageserver_initdb_seconds_global`
/// ("Time spent performing initdb"), using `STORAGE_OP_BUCKETS` as buckets.
///
/// Registration runs inside the `Lazy` initializer; the `expect` panics if
/// `register_histogram!` returns an error.
pub(crate) static INITDB_RUN_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_initdb_seconds_global",
"Time spent performing initdb",
STORAGE_OP_BUCKETS.into()
)
.expect("failed to define metric")
});
// Metrics collected on operations on the storage repository.
#[derive(
Clone, Copy, enum_map::Enum, strum_macros::EnumString, strum_macros::Display, IntoStaticStr,
@@ -1489,8 +1463,6 @@ pub enum SmgrQueryType {
GetPageAtLsn,
GetDbSize,
GetSlruSegment,
#[cfg(feature = "testing")]
Test,
}
pub(crate) struct SmgrQueryTimePerTimeline {

View File

@@ -555,52 +555,37 @@ struct BatchedGetPageRequest {
timer: SmgrOpTimer,
}
#[cfg(feature = "testing")]
struct BatchedTestRequest {
req: models::PagestreamTestRequest,
timer: SmgrOpTimer,
}
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
enum BatchedFeMessage {
Exists {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
GetPage {
span: Span,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
},
DbSize {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
timer: SmgrOpTimer,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
#[cfg(feature = "testing")]
Test {
span: Span,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
requests: Vec<BatchedTestRequest>,
},
RespondError {
span: Span,
error: BatchedPageStreamError,
@@ -621,12 +606,6 @@ impl BatchedFeMessage {
page.timer.observe_execution_start(at);
}
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test { requests, .. } => {
for req in requests {
req.timer.observe_execution_start(at);
}
}
BatchedFeMessage::RespondError { .. } => {}
}
}
@@ -756,7 +735,7 @@ impl PageServerHandler {
BatchedFeMessage::Exists {
span,
timer,
shard: shard.downgrade(),
shard,
req,
}
}
@@ -775,7 +754,7 @@ impl PageServerHandler {
BatchedFeMessage::Nblocks {
span,
timer,
shard: shard.downgrade(),
shard,
req,
}
}
@@ -794,7 +773,7 @@ impl PageServerHandler {
BatchedFeMessage::DbSize {
span,
timer,
shard: shard.downgrade(),
shard,
req,
}
}
@@ -813,7 +792,7 @@ impl PageServerHandler {
BatchedFeMessage::GetSlruSegment {
span,
timer,
shard: shard.downgrade(),
shard,
req,
}
}
@@ -865,7 +844,6 @@ impl PageServerHandler {
)
.await?;
// We're holding the Handle
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
req.hdr.request_lsn,
@@ -883,27 +861,11 @@ impl PageServerHandler {
};
BatchedFeMessage::GetPage {
span,
shard: shard.downgrade(),
shard,
effective_request_lsn,
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
}
}
#[cfg(feature = "testing")]
PagestreamFeMessage::Test(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer =
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
.await?;
BatchedFeMessage::Test {
span,
shard: shard.downgrade(),
requests: vec![BatchedTestRequest { req, timer }],
}
}
};
Ok(Some(batched_msg))
}
@@ -945,7 +907,9 @@ impl PageServerHandler {
assert_eq!(accum_pages.len(), max_batch_size.get());
return false;
}
if !accum_shard.is_same_handle_as(&this_shard) {
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
@@ -964,44 +928,6 @@ impl PageServerHandler {
accum_pages.extend(this_pages);
Ok(())
}
#[cfg(feature = "testing")]
(
Ok(BatchedFeMessage::Test {
shard: accum_shard,
requests: accum_requests,
..
}),
BatchedFeMessage::Test {
shard: this_shard,
requests: this_requests,
..
},
) if (|| {
assert!(this_requests.len() == 1);
if accum_requests.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_requests.len(), max_batch_size.get());
return false;
}
if !accum_shard.is_same_handle_as(&this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
let this_batch_key = this_requests[0].req.batch_key;
let accum_batch_key = accum_requests[0].req.batch_key;
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
return false;
}
true
})() =>
{
// ok to batch
accum_requests.extend(this_requests);
Ok(())
}
// something batched already but this message is unbatchable
(_, this_msg) => {
// by default, don't continue batching
@@ -1043,7 +969,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![self
.handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
@@ -1060,7 +986,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![self
.handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
@@ -1081,7 +1007,7 @@ impl PageServerHandler {
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&*shard.upgrade()?,
&shard,
effective_request_lsn,
pages,
ctx,
@@ -1103,7 +1029,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![self
.handle_db_size_request(&*shard.upgrade()?, &req, ctx)
.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
@@ -1120,7 +1046,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![self
.handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
@@ -1128,27 +1054,6 @@ impl PageServerHandler {
span,
)
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test {
span,
shard,
requests,
} => {
fail::fail_point!("ps::handle-pagerequest-message::test");
(
{
let npages = requests.len();
trace!(npages, "handling getpage request");
let res = self
.handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
@@ -1886,51 +1791,6 @@ impl PageServerHandler {
))
}
// NB: this impl mimics what we do for batched getpage requests.
/// Handles a batch of test requests and produces exactly one result per request,
/// in the same order as `requests`.
///
/// A request succeeds with a `PagestreamBeMessage::Test` response echoing the request
/// (paired with that request's timer) unless the timeline's cancellation token was
/// already cancelled when the request was examined, in which case it fails with an
/// error built from `PageReconstructError::Cancelled`.
#[cfg(feature = "testing")]
#[instrument(skip_all, fields(shard_id))]
async fn handle_test_request_batch(
    &mut self,
    timeline: &Timeline,
    requests: Vec<BatchedTestRequest>,
    _ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
    // real requests would do something with the timeline
    let batch_len = requests.len();

    // Phase 1: yield to the runtime once per request, then sample the cancellation state.
    let mut outcomes: Vec<Result<(), PageReconstructError>> = Vec::with_capacity(batch_len);
    for _ in 0..batch_len {
        tokio::task::yield_now().await;
        let outcome = match timeline.cancel.is_cancelled() {
            true => Err(PageReconstructError::Cancelled),
            false => Ok(()),
        };
        outcomes.push(outcome);
    }

    // Phase 2: pair every request with its outcome and build the wire-level result.
    // TODO: avoid creating the new Vec here
    let mut responses = Vec::with_capacity(batch_len);
    for (request, outcome) in requests.into_iter().zip(outcomes) {
        let response = match outcome {
            Ok(()) => Ok((
                PagestreamBeMessage::Test(models::PagestreamTestResponse {
                    req: request.req.clone(),
                }),
                request.timer,
            )),
            Err(e) => Err(BatchedPageStreamError {
                err: PageStreamError::from(e),
                req: request.req.hdr,
            }),
        };
        responses.push(response);
    }
    responses
}
/// Note on "fullbackup":
/// Full basebackups should only be used for debugging purposes.
/// Originally, it was introduced to enable breaking storage format changes,
@@ -2546,14 +2406,6 @@ impl From<GetActiveTimelineError> for QueryError {
}
}
impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
    /// Translates a failed handle upgrade into the corresponding query error.
    ///
    /// The match is kept exhaustive on purpose: adding a variant to
    /// `HandleUpgradeError` must force a decision here at compile time.
    fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
        use crate::tenant::timeline::handle::HandleUpgradeError;

        match e {
            HandleUpgradeError::ShutDown => Self::Shutdown,
        }
    }
}
fn set_tracing_field_shard_id(timeline: &Timeline) {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
tracing::Span::current().record(

View File

@@ -95,9 +95,6 @@ use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::CONCURRENT_INITDBS;
use crate::metrics::INITDB_RUN_TIME;
use crate::metrics::INITDB_SEMAPHORE_ACQUISITION_TIME;
use crate::metrics::TENANT;
use crate::metrics::{
remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
@@ -5350,17 +5347,8 @@ async fn run_initdb(
initdb_bin_path, initdb_target_dir, initdb_lib_dir,
);
let _permit = {
let _timer = INITDB_SEMAPHORE_ACQUISITION_TIME.start_timer();
INIT_DB_SEMAPHORE.acquire().await
};
let _permit = INIT_DB_SEMAPHORE.acquire().await;
CONCURRENT_INITDBS.inc();
scopeguard::defer! {
CONCURRENT_INITDBS.dec();
}
let _timer = INITDB_RUN_TIME.start_timer();
let res = postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser: &conf.superuser,
locale: &conf.locale,

View File

@@ -382,12 +382,6 @@ pub(crate) struct RemoteTimelineClient {
cancel: CancellationToken,
}
/// Logs at debug level when a `RemoteTimelineClient` is dropped.
impl Drop for RemoteTimelineClient {
fn drop(&mut self) {
// This impl only logs; it performs no cleanup of its own.
debug!("dropping RemoteTimelineClient");
}
}
impl RemoteTimelineClient {
///
/// Create a remote storage client for given timeline
@@ -803,12 +797,6 @@ impl RemoteTimelineClient {
upload_queue.dirty.metadata.apply(update);
// Defense in depth: if we somehow generated invalid metadata, do not persist it.
upload_queue
.dirty
.validate()
.map_err(|e| anyhow::anyhow!(e))?;
self.schedule_index_upload(upload_queue);
Ok(())

View File

@@ -152,21 +152,6 @@ impl IndexPart {
};
is_same_remote_layer_path(name, metadata, name, index_metadata)
}
/// Checks the invariants of the index. Run this before uploading an index so that,
/// should a bug produce bad metadata, the buggy metadata is not persisted.
///
/// Returns `Err` with a human-readable description when an invariant is violated.
pub(crate) fn validate(&self) -> Result<(), String> {
    // The index must always have at least one layer, unless we're in the middle of
    // a raw pgdata import ...
    if self.import_pgdata.is_some() {
        return Ok(());
    }
    // ... or this is a child timeline.
    if self.metadata.ancestor_timeline().is_some() {
        return Ok(());
    }
    if !self.layer_metadata.is_empty() {
        return Ok(());
    }

    Err("Index has no ancestor and no layers".to_string())
}
}
/// Metadata gathered for each of the layer files.

View File

@@ -40,10 +40,6 @@ pub(crate) async fn upload_index_part(
});
pausable_failpoint!("before-upload-index-pausable");
// Safety: refuse to persist invalid index metadata, to mitigate the impact of any bug that produces this
// (this should never happen)
index_part.validate().map_err(|e| anyhow::anyhow!(e))?;
// FIXME: this error comes too late
let serialized = index_part.to_json_bytes()?;
let serialized = Bytes::from(serialized);

View File

@@ -1,6 +1,6 @@
use std::time::UNIX_EPOCH;
use pageserver_api::key::{Key, CONTROLFILE_KEY};
use pageserver_api::key::CONTROLFILE_KEY;
use tokio::task::JoinSet;
use utils::{
completion::{self, Completion},
@@ -9,10 +9,7 @@ use utils::{
use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::{
context::DownloadBehavior,
tenant::{harness::test_img, storage_layer::LayerVisibilityHint},
};
use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
/// Used in tests to advance a future to wanted await point, and not further.
@@ -34,51 +31,20 @@ async fn smoke_test() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let image_layers = vec![(
Lsn(0x40),
vec![(
Key::from_hex("620000000033333333444444445500000000").unwrap(),
test_img("foo"),
)],
)];
// Create a test timeline with one real layer, and one synthetic test layer. The synthetic
// one is only there so that we can GC the real one without leaving the timeline's metadata
// empty, which is an illegal state (see [`IndexPart::validate`]).
let timeline = tenant
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
14,
&ctx,
Default::default(),
image_layers,
Lsn(0x100),
)
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
// Grab one of the timeline's layers to exercise in the test, and the other layer that is just
// there to avoid the timeline being illegally empty
let (layer, dummy_layer) = {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 2);
assert_eq!(layers.len(), 1);
layers.sort_by_key(|l| l.layer_desc().get_key_range().start);
let synthetic_layer = layers.pop().unwrap();
let real_layer = layers.pop().unwrap();
tracing::info!(
"real_layer={:?} ({}), synthetic_layer={:?} ({})",
real_layer,
real_layer.layer_desc().file_size,
synthetic_layer,
synthetic_layer.layer_desc().file_size
);
(real_layer, synthetic_layer)
layers.swap_remove(0)
};
// all layers created at pageserver are like `layer`, initialized with strong
@@ -207,13 +173,10 @@ async fn smoke_test() {
let rtc = &timeline.remote_client;
// Simulate GC removing our test layer.
{
let mut g = timeline.layers.write().await;
let layers = &[layer];
let mut g = timeline.layers.write().await;
g.open_mut().unwrap().finish_gc_timeline(layers);
// this just updates the remote_physical_size for demonstration purposes
rtc.schedule_gc_update(layers).unwrap();
}
@@ -228,10 +191,7 @@ async fn smoke_test() {
rtc.wait_completion().await.unwrap();
assert_eq!(
rtc.get_remote_physical_size(),
dummy_layer.metadata().file_size
);
assert_eq!(rtc.get_remote_physical_size(), 0);
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

View File

@@ -76,7 +76,6 @@ use std::{pin::pin, sync::OnceLock};
use crate::{
aux_file::AuxFileSizeEstimator,
page_service::TenantManagerTypes,
tenant::{
config::AttachmentMode,
layer_map::{LayerMap, SearchResult},
@@ -432,7 +431,7 @@ pub struct Timeline {
pub(crate) l0_flush_global_state: L0FlushGlobalState,
pub(crate) handles: handle::PerTimelineState<TenantManagerTypes>,
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
@@ -4626,10 +4625,6 @@ impl Drop for Timeline {
}
}
}
info!(
"Timeline {} for tenant {} is being dropped",
self.timeline_id, self.tenant_shard_id.tenant_id
);
}
}
@@ -5678,17 +5673,9 @@ impl Timeline {
info!("force created image layer {}", image_layer.local_path());
{
let mut guard = self.layers.write().await;
guard
.open_mut()
.unwrap()
.force_insert_layer(image_layer.clone());
guard.open_mut().unwrap().force_insert_layer(image_layer);
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(image_layer)
.unwrap();
Ok(())
}
@@ -5739,17 +5726,9 @@ impl Timeline {
info!("force created delta layer {}", delta_layer.local_path());
{
let mut guard = self.layers.write().await;
guard
.open_mut()
.unwrap()
.force_insert_layer(delta_layer.clone());
guard.open_mut().unwrap().force_insert_layer(delta_layer);
}
// Update remote_timeline_client state to reflect existence of this layer
self.remote_client
.schedule_layer_file_upload(delta_layer)
.unwrap();
Ok(())
}

View File

@@ -32,151 +32,54 @@
//!
//! # Design
//!
//! ## Data Structures
//!
//! There are four user-facing data structures:
//! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
//! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
//! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
//! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
//! trying to upgrade back to a `Handle`, guaranteeing it's the same `Timeline` *object*.
//! Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request)
//!
//! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
//! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
//! The `Handle` is just a wrapper around an `Arc<HandleInner>`.
//!
//! The `HandleInner` is allocated as a `Arc<Mutex<HandleInner>>` and
//! referenced weakly and strongly from various places which we are now illustrating.
//! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
//! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
//! or `Weak<Mutex<HandleInner>>`, respectively.
//!
//! - The `Handle` is a strong ref.
//! - The `WeakHandle` is a weak ref.
//! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
//! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
//!
//! Lifetimes:
//! - `WeakHandle` and `Handle`: single pagestream request.
//! - `Cache`: single page service connection.
//! - `PerTimelineState`: lifetime of the Timeline object (i.e., till `Timeline::shutdown`).
//!
//! ## Request Handling Flow (= filling and using the `Cache`)
//! There is one long-lived `Arc<HandleInner>`, which is stored in the `PerTimelineState`.
//! The `Cache` stores a `Weak<HandleInner>` for each cached Timeline.
//!
//! To dispatch a request, the page service connection calls `Cache::get`.
//!
//! A cache miss means we consult the tenant manager for shard routing,
//! resulting in an `Arc<Timeline>`. We enter its gate _once_ and store it in the
//! `Arc<Mutex<HandleInner>>>`. A weak ref is stored in the `Cache`
//! and a strong ref in the `PerTimelineState`.
//! A strong ref is returned wrapped in a `Handle`.
//! resulting in an `Arc<Timeline>`. We enter its gate _once_ and construct an
//! `Arc<HandleInner>`. We store a `Weak<HandleInner>` in the cache
//! and the `Arc<HandleInner>` in the `PerTimelineState`.
//!
//! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
//! and find the weak ref in the cache.
//! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
//! and find the `Weak<HandleInner>` in the cache.
//! We upgrade the `Weak<HandleInner>` to an `Arc<HandleInner>` and wrap it in the user-facing `Handle` type.
//!
//! The pagestream processing is pipelined and involves a batching step.
//! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
//! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
//! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
//! The request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
//! It then drops the `Handle`, which drops the `Arc<HandleInner>`.
//!
//! # Performance
//! # Memory Management / How The Reference Cycle Is Broken
//!
//! Remember from the introductory section:
//! The attentive reader may have noticed the strong reference cycle
//! from `Arc<HandleInner>` to `PerTimelineState` to `Arc<Timeline>`.
//!
//! > However, we want to avoid the overhead of entering the gate for every
//! > method invocation.
//!
//! Why do we want to avoid that?
//! Because the gate is a shared location in memory and entering it involves
//! bumping refcounts, which leads to cache contention if done frequently
//! from multiple cores in parallel.
//!
//! So, we only acquire the `GateGuard` once on `Cache` miss, and wrap it in an `Arc`.
//! That `Arc` is private to the `HandleInner` and hence to the connection.
//! (Review the "Data Structures" section if that is unclear to you.)
//!
//! A `WeakHandle` is a weak ref to the `HandleInner`.
//! When upgrading a `WeakHandle`, we upgrade to a strong ref to the `HandleInner` and
//! further acquire an additional strong ref to the `Arc<GateGuard>` inside it.
//! Again, this manipulation of ref counts is cheap because the `Arc` is private to the connection.
//!
//! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc<GateGuard>`.
//! Again, this is cheap because the `Arc` is private to the connection.
//!
//! In addition to the GateGuard, we need to provide a `Deref<Target=Timeline>` impl.
//! For this, `Handle` needs infallible access to an `Arc<Timeline>`.
//! We could clone the `Arc<Timeline>` when upgrading a `WeakHandle`, but that would cause contention
//! on the shared memory location that tracks the refcount of the `Arc<Timeline>`.
//! Instead, we wrap the `Arc<Timeline>` into another `Arc`,
//! so that we can clone it cheaply when upgrading a `WeakHandle`.
//!
//! # Shutdown
//!
//! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
//!
//! ```text
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
//! ```
//!
//! Further, there is this cycle:
//!
//! ```text
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> GateGuard --keepalive--> Timeline
//! ```
//!
//! The former cycle is a memory leak if not broken.
//! The latter cycle further prevents the Timeline from shutting down
//! because we certainly won't drop the Timeline while the GateGuard is alive.
//! Preventing shutdown is the whole point of this handle/cache system,
//! but when the Timeline needs to shut down, we need to break the cycle.
//! This cycle is intentional: while it exists, the `Cache` can upgrade its
//! `Weak<HandleInner>` to an `Arc<HandleInner>` in a single atomic operation.
//!
//! The cycle is broken by either
//! - Timeline shutdown (=> `PerTimelineState::shutdown`)
//! - Connection shutdown (=> dropping the `Cache`).
//! - `PerTimelineState::shutdown` or
//! - dropping the `Cache`.
//!
//! Both transition the `HandleInner` from [`HandleInner::KeepingTimelineGateOpen`] to
//! [`HandleInner::ShutDown`], which drops the only long-lived strong ref to the
//! `Arc<GateGuard>`.
//!
//! `PerTimelineState::shutdown` drops all the `HandleInners` it contains,
//! thereby breaking the cycle.
//! It also initiates draining of already existing `Handle`s by
//! poisoning things so that no new `HandleInner`'s can be added
//! to the `PerTimelineState`, which will make subsequent `Cache::get` fail.
//!
//! Concurrently existing / already upgraded `Handle`s will extend the
//! lifetime of the `Arc<Mutex<HandleInner>>` and hence cycles.
//! Concurrently existing `Handle`s will extend the existence of the cycle.
//! However, since `Handle`s are short-lived and new `Handle`s are not
//! handed out from `Cache::get` or `WeakHandle::upgrade` after
//! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
//!
//! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
//! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
//! they will find the inner in the `HandleInner::ShutDown` state, where the
//! `Arc<GateGuard>` and Timeline have already been dropped.
//!
//! Dropping the `Cache` undoes the registration of this `Cache`'s
//! `HandleInner`s from all the `PerTimelineState`s, i.e., it
//! removes the strong ref to each of its `HandleInner`s
//! from all the `PerTimelineState`.
//!
//! # Locking Rules
//!
//! To prevent deadlocks we:
//!
//! 1. Only ever hold one of the locks at a time.
//! 2. Don't add more than one Drop impl that locks on the
//! cycles above.
//!
//! As per (2), that impl is in `Drop for Cache`.
//! handed out after either `PerTimelineState::shutdown` or `Cache` drop,
//! that extension of the cycle is bounded.
//!
//! # Fast Path for Shard Routing
//!
//! The `Cache` has a fast path for shard routing to avoid calling into
//! the tenant manager for every request.
//!
//! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
//! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak<HandleInner>`.
//!
//! The current implementation uses the first entry in the hash map
//! to determine the `ShardParameters` and derive the correct
@@ -184,18 +87,18 @@
//!
//! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
//!
//! If the lookup is successful and the `WeakHandle` can be upgraded,
//! If the lookup is successful and the `Weak<HandleInner>` can be upgraded,
//! it's a hit.
//!
//! ## Cache invalidation
//!
//! The insight is that cache invalidation is sufficient and most efficiently if done lazily.
//! The insight is that cache invalidation is sufficient and most efficiently done lazily.
//! The only reasons why an entry in the cache can become stale are:
//! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is
//! being detached, timeline or shard deleted, or pageserver is shutting down.
//! 2. We're doing a shard split and new traffic should be routed to the child shards.
//!
//! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
//! Regarding (1), we will eventually fail to upgrade the `Weak<HandleInner>` once the
//! timeline has shut down, and when that happens, we remove the entry from the cache.
//!
//! Regarding (2), the insight is that it is totally fine to keep dispatching requests
@@ -204,6 +107,8 @@
use std::collections::hash_map;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
@@ -247,7 +152,7 @@ pub(crate) struct Cache<T: Types> {
map: Map<T>,
}
type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
type Map<T> = HashMap<ShardTimelineId, Weak<HandleInner<T>>>;
impl<T: Types> Default for Cache<T> {
fn default() -> Self {
@@ -265,22 +170,12 @@ pub(crate) struct ShardTimelineId {
}
/// See module-level comment.
pub(crate) struct Handle<T: Types> {
// The timeline this handle derefs to (see the `Deref` impl for `Handle`).
timeline: Arc<T::Timeline>,
#[allow(dead_code)] // the field exists to keep the gate open
gate_guard: Arc<utils::sync::gate::GateGuard>,
// Strong ref to the shared per-(cache, timeline) state; `downgrade()` derives
// a `WeakHandle` from it.
inner: Arc<Mutex<HandleInner<T>>>,
}
/// Weak counterpart of [`Handle`]: holds only a `Weak` reference to the shared
/// `HandleInner`, and can be turned back into a [`Handle`] via `upgrade()`,
/// which fails once the inner state is gone or has been shut down.
pub(crate) struct WeakHandle<T: Types> {
inner: Weak<Mutex<HandleInner<T>>>,
}
/// Shared state behind [`Handle`] and [`WeakHandle`].
enum HandleInner<T: Types> {
/// Live state: owns a gate guard and a reference to the timeline.
KeepingTimelineGateOpen {
#[allow(dead_code)]
gate_guard: Arc<utils::sync::gate::GateGuard>,
timeline: Arc<T::Timeline>,
},
/// Terminal state installed by `HandleInner::shutdown`; the gate guard and
/// timeline reference formerly held by this value have been given up.
ShutDown,
}
/// Thin wrapper around a strong reference to `HandleInner`; derefs to the
/// timeline stored inside it.
pub(crate) struct Handle<T: Types>(Arc<HandleInner<T>>);
/// State shared by every [`Handle`] that one `Cache` hands out for one timeline.
struct HandleInner<T: Types> {
// Set to `true` by `PerTimelineState::shutdown`; `Cache::get` checks it and
// evicts the cache entry instead of returning the handle.
shut_down: AtomicBool,
// The timeline that `Handle` derefs to.
timeline: T::Timeline,
// The timeline's gate held open.
_gate_guard: utils::sync::gate::GateGuard,
}
/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
@@ -288,8 +183,7 @@ enum HandleInner<T: Types> {
/// See module-level comment for details.
pub struct PerTimelineState<T: Types> {
// None = shutting down
#[allow(clippy::type_complexity)]
handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
handles: Mutex<Option<HashMap<CacheId, Arc<HandleInner<T>>>>>,
}
impl<T: Types> Default for PerTimelineState<T> {
@@ -349,24 +243,49 @@ impl<T: Types> Cache<T> {
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
// terminates because when every iteration we remove an element from the map
let miss: ShardSelector = loop {
// terminates because each iteration removes an element from the map
loop {
let handle = self
.get_impl(timeline_id, shard_selector, tenant_manager)
.await?;
if handle.0.shut_down.load(Ordering::Relaxed) {
let removed = self
.map
.remove(&handle.0.timeline.shard_timeline_id())
.expect("invariant of get_impl is that the returned handle is in the map");
assert!(
Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)),
"shard_timeline_id() incorrect?"
);
} else {
return Ok(handle);
}
}
}
#[instrument(level = "trace", skip_all)]
async fn get_impl(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
let miss: ShardSelector = {
let routing_state = self.shard_routing(timeline_id, shard_selector);
match routing_state {
RoutingResult::FastPath(handle) => return Ok(handle),
RoutingResult::SlowPath(key) => match self.map.get(&key) {
Some(cached) => match cached.upgrade() {
Ok(upgraded) => return Ok(upgraded),
Err(HandleUpgradeError::ShutDown) => {
// TODO: dedup with shard_routing()
Some(upgraded) => return Ok(Handle(upgraded)),
None => {
trace!("handle cache stale");
self.map.remove(&key).unwrap();
continue;
ShardSelector::Known(key.shard_index)
}
},
None => break ShardSelector::Known(key.shard_index),
None => ShardSelector::Known(key.shard_index),
},
RoutingResult::NeedConsultTenantManager => break shard_selector,
RoutingResult::NeedConsultTenantManager => shard_selector,
}
};
self.get_miss(timeline_id, miss, tenant_manager).await
@@ -383,7 +302,7 @@ impl<T: Types> Cache<T> {
let Some((first_key, first_handle)) = self.map.iter().next() else {
return RoutingResult::NeedConsultTenantManager;
};
let Ok(first_handle) = first_handle.upgrade() else {
let Some(first_handle) = first_handle.upgrade() else {
// TODO: dedup with get()
trace!("handle cache stale");
let first_key_owned = *first_key;
@@ -391,7 +310,7 @@ impl<T: Types> Cache<T> {
continue;
};
let first_handle_shard_identity = first_handle.get_shard_identity();
let first_handle_shard_identity = first_handle.timeline.get_shard_identity();
let make_shard_index = |shard_num: ShardNumber| ShardIndex {
shard_number: shard_num,
shard_count: first_handle_shard_identity.count,
@@ -410,11 +329,11 @@ impl<T: Types> Cache<T> {
};
let first_handle_shard_timeline_id = ShardTimelineId {
shard_index: first_handle_shard_identity.shard_index(),
timeline_id: first_handle.shard_timeline_id().timeline_id,
timeline_id: first_handle.timeline.shard_timeline_id().timeline_id,
};
if need_shard_timeline_id == first_handle_shard_timeline_id {
return RoutingResult::FastPath(first_handle);
return RoutingResult::FastPath(Handle(first_handle));
} else {
return RoutingResult::SlowPath(need_shard_timeline_id);
}
@@ -438,30 +357,23 @@ impl<T: Types> Cache<T> {
ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
}
trace!("creating new HandleInner");
let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
gate_guard: Arc::new(
// this enter() is expensive in production code because
// it hits the global Arc<Timeline>::gate refcounts
match timeline.gate().enter() {
Ok(guard) => guard,
Err(_) => {
return Err(GetError::TimelineGateClosed);
}
},
),
// this clone is expensive in production code because
// it hits the global Arc<Timeline>::clone refcounts
timeline: Arc::new(timeline.clone()),
}));
let handle_weak = WeakHandle {
inner: Arc::downgrade(&handle_inner_arc),
let gate_guard = match timeline.gate().enter() {
Ok(guard) => guard,
Err(_) => {
return Err(GetError::TimelineGateClosed);
}
};
let handle = handle_weak
.upgrade()
.ok()
.expect("we just created it and it's not linked anywhere yet");
{
trace!("creating new HandleInner");
let handle = Arc::new(
// TODO: global metric that keeps track of the number of live HandlerTimeline instances
// so we can identify reference cycle bugs.
HandleInner {
shut_down: AtomicBool::new(false),
_gate_guard: gate_guard,
timeline: timeline.clone(),
},
);
let handle = {
let mut lock_guard = timeline
.per_timeline_state()
.handles
@@ -469,8 +381,7 @@ impl<T: Types> Cache<T> {
.expect("mutex poisoned");
match &mut *lock_guard {
Some(per_timeline_state) => {
let replaced =
per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
assert!(replaced.is_none(), "some earlier code left a stale handle");
match self.map.entry(key) {
hash_map::Entry::Occupied(_o) => {
@@ -481,7 +392,8 @@ impl<T: Types> Cache<T> {
unreachable!()
}
hash_map::Entry::Vacant(v) => {
v.insert(handle_weak);
v.insert(Arc::downgrade(&handle));
handle
}
}
}
@@ -489,62 +401,14 @@ impl<T: Types> Cache<T> {
return Err(GetError::PerTimelineStateShutDown);
}
}
}
Ok(handle)
};
Ok(Handle(handle))
}
Err(e) => Err(GetError::TenantManager(e)),
}
}
}
/// Reasons why `WeakHandle::upgrade` can fail.
pub(crate) enum HandleUpgradeError {
/// The `HandleInner` was already dropped or has transitioned to its shut-down state.
ShutDown,
}
impl<T: Types> WeakHandle<T> {
/// Tries to turn this weak handle back into a [`Handle`].
///
/// Returns `Err(HandleUpgradeError::ShutDown)` if the underlying `HandleInner`
/// has been dropped, or if it is still allocated but in the
/// `HandleInner::ShutDown` state.
///
/// Panics if the `HandleInner` mutex is poisoned.
pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
// No strong ref left anywhere: treat it the same as an explicit shutdown.
let Some(inner) = Weak::upgrade(&self.inner) else {
return Err(HandleUpgradeError::ShutDown);
};
let lock_guard = inner.lock().expect("poisoned");
match &*lock_guard {
HandleInner::KeepingTimelineGateOpen {
timeline,
gate_guard,
} => {
// Take our own strong refs while still holding the lock.
let gate_guard = Arc::clone(gate_guard);
let timeline = Arc::clone(timeline);
// The guard borrows `inner`, so it must be released before `inner`
// can be moved into the returned `Handle`.
drop(lock_guard);
Ok(Handle {
timeline,
gate_guard,
inner,
})
}
HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
}
}
/// Returns `true` if both weak handles point to the same `HandleInner` allocation.
pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
Weak::ptr_eq(&self.inner, &other.inner)
}
}
/// Lets a [`Handle`] be used wherever a `&T::Timeline` is expected.
impl<T: Types> std::ops::Deref for Handle<T> {
type Target = T::Timeline;
fn deref(&self) -> &Self::Target {
// Auto-deref goes through the handle's own `Arc`; no lock is taken here.
&self.timeline
}
}
impl<T: Types> Handle<T> {
/// Creates a [`WeakHandle`] pointing at the same `HandleInner` as this handle.
///
/// The returned value holds only a `Weak` reference to the inner state.
pub(crate) fn downgrade(&self) -> WeakHandle<T> {
WeakHandle {
inner: Arc::downgrade(&self.inner),
}
}
}
impl<T: Types> PerTimelineState<T> {
/// After this method returns, [`Cache::get`] will never again return a [`Handle`]
/// to the [`Types::Timeline`] that embeds this per-timeline state.
@@ -566,54 +430,43 @@ impl<T: Types> PerTimelineState<T> {
trace!("already shut down");
return;
};
for handle_inner_arc in handles.values() {
for handle in handles.values() {
// Make hits fail.
let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
lock_guard.shutdown();
handle.shut_down.store(true, Ordering::Relaxed);
}
drop(handles);
}
}
/// Lets a [`Handle`] be used wherever a `&T::Timeline` is expected.
impl<T: Types> std::ops::Deref for Handle<T> {
type Target = T::Timeline;
fn deref(&self) -> &Self::Target {
// `self.0` is the `Arc<HandleInner>`; the timeline is stored inline in it.
&self.0.timeline
}
}
// Test builds only: emit a trace event when a `HandleInner` is destroyed.
#[cfg(test)]
impl<T: Types> Drop for HandleInner<T> {
fn drop(&mut self) {
trace!("HandleInner dropped");
}
}
// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
impl<T: Types> Drop for Cache<T> {
fn drop(&mut self) {
for (
_,
WeakHandle {
inner: handle_inner_weak,
},
) in self.map.drain()
{
let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
continue;
};
let handle_timeline = handle_inner_arc
// locking rules: drop lock before acquiring other lock below
.lock()
.expect("poisoned")
.shutdown();
let per_timeline_state = handle_timeline.per_timeline_state();
let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
let Some(handles) = &mut *handles_lock_guard else {
continue;
};
let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
// There could have been a shutdown in between us upgrading the weak and locking the mutex.
continue;
};
drop(handles_lock_guard); // locking rules: remember them when!
assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc,));
}
}
}
impl<T: Types> HandleInner<T> {
fn shutdown(&mut self) -> Arc<T::Timeline> {
match std::mem::replace(self, HandleInner::ShutDown) {
HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline,
HandleInner::ShutDown => {
unreachable!("handles are only shut down once in their lifetime");
for (_, weak) in self.map.drain() {
if let Some(strong) = weak.upgrade() {
// handle is still being kept alive in PerTimelineState
let timeline = strong.timeline.per_timeline_state();
let mut handles = timeline.handles.lock().expect("mutex poisoned");
if let Some(handles) = &mut *handles {
let Some(removed) = handles.remove(&self.id) else {
// There could have been a shutdown in between us upgrading the weak and locking the mutex.
continue;
};
assert!(Arc::ptr_eq(&removed, &strong));
}
}
}
}
@@ -621,8 +474,6 @@ impl<T: Types> HandleInner<T> {
#[cfg(test)]
mod tests {
use std::sync::Weak;
use pageserver_api::{
key::{rel_block_to_key, Key, DBDIR_KEY},
models::ShardParameters,
@@ -732,13 +583,39 @@ mod tests {
//
// fill the cache
//
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(2, 1),
"strong: shard0, mgr; weak: myself"
);
let handle: Handle<_> = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
let handle_inner_weak = Arc::downgrade(&handle.0);
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
assert_eq!(
(
Weak::strong_count(&handle_inner_weak),
Weak::weak_count(&handle_inner_weak)
),
(2, 2),
"strong: handle, per_timeline_state, weak: handle_inner_weak, cache"
);
assert_eq!(cache.map.len(), 1);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
);
drop(handle);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
);
//
// demonstrate that Handle holds up gate closure
@@ -763,11 +640,21 @@ mod tests {
// SHUTDOWN
shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
assert_eq!(
1,
Weak::strong_count(&handle_inner_weak),
"through local var handle"
);
assert_eq!(
cache.map.len(),
1,
"this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(via handle), shard0, mgr; weak: myself"
);
// this handle is perfectly usable
handle.getpage();
@@ -791,6 +678,16 @@ mod tests {
}
drop(handle);
assert_eq!(
0,
Weak::strong_count(&handle_inner_weak),
"the HandleInner destructor already ran"
);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(2, 1),
"strong: shard0, mgr; weak: myself"
);
// closing gate succeeds after dropping handle
tokio::select! {
@@ -809,8 +706,10 @@ mod tests {
assert_eq!(cache.map.len(), 0);
// ensure all refs to shard0 are gone and we're not leaking anything
let myself = Weak::clone(&shard0.myself);
drop(shard0);
drop(mgr);
assert_eq!(Weak::strong_count(&myself), 0);
}
#[tokio::test]
@@ -1049,11 +948,15 @@ mod tests {
handle
};
handle.getpage();
used_handles.push(Arc::downgrade(&handle.timeline));
used_handles.push(Arc::downgrade(&handle.0));
}
// No handles exist, thus gates are closed and don't require shutdown.
// Thus the gate should close immediately, even without shutdown.
// No handles exist, thus gates are closed and don't require shutdown
assert!(used_handles
.iter()
.all(|weak| Weak::strong_count(weak) == 0));
// ... thus the gate should close immediately, even without shutdown
tokio::select! {
_ = shard0.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
@@ -1061,75 +964,4 @@ mod tests {
}
}
}
#[tokio::test(start_paused = true)]
async fn test_weak_handles() {
    crate::tenant::harness::setup_logging();

    // A single unsharded stub timeline that knows its own weak self-reference.
    let timeline_id = TimelineId::generate();
    let shard0 = Arc::new_cyclic(|myself| StubTimeline {
        gate: Default::default(),
        id: timeline_id,
        shard: ShardIdentity::unsharded(),
        per_timeline_state: PerTimelineState::default(),
        myself: Weak::clone(myself),
    });
    let mgr = StubManager {
        shards: vec![Arc::clone(&shard0)],
    };

    // Baseline strong count, taken after the manager took its clone; compared again at the end.
    let refcount_start = Arc::strong_count(&shard0);

    // Obtain a handle through a fresh cache.
    let mut cache = Cache::<TestTypes>::default();
    let handle = cache
        .get(timeline_id, ShardSelector::Page(DBDIR_KEY), &mgr)
        .await
        .expect("we have the timeline");
    assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));

    // Keep only the weak form, then immediately re-upgrade it before shutdown begins.
    let weak_handle = handle.downgrade();
    drop(handle);
    let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");

    // Begin shutdown of the per-timeline state.
    shard0.per_timeline_state.shutdown();

    // From now on upgrading must fail, although `upgraded_handle` is still alive.
    weak_handle
        .upgrade()
        .err()
        .expect("can't upgrade weak handle as soon as shutdown started");

    // The surviving strong handle must keep the gate from closing.
    tokio::select! {
        _ = shard0.gate.close() => {
            panic!("handle is keeping gate open");
        }
        _ = tokio::time::sleep(FOREVER) => { }
    }

    // Release the last strong handle.
    drop(upgraded_handle);

    // With only the weak handle left, the gate is expected to close.
    tokio::select! {
        _ = shard0.gate.close() => { }
        _ = tokio::time::sleep(FOREVER) => {
            panic!("only strong handle is dropped and we shut down per-timeline-state")
        }
    }

    // Upgrading stays impossible after the gate has closed.
    weak_handle
        .upgrade()
        .err()
        .expect("still shouldn't be able to upgrade the weak handle");

    // The strong count of the timeline must be back at the baseline.
    assert_eq!(Arc::strong_count(&shard0), refcount_start);
}
}

View File

@@ -34,8 +34,6 @@ typedef enum
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
/* future tags above this line */
T_NeonTestRequest = 99, /* only in cfg(feature = "testing") */
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,
@@ -44,8 +42,6 @@ typedef enum
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
/* future tags above this line */
T_NeonTestResponse = 199, /* only in cfg(feature = "testing") */
} NeonMessageTag;
typedef uint64 NeonRequestId;

View File

@@ -14,7 +14,7 @@ use proxy::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
use proxy::auth::{self};
use proxy::cancellation::CancellationHandlerMain;
use proxy::config::{
self, obfuscated_proxy_id, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
};
use proxy::control_plane::locks::ApiLocks;
use proxy::control_plane::messages::{EndpointJwksResponse, JwksSettings};
@@ -218,7 +218,6 @@ async fn main() -> anyhow::Result<()> {
proxy::metrics::CancellationSource::Local,
)),
endpoint_rate_limiter,
obfuscated_proxy_id(std::process::id(), "local"),
);
match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await {

View File

@@ -10,7 +10,6 @@ use clap::Arg;
use futures::future::Either;
use futures::TryFutureExt;
use itertools::Itertools;
use proxy::config::obfuscated_proxy_id;
use proxy::context::RequestContext;
use proxy::metrics::{Metrics, ThreadPoolMetrics};
use proxy::protocol2::ConnectionInfo;
@@ -186,7 +185,6 @@ async fn task_main(
},
proxy::metrics::Protocol::SniRouter,
"sni",
obfuscated_proxy_id(std::process::id(), "sni-router"), // just a shim for context
);
handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
}

View File

@@ -10,7 +10,7 @@ use proxy::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
use proxy::cancellation::{CancelMap, CancellationHandler};
use proxy::config::{
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig,
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, obfuscated_proxy_id,
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2,
};
use proxy::context::parquet::ParquetUploadArgs;
use proxy::http::health_server::AppMetrics;
@@ -396,8 +396,6 @@ async fn main() -> anyhow::Result<()> {
None => None,
};
let proxy_id: u16 = obfuscated_proxy_id(std::process::id(), &args.region);
let cancellation_handler = Arc::new(CancellationHandler::<
Option<Arc<Mutex<RedisPublisherClient>>>,
>::new(
@@ -439,7 +437,6 @@ async fn main() -> anyhow::Result<()> {
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
proxy_id,
));
}
@@ -451,7 +448,6 @@ async fn main() -> anyhow::Result<()> {
cancellation_token.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
proxy_id,
));
}
}
@@ -463,7 +459,6 @@ async fn main() -> anyhow::Result<()> {
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
proxy_id,
));
}
}

View File

@@ -80,24 +80,15 @@ impl ReportableError for CancelError {
impl<P: CancellationPublisher> CancellationHandler<P> {
/// Run async action within an ephemeral session identified by [`CancelKeyData`].
pub(crate) fn get_session(self: Arc<Self>, proxy_id: u16) -> Session<P> {
pub(crate) fn get_session(self: Arc<Self>) -> Session<P> {
// we intentionally generate a random "backend pid" and "secret key" here.
// we use the corresponding u64 as an identifier for the
// actual endpoint+pid+secret for postgres/pgbouncer.
//
// if we forwarded the backend_pid from postgres to the client, there would be a lot
// of overlap between our computes as most pids are small (~100).
let key = loop {
let key_rand: u64 = rand::random::<u64>() & 0x0000_ffff_ffff_ffff;
let backend_pid = ((proxy_id as u32) << 16) | ((key_rand >> 32) as u32) as u32;
let cancel_key = (key_rand as u32) as i32;
let key = CancelKeyData {
backend_pid: (backend_pid as i32),
cancel_key,
};
let key = rand::random();
// Random key collisions are unlikely to happen here, but they're still possible,
// which is why we have to take care not to rewrite an existing key.
@@ -460,7 +451,7 @@ mod tests {
CancellationSource::FromRedis,
));
let session = cancellation_handler.clone().get_session(123);
let session = cancellation_handler.clone().get_session();
assert!(cancellation_handler.contains(&session));
drop(session);
// Check that the session has been dropped.

View File

@@ -16,8 +16,6 @@ use crate::serverless::GlobalConnPoolOptions;
pub use crate::tls::server_config::{configure_tls, TlsConfig};
use crate::types::Host;
use sha2::{Digest, Sha256};
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub metric_collection: Option<MetricCollectionConfig>,
@@ -418,23 +416,6 @@ impl FromStr for ConcurrencyLockOptions {
}
}
/// Folds a 32-bit value into a byte in the range `0..=254`.
///
/// Computes `(value * 31 + 17) % 255` with wrapping `u32` arithmetic. The
/// wrapping operations give the same result as the plain operators in a build
/// without overflow checks, but never panic when overflow checks are enabled
/// (plain `value * 31` overflows for any `value` above `u32::MAX / 31`).
fn map_u32_to_u8(value: u32) -> u8 {
    // The remainder is always < 255, so the narrowing cast is lossless.
    (value.wrapping_mul(31).wrapping_add(17) % 255) as u8
}
/// Derives a 16-bit proxy identifier from a process id and a region name.
///
/// The region name is hashed with SHA-256 and the digest bytes are folded into
/// a value in `0..=254`; the process id is folded into a byte by
/// `map_u32_to_u8`. The result is `region * 257 + process`.
///
/// All intermediate arithmetic is explicitly wrapping: `257^i` no longer fits
/// in a `u64` from `i = 8` onwards and the digest has 32 bytes, so the plain
/// operators would panic for every input when overflow checks are enabled.
/// The wrapping forms yield exactly what the plain operators produce in a
/// build without overflow checks.
pub fn obfuscated_proxy_id(process_id: u32, region_id: &str) -> u16 {
    const BASE: u64 = 257;

    let process_id = map_u32_to_u8(process_id);
    let hash_region_id = Sha256::digest(region_id.as_bytes());

    let combined_region = hash_region_id
        .iter()
        .enumerate()
        .fold(0u64, |acc, (i, &byte)| {
            // `i` is a digest byte index, so it always fits in a u32.
            let weight = BASE.wrapping_pow(i as u32);
            acc.wrapping_add(u64::from(byte).wrapping_mul(weight)) % 255
        });
    let combined_region = (combined_region % 255) as u8;

    // Both operands are at most 254, so the sum is at most 65532 and fits in a u16.
    ((combined_region as u16) * 257 + (process_id as u16)) % 65535
}
#[cfg(test)]
mod tests {
use super::*;
@@ -530,12 +511,4 @@ mod tests {
Ok(())
}
#[test]
fn test_proxy_id_obfuscation() {
    // Pins the identifier derived for one fixed (process id, region) pair.
    let proxy_id = obfuscated_proxy_id(123, "us-west-2");
    assert_eq!(proxy_id, 0x1f7b);
}
}

View File

@@ -25,7 +25,6 @@ pub async fn task_main(
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
proxy_id: u16,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -90,7 +89,6 @@ pub async fn task_main(
peer_addr,
crate::metrics::Protocol::Tcp,
&config.region,
proxy_id,
);
let res = handle_client(
@@ -224,7 +222,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
node.cancel_closure
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
let session = cancellation_handler.get_session(ctx.proxy_id());
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the

View File

@@ -46,7 +46,6 @@ struct RequestContextInner {
pub(crate) protocol: Protocol,
first_packet: chrono::DateTime<Utc>,
region: &'static str,
pub(crate) proxy_id: u16, // for generating cancel keys per region/process
pub(crate) span: Span,
// filled in as they are discovered
@@ -93,7 +92,6 @@ impl Clone for RequestContext {
protocol: inner.protocol,
first_packet: inner.first_packet,
region: inner.region,
proxy_id: inner.proxy_id,
span: info_span!("background_task"),
project: inner.project,
@@ -126,7 +124,6 @@ impl RequestContext {
conn_info: ConnectionInfo,
protocol: Protocol,
region: &'static str,
proxy_id: u16,
) -> Self {
// TODO: be careful with long lived spans
let span = info_span!(
@@ -144,7 +141,6 @@ impl RequestContext {
protocol,
first_packet: Utc::now(),
region,
proxy_id,
span,
project: None,
@@ -176,7 +172,7 @@ impl RequestContext {
let ip = IpAddr::from([127, 0, 0, 1]);
let addr = SocketAddr::new(ip, 5432);
let conn_info = ConnectionInfo { addr, extra: None };
RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test", 1)
RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test")
}
pub(crate) fn console_application_name(&self) -> String {
@@ -338,10 +334,6 @@ impl RequestContext {
.latency_timer
.success();
}
/// Returns the `proxy_id` stored in this request context.
///
/// # Panics
///
/// Panics with "should not deadlock" if the inner state cannot be locked
/// without blocking.
pub(crate) fn proxy_id(&self) -> u16 {
    let inner = self.0.try_lock().expect("should not deadlock");
    inner.proxy_id
}
}
pub(crate) struct LatencyTimerPause<'a> {

View File

@@ -59,7 +59,6 @@ pub async fn task_main(
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
proxy_id: u16,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -125,7 +124,6 @@ pub async fn task_main(
conn_info,
crate::metrics::Protocol::Tcp,
&config.region,
proxy_id,
);
let res = handle_client(
@@ -360,7 +358,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
node.cancel_closure
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
let session = cancellation_handler.get_session(ctx.proxy_id());
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the

View File

@@ -63,7 +63,6 @@ pub async fn task_main(
cancellation_token: CancellationToken,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
proxy_id: u16,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("websocket server has shut down");
@@ -199,7 +198,6 @@ pub async fn task_main(
conn,
conn_info,
session_id,
proxy_id,
))
.await;
}
@@ -326,7 +324,6 @@ async fn connection_handler(
conn: AsyncRW,
conn_info: ConnectionInfo,
session_id: uuid::Uuid,
proxy_id: u16,
) {
let session_id = AtomicTake::new(session_id);
@@ -374,7 +371,6 @@ async fn connection_handler(
http_request_token,
endpoint_rate_limiter.clone(),
cancellations,
proxy_id,
)
.in_current_span()
.map_ok_or_else(api_error_into_response, |r| r),
@@ -423,7 +419,6 @@ async fn request_handler(
http_cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellations: TaskTracker,
proxy_id: u16,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let host = request
.headers()
@@ -441,7 +436,6 @@ async fn request_handler(
conn_info,
crate::metrics::Protocol::Ws,
&config.region,
proxy_id,
);
let span = ctx.span();
@@ -479,7 +473,6 @@ async fn request_handler(
conn_info,
crate::metrics::Protocol::Http,
&config.region,
proxy_id,
);
let span = ctx.span();

View File

@@ -102,11 +102,6 @@ impl Client {
self.get(&uri).await
}
/// Sends a GET request to `<mgmt_api_endpoint>/v1/utilization/` and returns
/// the raw response.
pub async fn utilization(&self) -> Result<reqwest::Response> {
    let target = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
    self.get(target.as_str()).await
}
async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
self.request(Method::GET, uri, ()).await
}

View File

@@ -127,13 +127,6 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
json_response(StatusCode::OK, ())
}
/// Reports the timeline counts of this node as a JSON body with status 200.
///
/// The permission check is performed first, with no tenant scope (`None`).
async fn utilization_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;
    let counts = get_global_timelines(&request).get_timeline_counts();
    json_response(StatusCode::OK, counts)
}
/// List all (not deleted) timelines.
/// Note: it is possible to do the same with debug_dump.
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -627,7 +620,6 @@ pub fn make_router(
failpoints_handler(r, cancel).await
})
})
.get("/v1/uzilization", |r| request_span(r, utilization_handler))
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})

View File

@@ -13,7 +13,6 @@ use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::membership::Configuration;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_api::ServerInfo;
use serde::Serialize;
use std::collections::HashMap;
@@ -417,20 +416,6 @@ impl GlobalTimelines {
.collect()
}
/// Returns statistics about timeline counts.
///
/// Entries whose creation is still in progress and timelines that report
/// themselves as cancelled are not counted.
pub fn get_timeline_counts(&self) -> SafekeeperUtilization {
    let guard = self.state.lock().unwrap();

    let mut timeline_count: u64 = 0;
    for entry in guard.timelines.values() {
        let counted = match entry {
            GlobalMapTimeline::CreationInProgress => false,
            GlobalMapTimeline::Timeline(tli) => !tli.is_cancelled(),
        };
        if counted {
            timeline_count += 1;
        }
    }

    SafekeeperUtilization { timeline_count }
}
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
/// and that's why it can return cancelled timelines, to retry deleting them.
fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {

View File

@@ -1,17 +1,11 @@
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
};
use std::{sync::Arc, time::Duration};
use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::thread_rng;
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;
use super::{Node, Scheduler, Service, TenantShard};
use super::Service;
pub struct ChaosInjector {
service: Arc<Service>,
@@ -41,86 +35,50 @@ impl ChaosInjector {
}
}
/// Swaps the roles of a shard's attached location and one of its secondary
/// locations, then asks the service to reconcile the shard.
///
/// Nothing is changed (and an info message is logged) when the shard's
/// scheduling policy is not `Active`, when it has no secondary location, or
/// when it currently has no attached location. The secondary to promote is
/// picked at random.
fn maybe_migrate_to_secondary(
    &self,
    tenant_shard_id: TenantShardId,
    nodes: &Arc<HashMap<NodeId, Node>>,
    tenants: &mut BTreeMap<TenantShardId, TenantShard>,
    scheduler: &mut Scheduler,
) {
    let shard = tenants
        .get_mut(&tenant_shard_id)
        .expect("Held lock between choosing ID and this get");

    // Leave shards with any other policy (e.g. Pause) alone, so that they can
    // be pinned without being disrupted by us.
    match shard.get_scheduling_policy() {
        ShardSchedulingPolicy::Active => {}
        _ => {
            tracing::info!(
                "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
                shard.get_scheduling_policy()
            );
            return;
        }
    }

    // Choose which secondary gets promoted.
    let mut rng = thread_rng();
    let candidate = shard.intent.get_secondary().choose(&mut rng).cloned();
    let new_location = match candidate {
        Some(node_id) => node_id,
        None => {
            tracing::info!(
                "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
            );
            return;
        }
    };

    // The currently attached location is the one that gets demoted.
    let old_location = match *shard.intent.get_attached() {
        Some(node_id) => node_id,
        None => {
            tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
            return;
        }
    };

    tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");

    shard.intent.demote_attached(scheduler, old_location);
    shard.intent.promote_attached(scheduler, new_location);
    self.service.maybe_reconcile_shard(shard, nodes);
}
async fn inject_chaos(&mut self) {
// Pick some shards to interfere with
let batch_size = 128;
let mut inner = self.service.inner.write().unwrap();
let (nodes, tenants, scheduler) = inner.parts_mut();
let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
// Prefer to migrate tenants that are currently outside their home AZ. This avoids the chaos injector
// continuously pushing tenants outside their home AZ: instead, we'll tend to cycle between picking some
// random tenants to move, and then on next chaos iteration moving them back, then picking some new
// random tenants on the next iteration.
let mut victims = Vec::with_capacity(batch_size);
for shard in tenants.values() {
if shard.is_attached_outside_preferred_az(nodes) {
victims.push(shard.tenant_shard_id);
}
if victims.len() >= batch_size {
break;
}
}
let choose_random = batch_size.saturating_sub(victims.len());
tracing::info!("Injecting chaos: found {} shards to migrate back to home AZ, picking {choose_random} random shards to migrate", victims.len());
let random_victims = tenant_ids.choose_multiple(&mut thread_rng(), choose_random);
victims.extend(random_victims);
let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
for victim in victims {
self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
let shard = tenants
.get_mut(victim)
.expect("Held lock between choosing ID and this get");
if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
// Skip non-active scheduling policies, so that a shard with a policy like Pause can
// be pinned without being disrupted by us.
tracing::info!(
"Skipping shard {victim}: scheduling policy is {:?}",
shard.get_scheduling_policy()
);
continue;
}
// Pick a secondary to promote
let Some(new_location) = shard
.intent
.get_secondary()
.choose(&mut thread_rng())
.cloned()
else {
tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
continue;
};
let Some(old_location) = *shard.intent.get_attached() else {
tracing::info!("Skipping shard {victim}: currently has no attached location");
continue;
};
tracing::info!("Injecting chaos: migrate {victim} {old_location}->{new_location}");
shard.intent.demote_attached(scheduler, old_location);
shard.intent.promote_attached(scheduler, new_location);
self.service.maybe_reconcile_shard(shard, nodes);
}
}
}

View File

@@ -1793,23 +1793,6 @@ impl TenantShard {
}
}
}
/// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
///
/// Returns false if the shard does not have a preferred AZ, or if it has no
/// attached location.
///
/// # Panics
///
/// Panics if the attached node id is not present in `nodes`.
pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
    // Without a preferred AZ there is nothing to be "outside" of.
    let Some(preferred_az) = self.intent.preferred_az_id.as_ref() else {
        return false;
    };

    self.intent
        .get_attached()
        .map(|node_id| {
            // "Outside" means the attached node's AZ differs from the preferred one.
            nodes
                .get(&node_id)
                .expect("referenced node exists")
                .get_availability_zone_id()
                != preferred_az
        })
        .unwrap_or(false)
}
}
impl Drop for TenantShard {

View File

@@ -1,60 +0,0 @@
# NB: there are benchmarks that double-serve as tests inside the `performance` directory.
import subprocess
from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(30)  # test takes <20s if pageserver impl is correct
@pytest.mark.parametrize("kind", ["pageserver-stop", "tenant-detach"])
def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path, kind: str):
    """
    Start a helper client against the pageserver, wait until it reports that the
    pageserver accepts no more requests, then shut down either the whole
    pageserver or just the tenant. The timeout marker bounds how long that may take.
    """

    def patch_pageserver_toml(config):
        # Run the page service in pipelined mode with concurrent futures.
        config["page_service_pipelining"] = {
            "mode": "pipelined",
            "max_batch_size": 32,
            "execution": "concurrent-futures",
        }

    neon_env_builder.pageserver_config_override = patch_pageserver_toml
    env = neon_env_builder.init_start()

    log.info("make flush appear slow")
    log.info("sending requests until pageserver accepts no more")

    # TODO: extract this into a helper, like subprocess_capture,
    # so that we capture the stderr from the helper somewhere.
    helper_cmd = [
        neon_binpath / "test_helper_slow_client_reads",
        env.pageserver.connstr(),
        str(env.initial_tenant),
        str(env.initial_timeline),
    ]
    child = subprocess.Popen(
        helper_cmd,
        bufsize=0,  # unbuffered
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    assert child.stdout is not None

    # The helper writes a single b"R" byte to its stdout as the ready signal.
    signal = child.stdout.read(1)
    if len(signal) != 1:
        raise Exception("unexpected EOF")
    if signal != b"R":
        raise Exception(f"unexpected data: {signal!r}")
    log.info("helper reports pageserver accepts no more requests")
    log.info(
        "assuming pageserver connection handle is in a state where TCP has backpressured pageserver=>client response flush() into userspace"
    )

    if kind == "pageserver-stop":
        log.info("try to shut down the pageserver cleanly")
        env.pageserver.stop()
    elif kind == "tenant-detach":
        log.info("try to shut down the tenant")
        env.pageserver.tenant_detach(env.initial_tenant)
    else:
        raise ValueError(f"unexpected kind: {kind}")

    log.info("shutdown did not time out, test passed")

View File

@@ -5,7 +5,7 @@
],
"v16": [
"16.6",
"d674efd776f59d78e8fa1535bd2f95c3e6984fca"
"5f3ef1656925c10c90c8fecfb668048d819912e3"
],
"v15": [
"15.10",