mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-26 06:40:38 +00:00
Compare commits
2 Commits
proxy_id
...
out_of_sha
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d71bece102 | ||
|
|
21725fe377 |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -824,7 +824,7 @@ jobs:
|
||||
docker compose -f ./docker-compose/docker-compose.yml down
|
||||
|
||||
promote-images-dev:
|
||||
needs: [ check-permissions, tag, vm-compute-node-image, neon-image ]
|
||||
needs: [ check-permissions, tag, vm-compute-node-image ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
permissions:
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1312,7 +1312,6 @@ dependencies = [
|
||||
"tracing-utils",
|
||||
"url",
|
||||
"utils",
|
||||
"uuid",
|
||||
"vm_monitor",
|
||||
"workspace_hack",
|
||||
"zstd",
|
||||
|
||||
@@ -995,50 +995,24 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/9118dd4549b7d8c0bbc98e04
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-pgx-ulid-build"
|
||||
# Compile "pgx_ulid" extension for v16 and below
|
||||
# Compile "pgx_ulid" extension
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build AS pg-pgx-ulid-build
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15" | "v16") \
|
||||
;; \
|
||||
*) \
|
||||
echo "skipping the version of pgx_ulid for $PG_VERSION" && exit 0 \
|
||||
;; \
|
||||
# doesn't support v17 yet
|
||||
# https://github.com/pksunkara/pgx_ulid/pull/52
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pgx_ulid does not support pg17 as of the latest version (0.1.5)" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.5.tar.gz -O pgx_ulid.tar.gz && \
|
||||
echo "9d1659a2da65af0133d5451c454de31b37364e3502087dadf579f790bc8bef17 pgx_ulid.tar.gz" | sha256sum --check && \
|
||||
echo "9d1659a2da65af0133d5451c454de31b37364e3502087dadf579f790bc8bef17 pgx_ulid.tar.gz" | sha256sum --check && \
|
||||
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "^0.11.2"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx = "^0.11.2"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/ulid.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-pgx-ulid-pgrx12-build"
|
||||
# Compile "pgx_ulid" extension for v17 and up
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-pgx-ulid-pgrx12-build
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
;; \
|
||||
*) \
|
||||
echo "skipping the version of pgx_ulid for $PG_VERSION" && exit 0 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.2.0.tar.gz -O pgx_ulid.tar.gz && \
|
||||
echo "cef6a9a2e5e7bd1a10a18989286586ee9e6c1c06005a4055cff190de41bf3e9f pgx_ulid.tar.gz" | sha256sum --check && \
|
||||
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "^0.12.7"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgx_ulid.control
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1183,7 +1157,6 @@ COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-pgx-ulid-pgrx12-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-session-jwt-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
@@ -51,7 +51,6 @@ tracing-subscriber.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
thiserror.workspace = true
|
||||
url.workspace = true
|
||||
uuid.workspace = true
|
||||
prometheus.workspace = true
|
||||
|
||||
postgres_initdb.workspace = true
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv6Addr, SocketAddr},
|
||||
sync::Arc,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use axum::{
|
||||
extract::Request,
|
||||
middleware::{self, Next},
|
||||
response::{IntoResponse, Response},
|
||||
routing::{get, post},
|
||||
Router,
|
||||
@@ -16,9 +17,11 @@ use axum::{
|
||||
use http::StatusCode;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::{request_id::PropagateRequestIdLayer, trace::TraceLayer};
|
||||
use tower_http::{
|
||||
request_id::{MakeRequestId, PropagateRequestIdLayer, RequestId, SetRequestIdLayer},
|
||||
trace::TraceLayer,
|
||||
};
|
||||
use tracing::{debug, error, info, Span};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
@@ -31,24 +34,30 @@ async fn handle_404() -> Response {
|
||||
StatusCode::NOT_FOUND.into_response()
|
||||
}
|
||||
|
||||
const X_REQUEST_ID: &str = "x-request-id";
|
||||
#[derive(Clone, Default)]
|
||||
struct ComputeMakeRequestId(Arc<AtomicU64>);
|
||||
|
||||
/// This middleware function allows compute_ctl to generate its own request ID
|
||||
/// if one isn't supplied. The control plane will always send one as a UUID. The
|
||||
/// neon Postgres extension on the other hand does not send one.
|
||||
async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response {
|
||||
let headers = request.headers_mut();
|
||||
impl MakeRequestId for ComputeMakeRequestId {
|
||||
fn make_request_id<B>(
|
||||
&mut self,
|
||||
_request: &http::Request<B>,
|
||||
) -> Option<tower_http::request_id::RequestId> {
|
||||
let request_id = self
|
||||
.0
|
||||
.fetch_add(1, Ordering::SeqCst)
|
||||
.to_string()
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
if headers.get(X_REQUEST_ID).is_none() {
|
||||
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap());
|
||||
Some(RequestId::new(request_id))
|
||||
}
|
||||
|
||||
next.run(request).await
|
||||
}
|
||||
|
||||
/// Run the HTTP server and wait on it forever.
|
||||
#[tokio::main]
|
||||
async fn serve(port: u16, compute: Arc<ComputeNode>) {
|
||||
const X_REQUEST_ID: &str = "x-request-id";
|
||||
|
||||
let mut app = Router::new()
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
@@ -73,8 +82,9 @@ async fn serve(port: u16, compute: Arc<ComputeNode>) {
|
||||
.fallback(handle_404)
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(SetRequestIdLayer::x_request_id(
|
||||
ComputeMakeRequestId::default(),
|
||||
))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::{
|
||||
reltag::RelTag,
|
||||
shard::{ShardCount, ShardStripeSize, TenantShardId},
|
||||
};
|
||||
use anyhow::bail;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
/// The state of a tenant in this pageserver.
|
||||
@@ -1399,8 +1400,6 @@ pub enum PagestreamFeMessage {
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
#[cfg(feature = "testing")]
|
||||
Test(PagestreamTestRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
@@ -1412,22 +1411,6 @@ pub enum PagestreamBeMessage {
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
#[cfg(feature = "testing")]
|
||||
Test(PagestreamTestResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
#[repr(u8)]
|
||||
enum PagestreamFeMessageTag {
|
||||
Exists = 0,
|
||||
Nblocks = 1,
|
||||
GetPage = 2,
|
||||
DbSize = 3,
|
||||
GetSlruSegment = 4,
|
||||
/* future tags above this line */
|
||||
/// For testing purposes, not available in production.
|
||||
#[cfg(feature = "testing")]
|
||||
Test = 99,
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
@@ -1439,28 +1422,7 @@ enum PagestreamBeMessageTag {
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
/* future tags above this line */
|
||||
/// For testing purposes, not available in production.
|
||||
#[cfg(feature = "testing")]
|
||||
Test = 199,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PagestreamFeMessageTag {
|
||||
type Error = u8;
|
||||
fn try_from(value: u8) -> Result<Self, u8> {
|
||||
match value {
|
||||
0 => Ok(PagestreamFeMessageTag::Exists),
|
||||
1 => Ok(PagestreamFeMessageTag::Nblocks),
|
||||
2 => Ok(PagestreamFeMessageTag::GetPage),
|
||||
3 => Ok(PagestreamFeMessageTag::DbSize),
|
||||
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
|
||||
#[cfg(feature = "testing")]
|
||||
99 => Ok(PagestreamFeMessageTag::Test),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
fn try_from(value: u8) -> Result<Self, u8> {
|
||||
@@ -1471,8 +1433,6 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
#[cfg(feature = "testing")]
|
||||
199 => Ok(PagestreamBeMessageTag::Test),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
@@ -1590,20 +1550,6 @@ pub struct PagestreamDbSizeResponse {
|
||||
pub db_size: i64,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct PagestreamTestRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub batch_key: u64,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamTestResponse {
|
||||
pub req: PagestreamTestRequest,
|
||||
}
|
||||
|
||||
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
|
||||
// that require pageserver-internal types. It is sufficient to get the total size.
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@@ -1623,7 +1569,7 @@ impl PagestreamFeMessage {
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
|
||||
bytes.put_u8(0);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
@@ -1634,7 +1580,7 @@ impl PagestreamFeMessage {
|
||||
}
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
|
||||
bytes.put_u8(1);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
@@ -1645,7 +1591,7 @@ impl PagestreamFeMessage {
|
||||
}
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
|
||||
bytes.put_u8(2);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
@@ -1657,7 +1603,7 @@ impl PagestreamFeMessage {
|
||||
}
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
|
||||
bytes.put_u8(3);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
@@ -1665,24 +1611,13 @@ impl PagestreamFeMessage {
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
|
||||
bytes.put_u8(4);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u64(req.batch_key);
|
||||
let message = req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -1710,66 +1645,56 @@ impl PagestreamFeMessage {
|
||||
),
|
||||
};
|
||||
|
||||
match PagestreamFeMessageTag::try_from(msg_tag)
|
||||
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
|
||||
{
|
||||
PagestreamFeMessageTag::Exists => {
|
||||
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::Nblocks => {
|
||||
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::GetPage => {
|
||||
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::DbSize => {
|
||||
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
match msg_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
})),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
})),
|
||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
4 => Ok(PagestreamFeMessage::GetSlruSegment(
|
||||
PagestreamGetSlruSegmentRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
@@ -1780,21 +1705,7 @@ impl PagestreamFeMessage {
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
#[cfg(feature = "testing")]
|
||||
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
batch_key: body.read_u64::<BigEndian>()?,
|
||||
message: {
|
||||
let len = body.read_u64::<BigEndian>()?;
|
||||
let mut buf = vec![0; len as usize];
|
||||
body.read_exact(&mut buf)?;
|
||||
String::from_utf8(buf)?
|
||||
},
|
||||
})),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1837,15 +1748,6 @@ impl PagestreamBeMessage {
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(resp) => {
|
||||
bytes.put_u8(Tag::Test as u8);
|
||||
bytes.put_u64(resp.req.batch_key);
|
||||
let message = resp.req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
PagestreamProtocolVersion::V3 => {
|
||||
@@ -1914,18 +1816,6 @@ impl PagestreamBeMessage {
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(resp) => {
|
||||
bytes.put_u8(Tag::Test as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u64(resp.req.batch_key);
|
||||
let message = resp.req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2068,28 +1958,6 @@ impl PagestreamBeMessage {
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Tag::Test => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let batch_key = buf.read_u64::<BigEndian>()?;
|
||||
let len = buf.read_u64::<BigEndian>()?;
|
||||
let mut msg = vec![0; len as usize];
|
||||
buf.read_exact(&mut msg)?;
|
||||
let message = String::from_utf8(msg)?;
|
||||
Self::Test(PagestreamTestResponse {
|
||||
req: PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
batch_key,
|
||||
message,
|
||||
},
|
||||
})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
@@ -2109,8 +1977,6 @@ impl PagestreamBeMessage {
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(_) => "Test",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -277,8 +277,3 @@ pub struct TimelineTermBumpResponse {
|
||||
pub previous_term: u64,
|
||||
pub current_term: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SafekeeperUtilization {
|
||||
pub timeline_count: u64,
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ license.workspace = true
|
||||
default = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
|
||||
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
@@ -114,7 +114,3 @@ harness = false
|
||||
[[bench]]
|
||||
name = "upload_queue"
|
||||
harness = false
|
||||
|
||||
[[bin]]
|
||||
name = "test_helper_slow_client_reads"
|
||||
required-features = [ "testing" ]
|
||||
|
||||
@@ -4,9 +4,6 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
testing = [ "pageserver_api/testing" ]
|
||||
|
||||
[dependencies]
|
||||
pageserver_api.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
@@ -13,6 +10,7 @@ use pageserver_api::{
|
||||
};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_postgres::CopyOutStream;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
@@ -64,28 +62,15 @@ impl Client {
|
||||
.client
|
||||
.copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
|
||||
.await?;
|
||||
let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
|
||||
let Client {
|
||||
cancel_on_client_drop,
|
||||
conn_task,
|
||||
client: _,
|
||||
} = self;
|
||||
let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
|
||||
ConnTaskRunning {
|
||||
cancel_on_client_drop,
|
||||
conn_task,
|
||||
},
|
||||
)));
|
||||
Ok(PagestreamClient {
|
||||
sink: PagestreamSender {
|
||||
shared: shared.clone(),
|
||||
sink,
|
||||
},
|
||||
stream: PagestreamReceiver {
|
||||
shared: shared.clone(),
|
||||
stream,
|
||||
},
|
||||
shared,
|
||||
copy_both: Box::pin(copy_both),
|
||||
conn_task,
|
||||
cancel_on_client_drop,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -112,28 +97,7 @@ impl Client {
|
||||
|
||||
/// Create using [`Client::pagestream`].
|
||||
pub struct PagestreamClient {
|
||||
shared: Arc<Mutex<PagestreamShared>>,
|
||||
sink: PagestreamSender,
|
||||
stream: PagestreamReceiver,
|
||||
}
|
||||
|
||||
pub struct PagestreamSender {
|
||||
#[allow(dead_code)]
|
||||
shared: Arc<Mutex<PagestreamShared>>,
|
||||
sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
|
||||
}
|
||||
|
||||
pub struct PagestreamReceiver {
|
||||
#[allow(dead_code)]
|
||||
shared: Arc<Mutex<PagestreamShared>>,
|
||||
stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
|
||||
}
|
||||
|
||||
enum PagestreamShared {
|
||||
ConnTaskRunning(ConnTaskRunning),
|
||||
ConnTaskCancelledJoinHandleReturnedOrDropped,
|
||||
}
|
||||
struct ConnTaskRunning {
|
||||
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
|
||||
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
|
||||
conn_task: JoinHandle<()>,
|
||||
}
|
||||
@@ -146,11 +110,11 @@ pub struct RelTagBlockNo {
|
||||
impl PagestreamClient {
|
||||
pub async fn shutdown(self) {
|
||||
let Self {
|
||||
shared,
|
||||
sink,
|
||||
stream,
|
||||
} = { self };
|
||||
// The `copy_both` split into `sink` and `stream` contains internal channel sender, the receiver of which is polled by `conn_task`.
|
||||
copy_both,
|
||||
cancel_on_client_drop: cancel_conn_task,
|
||||
conn_task,
|
||||
} = self;
|
||||
// The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
|
||||
// When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
|
||||
// (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
|
||||
//
|
||||
@@ -167,77 +131,27 @@ impl PagestreamClient {
|
||||
//
|
||||
// NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
|
||||
// => https://github.com/neondatabase/neon/issues/6390
|
||||
let ConnTaskRunning {
|
||||
cancel_on_client_drop,
|
||||
conn_task,
|
||||
} = {
|
||||
let mut guard = shared.lock().unwrap();
|
||||
match std::mem::replace(
|
||||
&mut *guard,
|
||||
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
|
||||
) {
|
||||
PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
|
||||
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
|
||||
}
|
||||
};
|
||||
let _ = cancel_on_client_drop.unwrap();
|
||||
let _ = cancel_conn_task.unwrap();
|
||||
conn_task.await.unwrap();
|
||||
|
||||
// Now drop the split copy_both.
|
||||
drop(sink);
|
||||
drop(stream);
|
||||
}
|
||||
|
||||
pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
|
||||
let Self {
|
||||
shared: _,
|
||||
sink,
|
||||
stream,
|
||||
} = self;
|
||||
(sink, stream)
|
||||
drop(copy_both);
|
||||
}
|
||||
|
||||
pub async fn getpage(
|
||||
&mut self,
|
||||
req: PagestreamGetPageRequest,
|
||||
) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
self.getpage_send(req).await?;
|
||||
self.getpage_recv().await
|
||||
}
|
||||
let req = PagestreamFeMessage::GetPage(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
self.sink.getpage_send(req).await
|
||||
}
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
self.stream.getpage_recv().await
|
||||
}
|
||||
}
|
||||
|
||||
impl PagestreamSender {
|
||||
// TODO: maybe make this impl Sink instead for better composability?
|
||||
pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
|
||||
let msg = msg.serialize();
|
||||
self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
self.send(PagestreamFeMessage::GetPage(req)).await
|
||||
}
|
||||
}
|
||||
|
||||
impl PagestreamReceiver {
|
||||
// TODO: maybe make this impl Stream instead for better composability?
|
||||
pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
PagestreamBeMessage::deserialize(next)
|
||||
}
|
||||
|
||||
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let next: PagestreamBeMessage = self.recv().await?;
|
||||
match next {
|
||||
let msg = PagestreamBeMessage::deserialize(next)?;
|
||||
match msg {
|
||||
PagestreamBeMessage::GetPage(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
@@ -246,14 +160,7 @@ impl PagestreamReceiver {
|
||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
next.kind()
|
||||
)
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
PagestreamBeMessage::Test(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
next.kind()
|
||||
msg.kind()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
use std::{
|
||||
io::{stdin, stdout, Read, Write},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use clap::Parser;
|
||||
use pageserver_api::models::{PagestreamRequest, PagestreamTestRequest};
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
struct Args {
|
||||
connstr: String,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let Args {
|
||||
connstr,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = Args::parse();
|
||||
let client = pageserver_client::page_service::Client::new(connstr).await?;
|
||||
let client = client.pagestream(tenant_id, timeline_id).await?;
|
||||
let (mut sender, _receiver) = client.split();
|
||||
|
||||
eprintln!("filling the pipe");
|
||||
let mut msg = 0;
|
||||
loop {
|
||||
msg += 1;
|
||||
let fut = sender.send(pageserver_api::models::PagestreamFeMessage::Test(
|
||||
PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: Lsn(23),
|
||||
not_modified_since: Lsn(23),
|
||||
},
|
||||
batch_key: 42,
|
||||
message: format!("message {}", msg),
|
||||
},
|
||||
));
|
||||
let Ok(res) = tokio::time::timeout(Duration::from_secs(10), fut).await else {
|
||||
eprintln!("pipe seems full");
|
||||
break;
|
||||
};
|
||||
let _: () = res?;
|
||||
}
|
||||
|
||||
let n = stdout().write(b"R")?;
|
||||
assert_eq!(n, 1);
|
||||
stdout().flush()?;
|
||||
|
||||
eprintln!("waiting for signal to tell us to exit");
|
||||
|
||||
let mut buf = [0u8; 1];
|
||||
stdin().read_exact(&mut buf)?;
|
||||
|
||||
eprintln!("termination signal received, exiting");
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
@@ -100,32 +100,6 @@ pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static CONCURRENT_INITDBS: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_concurrent_initdb",
|
||||
"Number of initdb processes running"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static INITDB_SEMAPHORE_ACQUISITION_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_initdb_semaphore_seconds_global",
|
||||
"Time spent getting a permit from the global initdb semaphore",
|
||||
STORAGE_OP_BUCKETS.into()
|
||||
)
|
||||
.expect("failed to define metric")
|
||||
});
|
||||
|
||||
pub(crate) static INITDB_RUN_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_initdb_seconds_global",
|
||||
"Time spent performing initdb",
|
||||
STORAGE_OP_BUCKETS.into()
|
||||
)
|
||||
.expect("failed to define metric")
|
||||
});
|
||||
|
||||
// Metrics collected on operations on the storage repository.
|
||||
#[derive(
|
||||
Clone, Copy, enum_map::Enum, strum_macros::EnumString, strum_macros::Display, IntoStaticStr,
|
||||
@@ -1489,8 +1463,6 @@ pub enum SmgrQueryType {
|
||||
GetPageAtLsn,
|
||||
GetDbSize,
|
||||
GetSlruSegment,
|
||||
#[cfg(feature = "testing")]
|
||||
Test,
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
|
||||
@@ -555,52 +555,37 @@ struct BatchedGetPageRequest {
|
||||
timer: SmgrOpTimer,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
struct BatchedTestRequest {
|
||||
req: models::PagestreamTestRequest,
|
||||
timer: SmgrOpTimer,
|
||||
}
|
||||
|
||||
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
|
||||
/// so that we don't keep the [`Timeline::gate`] open while the batch
|
||||
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
|
||||
enum BatchedFeMessage {
|
||||
Exists {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamExistsRequest,
|
||||
},
|
||||
Nblocks {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamNblocksRequest,
|
||||
},
|
||||
GetPage {
|
||||
span: Span,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
effective_request_lsn: Lsn,
|
||||
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
},
|
||||
DbSize {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamDbSizeRequest,
|
||||
},
|
||||
GetSlruSegment {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: timeline::handle::Handle<TenantManagerTypes>,
|
||||
req: models::PagestreamGetSlruSegmentRequest,
|
||||
},
|
||||
#[cfg(feature = "testing")]
|
||||
Test {
|
||||
span: Span,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
requests: Vec<BatchedTestRequest>,
|
||||
},
|
||||
RespondError {
|
||||
span: Span,
|
||||
error: BatchedPageStreamError,
|
||||
@@ -621,12 +606,6 @@ impl BatchedFeMessage {
|
||||
page.timer.observe_execution_start(at);
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
BatchedFeMessage::Test { requests, .. } => {
|
||||
for req in requests {
|
||||
req.timer.observe_execution_start(at);
|
||||
}
|
||||
}
|
||||
BatchedFeMessage::RespondError { .. } => {}
|
||||
}
|
||||
}
|
||||
@@ -756,7 +735,7 @@ impl PageServerHandler {
|
||||
BatchedFeMessage::Exists {
|
||||
span,
|
||||
timer,
|
||||
shard: shard.downgrade(),
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
@@ -775,7 +754,7 @@ impl PageServerHandler {
|
||||
BatchedFeMessage::Nblocks {
|
||||
span,
|
||||
timer,
|
||||
shard: shard.downgrade(),
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
@@ -794,7 +773,7 @@ impl PageServerHandler {
|
||||
BatchedFeMessage::DbSize {
|
||||
span,
|
||||
timer,
|
||||
shard: shard.downgrade(),
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
@@ -813,7 +792,7 @@ impl PageServerHandler {
|
||||
BatchedFeMessage::GetSlruSegment {
|
||||
span,
|
||||
timer,
|
||||
shard: shard.downgrade(),
|
||||
shard,
|
||||
req,
|
||||
}
|
||||
}
|
||||
@@ -865,7 +844,6 @@ impl PageServerHandler {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// We're holding the Handle
|
||||
let effective_request_lsn = match Self::wait_or_get_last_lsn(
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
@@ -883,27 +861,11 @@ impl PageServerHandler {
|
||||
};
|
||||
BatchedFeMessage::GetPage {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
shard,
|
||||
effective_request_lsn,
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
PagestreamFeMessage::Test(req) => {
|
||||
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
|
||||
let shard = timeline_handles
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.instrument(span.clone()) // sets `shard_id` field
|
||||
.await?;
|
||||
let timer =
|
||||
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
|
||||
.await?;
|
||||
BatchedFeMessage::Test {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
requests: vec![BatchedTestRequest { req, timer }],
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(Some(batched_msg))
|
||||
}
|
||||
@@ -945,7 +907,9 @@ impl PageServerHandler {
|
||||
assert_eq!(accum_pages.len(), max_batch_size.get());
|
||||
return false;
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
|
||||
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
|
||||
{
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
@@ -964,44 +928,6 @@ impl PageServerHandler {
|
||||
accum_pages.extend(this_pages);
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
(
|
||||
Ok(BatchedFeMessage::Test {
|
||||
shard: accum_shard,
|
||||
requests: accum_requests,
|
||||
..
|
||||
}),
|
||||
BatchedFeMessage::Test {
|
||||
shard: this_shard,
|
||||
requests: this_requests,
|
||||
..
|
||||
},
|
||||
) if (|| {
|
||||
assert!(this_requests.len() == 1);
|
||||
if accum_requests.len() >= max_batch_size.get() {
|
||||
trace!(%max_batch_size, "stopping batching because of batch size");
|
||||
assert_eq!(accum_requests.len(), max_batch_size.get());
|
||||
return false;
|
||||
}
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
}
|
||||
let this_batch_key = this_requests[0].req.batch_key;
|
||||
let accum_batch_key = accum_requests[0].req.batch_key;
|
||||
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
|
||||
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})() =>
|
||||
{
|
||||
// ok to batch
|
||||
accum_requests.extend(this_requests);
|
||||
Ok(())
|
||||
}
|
||||
// something batched already but this message is unbatchable
|
||||
(_, this_msg) => {
|
||||
// by default, don't continue batching
|
||||
@@ -1043,7 +969,7 @@ impl PageServerHandler {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::exists");
|
||||
(
|
||||
vec![self
|
||||
.handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_get_rel_exists_request(&shard, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1060,7 +986,7 @@ impl PageServerHandler {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
|
||||
(
|
||||
vec![self
|
||||
.handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_get_nblocks_request(&shard, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1081,7 +1007,7 @@ impl PageServerHandler {
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_get_page_at_lsn_request_batched(
|
||||
&*shard.upgrade()?,
|
||||
&shard,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
ctx,
|
||||
@@ -1103,7 +1029,7 @@ impl PageServerHandler {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
|
||||
(
|
||||
vec![self
|
||||
.handle_db_size_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_db_size_request(&shard, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1120,7 +1046,7 @@ impl PageServerHandler {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
|
||||
(
|
||||
vec![self
|
||||
.handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_get_slru_segment_request(&shard, &req, ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1128,27 +1054,6 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
BatchedFeMessage::Test {
|
||||
span,
|
||||
shard,
|
||||
requests,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::test");
|
||||
(
|
||||
{
|
||||
let npages = requests.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
assert_eq!(res.len(), npages);
|
||||
res
|
||||
},
|
||||
span,
|
||||
)
|
||||
}
|
||||
BatchedFeMessage::RespondError { span, error } => {
|
||||
// We've already decided to respond with an error, so we don't need to
|
||||
// call the handler.
|
||||
@@ -1886,51 +1791,6 @@ impl PageServerHandler {
|
||||
))
|
||||
}
|
||||
|
||||
// NB: this impl mimics what we do for batched getpage requests.
|
||||
#[cfg(feature = "testing")]
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_test_request_batch(
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
requests: Vec<BatchedTestRequest>,
|
||||
_ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
|
||||
// real requests would do something with the timeline
|
||||
let mut results = Vec::with_capacity(requests.len());
|
||||
for _req in requests.iter() {
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
results.push({
|
||||
if timeline.cancel.is_cancelled() {
|
||||
Err(PageReconstructError::Cancelled)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: avoid creating the new Vec here
|
||||
Vec::from_iter(
|
||||
requests
|
||||
.into_iter()
|
||||
.zip(results.into_iter())
|
||||
.map(|(req, res)| {
|
||||
res.map(|()| {
|
||||
(
|
||||
PagestreamBeMessage::Test(models::PagestreamTestResponse {
|
||||
req: req.req.clone(),
|
||||
}),
|
||||
req.timer,
|
||||
)
|
||||
})
|
||||
.map_err(|e| BatchedPageStreamError {
|
||||
err: PageStreamError::from(e),
|
||||
req: req.req.hdr,
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Note on "fullbackup":
|
||||
/// Full basebackups should only be used for debugging purposes.
|
||||
/// Originally, it was introduced to enable breaking storage format changes,
|
||||
@@ -2546,14 +2406,6 @@ impl From<GetActiveTimelineError> for QueryError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
|
||||
fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
|
||||
match e {
|
||||
crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn set_tracing_field_shard_id(timeline: &Timeline) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
tracing::Span::current().record(
|
||||
|
||||
@@ -95,9 +95,6 @@ use crate::deletion_queue::DeletionQueueError;
|
||||
use crate::import_datadir;
|
||||
use crate::is_uninit_mark;
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::CONCURRENT_INITDBS;
|
||||
use crate::metrics::INITDB_RUN_TIME;
|
||||
use crate::metrics::INITDB_SEMAPHORE_ACQUISITION_TIME;
|
||||
use crate::metrics::TENANT;
|
||||
use crate::metrics::{
|
||||
remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
|
||||
@@ -5350,17 +5347,8 @@ async fn run_initdb(
|
||||
initdb_bin_path, initdb_target_dir, initdb_lib_dir,
|
||||
);
|
||||
|
||||
let _permit = {
|
||||
let _timer = INITDB_SEMAPHORE_ACQUISITION_TIME.start_timer();
|
||||
INIT_DB_SEMAPHORE.acquire().await
|
||||
};
|
||||
let _permit = INIT_DB_SEMAPHORE.acquire().await;
|
||||
|
||||
CONCURRENT_INITDBS.inc();
|
||||
scopeguard::defer! {
|
||||
CONCURRENT_INITDBS.dec();
|
||||
}
|
||||
|
||||
let _timer = INITDB_RUN_TIME.start_timer();
|
||||
let res = postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
|
||||
superuser: &conf.superuser,
|
||||
locale: &conf.locale,
|
||||
|
||||
@@ -382,12 +382,6 @@ pub(crate) struct RemoteTimelineClient {
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl Drop for RemoteTimelineClient {
|
||||
fn drop(&mut self) {
|
||||
debug!("dropping RemoteTimelineClient");
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteTimelineClient {
|
||||
///
|
||||
/// Create a remote storage client for given timeline
|
||||
@@ -803,12 +797,6 @@ impl RemoteTimelineClient {
|
||||
|
||||
upload_queue.dirty.metadata.apply(update);
|
||||
|
||||
// Defense in depth: if we somehow generated invalid metadata, do not persist it.
|
||||
upload_queue
|
||||
.dirty
|
||||
.validate()
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
self.schedule_index_upload(upload_queue);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -152,21 +152,6 @@ impl IndexPart {
|
||||
};
|
||||
is_same_remote_layer_path(name, metadata, name, index_metadata)
|
||||
}
|
||||
|
||||
/// Check for invariants in the index: this is useful when uploading an index to ensure that if
|
||||
/// we encounter a bug, we do not persist buggy metadata.
|
||||
pub(crate) fn validate(&self) -> Result<(), String> {
|
||||
if self.import_pgdata.is_none()
|
||||
&& self.metadata.ancestor_timeline().is_none()
|
||||
&& self.layer_metadata.is_empty()
|
||||
{
|
||||
// Unless we're in the middle of a raw pgdata import, or this is a child timeline,the index must
|
||||
// always have at least one layer.
|
||||
return Err("Index has no ancestor and no layers".to_string());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Metadata gathered for each of the layer files.
|
||||
|
||||
@@ -40,10 +40,6 @@ pub(crate) async fn upload_index_part(
|
||||
});
|
||||
pausable_failpoint!("before-upload-index-pausable");
|
||||
|
||||
// Safety: refuse to persist invalid index metadata, to mitigate the impact of any bug that produces this
|
||||
// (this should never happen)
|
||||
index_part.validate().map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
// FIXME: this error comes too late
|
||||
let serialized = index_part.to_json_bytes()?;
|
||||
let serialized = Bytes::from(serialized);
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use pageserver_api::key::{Key, CONTROLFILE_KEY};
|
||||
use pageserver_api::key::CONTROLFILE_KEY;
|
||||
use tokio::task::JoinSet;
|
||||
use utils::{
|
||||
completion::{self, Completion},
|
||||
@@ -9,10 +9,7 @@ use utils::{
|
||||
|
||||
use super::failpoints::{Failpoint, FailpointKind};
|
||||
use super::*;
|
||||
use crate::{
|
||||
context::DownloadBehavior,
|
||||
tenant::{harness::test_img, storage_layer::LayerVisibilityHint},
|
||||
};
|
||||
use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
|
||||
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
|
||||
|
||||
/// Used in tests to advance a future to wanted await point, and not futher.
|
||||
@@ -34,51 +31,20 @@ async fn smoke_test() {
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
|
||||
|
||||
let image_layers = vec![(
|
||||
Lsn(0x40),
|
||||
vec![(
|
||||
Key::from_hex("620000000033333333444444445500000000").unwrap(),
|
||||
test_img("foo"),
|
||||
)],
|
||||
)];
|
||||
|
||||
// Create a test timeline with one real layer, and one synthetic test layer. The synthetic
|
||||
// one is only there so that we can GC the real one without leaving the timeline's metadata
|
||||
// empty, which is an illegal state (see [`IndexPart::validate`]).
|
||||
let timeline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TimelineId::generate(),
|
||||
Lsn(0x10),
|
||||
14,
|
||||
&ctx,
|
||||
Default::default(),
|
||||
image_layers,
|
||||
Lsn(0x100),
|
||||
)
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Grab one of the timeline's layers to exercise in the test, and the other layer that is just
|
||||
// there to avoid the timeline being illegally empty
|
||||
let (layer, dummy_layer) = {
|
||||
let layer = {
|
||||
let mut layers = {
|
||||
let layers = timeline.layers.read().await;
|
||||
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
assert_eq!(layers.len(), 2);
|
||||
assert_eq!(layers.len(), 1);
|
||||
|
||||
layers.sort_by_key(|l| l.layer_desc().get_key_range().start);
|
||||
let synthetic_layer = layers.pop().unwrap();
|
||||
let real_layer = layers.pop().unwrap();
|
||||
tracing::info!(
|
||||
"real_layer={:?} ({}), synthetic_layer={:?} ({})",
|
||||
real_layer,
|
||||
real_layer.layer_desc().file_size,
|
||||
synthetic_layer,
|
||||
synthetic_layer.layer_desc().file_size
|
||||
);
|
||||
(real_layer, synthetic_layer)
|
||||
layers.swap_remove(0)
|
||||
};
|
||||
|
||||
// all layers created at pageserver are like `layer`, initialized with strong
|
||||
@@ -207,13 +173,10 @@ async fn smoke_test() {
|
||||
|
||||
let rtc = &timeline.remote_client;
|
||||
|
||||
// Simulate GC removing our test layer.
|
||||
{
|
||||
let mut g = timeline.layers.write().await;
|
||||
|
||||
let layers = &[layer];
|
||||
let mut g = timeline.layers.write().await;
|
||||
g.open_mut().unwrap().finish_gc_timeline(layers);
|
||||
|
||||
// this just updates the remote_physical_size for demonstration purposes
|
||||
rtc.schedule_gc_update(layers).unwrap();
|
||||
}
|
||||
@@ -228,10 +191,7 @@ async fn smoke_test() {
|
||||
|
||||
rtc.wait_completion().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
rtc.get_remote_physical_size(),
|
||||
dummy_layer.metadata().file_size
|
||||
);
|
||||
assert_eq!(rtc.get_remote_physical_size(), 0);
|
||||
assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
|
||||
}
|
||||
|
||||
|
||||
@@ -76,7 +76,6 @@ use std::{pin::pin, sync::OnceLock};
|
||||
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
page_service::TenantManagerTypes,
|
||||
tenant::{
|
||||
config::AttachmentMode,
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -432,7 +431,7 @@ pub struct Timeline {
|
||||
|
||||
pub(crate) l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
pub(crate) handles: handle::PerTimelineState<TenantManagerTypes>,
|
||||
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
|
||||
|
||||
pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
|
||||
|
||||
@@ -4626,10 +4625,6 @@ impl Drop for Timeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"Timeline {} for tenant {} is being dropped",
|
||||
self.timeline_id, self.tenant_shard_id.tenant_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5678,17 +5673,9 @@ impl Timeline {
|
||||
info!("force created image layer {}", image_layer.local_path());
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
guard
|
||||
.open_mut()
|
||||
.unwrap()
|
||||
.force_insert_layer(image_layer.clone());
|
||||
guard.open_mut().unwrap().force_insert_layer(image_layer);
|
||||
}
|
||||
|
||||
// Update remote_timeline_client state to reflect existence of this layer
|
||||
self.remote_client
|
||||
.schedule_layer_file_upload(image_layer)
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5739,17 +5726,9 @@ impl Timeline {
|
||||
info!("force created delta layer {}", delta_layer.local_path());
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
guard
|
||||
.open_mut()
|
||||
.unwrap()
|
||||
.force_insert_layer(delta_layer.clone());
|
||||
guard.open_mut().unwrap().force_insert_layer(delta_layer);
|
||||
}
|
||||
|
||||
// Update remote_timeline_client state to reflect existence of this layer
|
||||
self.remote_client
|
||||
.schedule_layer_file_upload(delta_layer)
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -32,151 +32,54 @@
|
||||
//!
|
||||
//! # Design
|
||||
//!
|
||||
//! ## Data Structures
|
||||
//!
|
||||
//! There are three user-facing data structures:
|
||||
//! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
|
||||
//! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
|
||||
//! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
|
||||
//! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
|
||||
//! trying to ugprade back to a `Handle`, guaranteeing it's the same `Timeline` *object*.
|
||||
//! Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request)
|
||||
//!
|
||||
//! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
|
||||
//! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
|
||||
//! The `Handle` is just a wrapper around an `Arc<HandleInner>`.
|
||||
//!
|
||||
//! The `HandleInner` is allocated as a `Arc<Mutex<HandleInner>>` and
|
||||
//! referenced weakly and strongly from various places which we are now illustrating.
|
||||
//! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
|
||||
//! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
|
||||
//! or `Weak<Mutex<HandleInner>>`, respectively.
|
||||
//!
|
||||
//! - The `Handle` is a strong ref.
|
||||
//! - The `WeakHandle` is a weak ref.
|
||||
//! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
|
||||
//! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
|
||||
//!
|
||||
//! Lifetimes:
|
||||
//! - `WeakHandle` and `Handle`: single pagestream request.
|
||||
//! - `Cache`: single page service connection.
|
||||
//! - `PerTimelineState`: lifetime of the Timeline object (i.e., i.e., till `Timeline::shutdown`).
|
||||
//!
|
||||
//! ## Request Handling Flow (= filling and using the `Cache``)
|
||||
//! There is one long-lived `Arc<HandleInner>`, which is stored in the `PerTimelineState`.
|
||||
//! The `Cache` stores a `Weak<HandleInner>` for each cached Timeline.
|
||||
//!
|
||||
//! To dispatch a request, the page service connection calls `Cache::get`.
|
||||
//!
|
||||
//! A cache miss means we consult the tenant manager for shard routing,
|
||||
//! resulting in an `Arc<Timeline>`. We enter its gate _once_ and store it in the the
|
||||
//! `Arc<Mutex<HandleInner>>>`. A weak ref is stored in the `Cache`
|
||||
//! and a strong ref in the `PerTimelineState`.
|
||||
//! A strong ref is returned wrapped in a `Handle`.
|
||||
//! resulting in an `Arc<Timeline>`. We enter its gate _once_ and construct an
|
||||
//! `Arc<HandleInner>`. We store a `Weak<HandleInner>` in the cache
|
||||
//! and the `Arc<HandleInner>` in the `PerTimelineState`.
|
||||
//!
|
||||
//! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
|
||||
//! and find the weak ref in the cache.
|
||||
//! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
|
||||
//! and find the `Weak<HandleInner>` in the cache.
|
||||
//! We upgrade the `Weak<HandleInner>` to an `Arc<HandleInner>` and wrap it in the user-facing `Handle` type.
|
||||
//!
|
||||
//! The pagestream processing is pipelined and involves a batching step.
|
||||
//! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
|
||||
//! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
|
||||
//! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
|
||||
//! The request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
|
||||
//! It then drops the `Handle`, which drops the `Arc<HandleInner>`.
|
||||
//!
|
||||
//! # Performance
|
||||
//! # Memory Management / How The Reference Cycle Is Broken
|
||||
//!
|
||||
//! Remember from the introductory section:
|
||||
//! The attentive reader may have noticed the strong reference cycle
|
||||
//! from `Arc<HandleInner>` to `PerTimelineState` to `Arc<Timeline>`.
|
||||
//!
|
||||
//! > However, we want to avoid the overhead of entering the gate for every
|
||||
//! > method invocation.
|
||||
//!
|
||||
//! Why do we want to avoid that?
|
||||
//! Because the gate is a shared location in memory and entering it involves
|
||||
//! bumping refcounts, which leads to cache contention if done frequently
|
||||
//! from multiple cores in parallel.
|
||||
//!
|
||||
//! So, we only acquire the `GateGuard` once on `Cache` miss, and wrap it in an `Arc`.
|
||||
//! That `Arc` is private to the `HandleInner` and hence to the connection.
|
||||
//! (Review the "Data Structures" section if that is unclear to you.)
|
||||
//!
|
||||
//! A `WeakHandle` is a weak ref to the `HandleInner`.
|
||||
//! When upgrading a `WeakHandle`, we upgrade to a strong ref to the `HandleInner` and
|
||||
//! further acquire an additional strong ref to the `Arc<GateGuard>` inside it.
|
||||
//! Again, this manipulation of ref counts is is cheap because `Arc` is private to the connection.
|
||||
//!
|
||||
//! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc<GateGuard>`.
|
||||
//! Again, this is cheap because the `Arc` is private to the connection.
|
||||
//!
|
||||
//! In addition to the GateGuard, we need to provide `Deref<Target=Timeline>` impl.
|
||||
//! For this, both `Handle` need infallible access to an `Arc<Timeline>`.
|
||||
//! We could clone the `Arc<Timeline>` when upgrading a `WeakHandle`, but that would cause contention
|
||||
//! on the shared memory location that trakcs the refcount of the `Arc<Timeline>`.
|
||||
//! Instead, we wrap the `Arc<Timeline>` into another `Arc`.
|
||||
//! so that we can clone it cheaply when upgrading a `WeakHandle`.
|
||||
//!
|
||||
//! # Shutdown
|
||||
//!
|
||||
//! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
|
||||
//!
|
||||
//! ```text
|
||||
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
|
||||
//! ```
|
||||
//!
|
||||
//! Further, there is this cycle:
|
||||
//!
|
||||
//! ```text
|
||||
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> GateGuard --keepalive--> Timeline
|
||||
//! ```
|
||||
//!
|
||||
//! The former cycle is a memory leak if not broken.
|
||||
//! The latter cycle further prevents the Timeline from shutting down
|
||||
//! because we certainly won't drop the Timeline while the GateGuard is alive.
|
||||
//! Preventing shutdown is the whole point of this handle/cache system,
|
||||
//! but when the Timeline needs to shut down, we need to break the cycle.
|
||||
//! This cycle is intentional: while it exists, the `Cache` can upgrade its
|
||||
//! `Weak<HandleInner>` to an `Arc<HandleInner>` in a single atomic operation.
|
||||
//!
|
||||
//! The cycle is broken by either
|
||||
//! - Timeline shutdown (=> `PerTimelineState::shutdown`)
|
||||
//! - Connection shutdown (=> dropping the `Cache`).
|
||||
//! - `PerTimelineState::shutdown` or
|
||||
//! - dropping the `Cache`.
|
||||
//!
|
||||
//! Both transition the `HandleInner` from [`HandleInner::KeepingTimelineGateOpen`] to
|
||||
//! [`HandleInner::ShutDown`], which drops the only long-lived strong ref to the
|
||||
//! `Arc<GateGuard>`.
|
||||
//!
|
||||
//! `PerTimelineState::shutdown` drops all the `HandleInners` it contains,
|
||||
//! thereby breaking the cycle.
|
||||
//! It also initiates draining of already existing `Handle`s by
|
||||
//! poisoning things so that no new `HandleInner`'s can be added
|
||||
//! to the `PerTimelineState`, which will make subsequent `Cache::get` fail.
|
||||
//!
|
||||
//! Concurrently existing / already upgraded `Handle`s will extend the
|
||||
//! lifetime of the `Arc<Mutex<HandleInner>>` and hence cycles.
|
||||
//! Concurrently existing `Handle`s will extend the existence of the cycle.
|
||||
//! However, since `Handle`s are short-lived and new `Handle`s are not
|
||||
//! handed out from `Cache::get` or `WeakHandle::upgrade` after
|
||||
//! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
|
||||
//!
|
||||
//! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
|
||||
//! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
|
||||
//! they will find the inner in state `HandleInner::ShutDown` state where the
|
||||
//! `Arc<GateGuard>` and Timeline has already been dropped.
|
||||
//!
|
||||
//! Dropping the `Cache` undoes the registration of this `Cache`'s
|
||||
//! `HandleInner`s from all the `PerTimelineState`s, i.e., it
|
||||
//! removes the strong ref to each of its `HandleInner`s
|
||||
//! from all the `PerTimelineState`.
|
||||
//!
|
||||
//! # Locking Rules
|
||||
//!
|
||||
//! To prevent deadlocks we:
|
||||
//!
|
||||
//! 1. Only ever hold one of the locks at a time.
|
||||
//! 2. Don't add more than one Drop impl that locks on the
|
||||
//! cycles above.
|
||||
//!
|
||||
//! As per (2), that impl is in `Drop for Cache`.
|
||||
//! handed out after either `PerTimelineState::shutdown` or `Cache` drop,
|
||||
//! that extension of the cycle is bounded.
|
||||
//!
|
||||
//! # Fast Path for Shard Routing
|
||||
//!
|
||||
//! The `Cache` has a fast path for shard routing to avoid calling into
|
||||
//! the tenant manager for every request.
|
||||
//!
|
||||
//! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
|
||||
//! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak<HandleInner>`.
|
||||
//!
|
||||
//! The current implementation uses the first entry in the hash map
|
||||
//! to determine the `ShardParameters` and derive the correct
|
||||
@@ -184,18 +87,18 @@
|
||||
//!
|
||||
//! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
|
||||
//!
|
||||
//! If the lookup is successful and the `WeakHandle` can be upgraded,
|
||||
//! If the lookup is successful and the `Weak<HandleInner>` can be upgraded,
|
||||
//! it's a hit.
|
||||
//!
|
||||
//! ## Cache invalidation
|
||||
//!
|
||||
//! The insight is that cache invalidation is sufficient and most efficiently if done lazily.
|
||||
//! The insight is that cache invalidation is sufficient and most efficiently done lazily.
|
||||
//! The only reasons why an entry in the cache can become stale are:
|
||||
//! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is
|
||||
//! being detached, timeline or shard deleted, or pageserver is shutting down.
|
||||
//! 2. We're doing a shard split and new traffic should be routed to the child shards.
|
||||
//!
|
||||
//! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
|
||||
//! Regarding (1), we will eventually fail to upgrade the `Weak<HandleInner>` once the
|
||||
//! timeline has shut down, and when that happens, we remove the entry from the cache.
|
||||
//!
|
||||
//! Regarding (2), the insight is that it is toally fine to keep dispatching requests
|
||||
@@ -204,6 +107,8 @@
|
||||
|
||||
use std::collections::hash_map;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::Weak;
|
||||
@@ -247,7 +152,7 @@ pub(crate) struct Cache<T: Types> {
|
||||
map: Map<T>,
|
||||
}
|
||||
|
||||
type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
|
||||
type Map<T> = HashMap<ShardTimelineId, Weak<HandleInner<T>>>;
|
||||
|
||||
impl<T: Types> Default for Cache<T> {
|
||||
fn default() -> Self {
|
||||
@@ -265,22 +170,12 @@ pub(crate) struct ShardTimelineId {
|
||||
}
|
||||
|
||||
/// See module-level comment.
|
||||
pub(crate) struct Handle<T: Types> {
|
||||
timeline: Arc<T::Timeline>,
|
||||
#[allow(dead_code)] // the field exists to keep the gate open
|
||||
gate_guard: Arc<utils::sync::gate::GateGuard>,
|
||||
inner: Arc<Mutex<HandleInner<T>>>,
|
||||
}
|
||||
pub(crate) struct WeakHandle<T: Types> {
|
||||
inner: Weak<Mutex<HandleInner<T>>>,
|
||||
}
|
||||
enum HandleInner<T: Types> {
|
||||
KeepingTimelineGateOpen {
|
||||
#[allow(dead_code)]
|
||||
gate_guard: Arc<utils::sync::gate::GateGuard>,
|
||||
timeline: Arc<T::Timeline>,
|
||||
},
|
||||
ShutDown,
|
||||
pub(crate) struct Handle<T: Types>(Arc<HandleInner<T>>);
|
||||
struct HandleInner<T: Types> {
|
||||
shut_down: AtomicBool,
|
||||
timeline: T::Timeline,
|
||||
// The timeline's gate held open.
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
|
||||
@@ -288,8 +183,7 @@ enum HandleInner<T: Types> {
|
||||
/// See module-level comment for details.
|
||||
pub struct PerTimelineState<T: Types> {
|
||||
// None = shutting down
|
||||
#[allow(clippy::type_complexity)]
|
||||
handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
|
||||
handles: Mutex<Option<HashMap<CacheId, Arc<HandleInner<T>>>>>,
|
||||
}
|
||||
|
||||
impl<T: Types> Default for PerTimelineState<T> {
|
||||
@@ -349,24 +243,49 @@ impl<T: Types> Cache<T> {
|
||||
shard_selector: ShardSelector,
|
||||
tenant_manager: &T::TenantManager,
|
||||
) -> Result<Handle<T>, GetError<T>> {
|
||||
// terminates because when every iteration we remove an element from the map
|
||||
let miss: ShardSelector = loop {
|
||||
// terminates because each iteration removes an element from the map
|
||||
loop {
|
||||
let handle = self
|
||||
.get_impl(timeline_id, shard_selector, tenant_manager)
|
||||
.await?;
|
||||
if handle.0.shut_down.load(Ordering::Relaxed) {
|
||||
let removed = self
|
||||
.map
|
||||
.remove(&handle.0.timeline.shard_timeline_id())
|
||||
.expect("invariant of get_impl is that the returned handle is in the map");
|
||||
assert!(
|
||||
Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)),
|
||||
"shard_timeline_id() incorrect?"
|
||||
);
|
||||
} else {
|
||||
return Ok(handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
async fn get_impl(
|
||||
&mut self,
|
||||
timeline_id: TimelineId,
|
||||
shard_selector: ShardSelector,
|
||||
tenant_manager: &T::TenantManager,
|
||||
) -> Result<Handle<T>, GetError<T>> {
|
||||
let miss: ShardSelector = {
|
||||
let routing_state = self.shard_routing(timeline_id, shard_selector);
|
||||
match routing_state {
|
||||
RoutingResult::FastPath(handle) => return Ok(handle),
|
||||
RoutingResult::SlowPath(key) => match self.map.get(&key) {
|
||||
Some(cached) => match cached.upgrade() {
|
||||
Ok(upgraded) => return Ok(upgraded),
|
||||
Err(HandleUpgradeError::ShutDown) => {
|
||||
// TODO: dedup with shard_routing()
|
||||
Some(upgraded) => return Ok(Handle(upgraded)),
|
||||
None => {
|
||||
trace!("handle cache stale");
|
||||
self.map.remove(&key).unwrap();
|
||||
continue;
|
||||
ShardSelector::Known(key.shard_index)
|
||||
}
|
||||
},
|
||||
None => break ShardSelector::Known(key.shard_index),
|
||||
None => ShardSelector::Known(key.shard_index),
|
||||
},
|
||||
RoutingResult::NeedConsultTenantManager => break shard_selector,
|
||||
RoutingResult::NeedConsultTenantManager => shard_selector,
|
||||
}
|
||||
};
|
||||
self.get_miss(timeline_id, miss, tenant_manager).await
|
||||
@@ -383,7 +302,7 @@ impl<T: Types> Cache<T> {
|
||||
let Some((first_key, first_handle)) = self.map.iter().next() else {
|
||||
return RoutingResult::NeedConsultTenantManager;
|
||||
};
|
||||
let Ok(first_handle) = first_handle.upgrade() else {
|
||||
let Some(first_handle) = first_handle.upgrade() else {
|
||||
// TODO: dedup with get()
|
||||
trace!("handle cache stale");
|
||||
let first_key_owned = *first_key;
|
||||
@@ -391,7 +310,7 @@ impl<T: Types> Cache<T> {
|
||||
continue;
|
||||
};
|
||||
|
||||
let first_handle_shard_identity = first_handle.get_shard_identity();
|
||||
let first_handle_shard_identity = first_handle.timeline.get_shard_identity();
|
||||
let make_shard_index = |shard_num: ShardNumber| ShardIndex {
|
||||
shard_number: shard_num,
|
||||
shard_count: first_handle_shard_identity.count,
|
||||
@@ -410,11 +329,11 @@ impl<T: Types> Cache<T> {
|
||||
};
|
||||
let first_handle_shard_timeline_id = ShardTimelineId {
|
||||
shard_index: first_handle_shard_identity.shard_index(),
|
||||
timeline_id: first_handle.shard_timeline_id().timeline_id,
|
||||
timeline_id: first_handle.timeline.shard_timeline_id().timeline_id,
|
||||
};
|
||||
|
||||
if need_shard_timeline_id == first_handle_shard_timeline_id {
|
||||
return RoutingResult::FastPath(first_handle);
|
||||
return RoutingResult::FastPath(Handle(first_handle));
|
||||
} else {
|
||||
return RoutingResult::SlowPath(need_shard_timeline_id);
|
||||
}
|
||||
@@ -438,30 +357,23 @@ impl<T: Types> Cache<T> {
|
||||
ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
|
||||
}
|
||||
|
||||
trace!("creating new HandleInner");
|
||||
let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
|
||||
gate_guard: Arc::new(
|
||||
// this enter() is expensive in production code because
|
||||
// it hits the global Arc<Timeline>::gate refcounts
|
||||
match timeline.gate().enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
return Err(GetError::TimelineGateClosed);
|
||||
}
|
||||
},
|
||||
),
|
||||
// this clone is expensive in production code because
|
||||
// it hits the global Arc<Timeline>::clone refcounts
|
||||
timeline: Arc::new(timeline.clone()),
|
||||
}));
|
||||
let handle_weak = WeakHandle {
|
||||
inner: Arc::downgrade(&handle_inner_arc),
|
||||
let gate_guard = match timeline.gate().enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
return Err(GetError::TimelineGateClosed);
|
||||
}
|
||||
};
|
||||
let handle = handle_weak
|
||||
.upgrade()
|
||||
.ok()
|
||||
.expect("we just created it and it's not linked anywhere yet");
|
||||
{
|
||||
trace!("creating new HandleInner");
|
||||
let handle = Arc::new(
|
||||
// TODO: global metric that keeps track of the number of live HandlerTimeline instances
|
||||
// so we can identify reference cycle bugs.
|
||||
HandleInner {
|
||||
shut_down: AtomicBool::new(false),
|
||||
_gate_guard: gate_guard,
|
||||
timeline: timeline.clone(),
|
||||
},
|
||||
);
|
||||
let handle = {
|
||||
let mut lock_guard = timeline
|
||||
.per_timeline_state()
|
||||
.handles
|
||||
@@ -469,8 +381,7 @@ impl<T: Types> Cache<T> {
|
||||
.expect("mutex poisoned");
|
||||
match &mut *lock_guard {
|
||||
Some(per_timeline_state) => {
|
||||
let replaced =
|
||||
per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
|
||||
let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
|
||||
assert!(replaced.is_none(), "some earlier code left a stale handle");
|
||||
match self.map.entry(key) {
|
||||
hash_map::Entry::Occupied(_o) => {
|
||||
@@ -481,7 +392,8 @@ impl<T: Types> Cache<T> {
|
||||
unreachable!()
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
v.insert(handle_weak);
|
||||
v.insert(Arc::downgrade(&handle));
|
||||
handle
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -489,62 +401,14 @@ impl<T: Types> Cache<T> {
|
||||
return Err(GetError::PerTimelineStateShutDown);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(handle)
|
||||
};
|
||||
Ok(Handle(handle))
|
||||
}
|
||||
Err(e) => Err(GetError::TenantManager(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum HandleUpgradeError {
|
||||
ShutDown,
|
||||
}
|
||||
|
||||
impl<T: Types> WeakHandle<T> {
|
||||
pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
|
||||
let Some(inner) = Weak::upgrade(&self.inner) else {
|
||||
return Err(HandleUpgradeError::ShutDown);
|
||||
};
|
||||
let lock_guard = inner.lock().expect("poisoned");
|
||||
match &*lock_guard {
|
||||
HandleInner::KeepingTimelineGateOpen {
|
||||
timeline,
|
||||
gate_guard,
|
||||
} => {
|
||||
let gate_guard = Arc::clone(gate_guard);
|
||||
let timeline = Arc::clone(timeline);
|
||||
drop(lock_guard);
|
||||
Ok(Handle {
|
||||
timeline,
|
||||
gate_guard,
|
||||
inner,
|
||||
})
|
||||
}
|
||||
HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
|
||||
Weak::ptr_eq(&self.inner, &other.inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Types> std::ops::Deref for Handle<T> {
|
||||
type Target = T::Timeline;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.timeline
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Types> Handle<T> {
|
||||
pub(crate) fn downgrade(&self) -> WeakHandle<T> {
|
||||
WeakHandle {
|
||||
inner: Arc::downgrade(&self.inner),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Types> PerTimelineState<T> {
|
||||
/// After this method returns, [`Cache::get`] will never again return a [`Handle`]
|
||||
/// to the [`Types::Timeline`] that embeds this per-timeline state.
|
||||
@@ -566,54 +430,43 @@ impl<T: Types> PerTimelineState<T> {
|
||||
trace!("already shut down");
|
||||
return;
|
||||
};
|
||||
for handle_inner_arc in handles.values() {
|
||||
for handle in handles.values() {
|
||||
// Make hits fail.
|
||||
let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
|
||||
lock_guard.shutdown();
|
||||
handle.shut_down.store(true, Ordering::Relaxed);
|
||||
}
|
||||
drop(handles);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Types> std::ops::Deref for Handle<T> {
|
||||
type Target = T::Timeline;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0.timeline
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<T: Types> Drop for HandleInner<T> {
|
||||
fn drop(&mut self) {
|
||||
trace!("HandleInner dropped");
|
||||
}
|
||||
}
|
||||
|
||||
// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
|
||||
impl<T: Types> Drop for Cache<T> {
|
||||
fn drop(&mut self) {
|
||||
for (
|
||||
_,
|
||||
WeakHandle {
|
||||
inner: handle_inner_weak,
|
||||
},
|
||||
) in self.map.drain()
|
||||
{
|
||||
let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
|
||||
continue;
|
||||
};
|
||||
let handle_timeline = handle_inner_arc
|
||||
// locking rules: drop lock before acquiring other lock below
|
||||
.lock()
|
||||
.expect("poisoned")
|
||||
.shutdown();
|
||||
let per_timeline_state = handle_timeline.per_timeline_state();
|
||||
let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
|
||||
let Some(handles) = &mut *handles_lock_guard else {
|
||||
continue;
|
||||
};
|
||||
let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
|
||||
// There could have been a shutdown inbetween us upgrading the weak and locking the mutex.
|
||||
continue;
|
||||
};
|
||||
drop(handles_lock_guard); // locking rules: remember them when!
|
||||
assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc,));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Types> HandleInner<T> {
|
||||
fn shutdown(&mut self) -> Arc<T::Timeline> {
|
||||
match std::mem::replace(self, HandleInner::ShutDown) {
|
||||
HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline,
|
||||
HandleInner::ShutDown => {
|
||||
unreachable!("handles are only shut down once in their lifetime");
|
||||
for (_, weak) in self.map.drain() {
|
||||
if let Some(strong) = weak.upgrade() {
|
||||
// handle is still being kept alive in PerTimelineState
|
||||
let timeline = strong.timeline.per_timeline_state();
|
||||
let mut handles = timeline.handles.lock().expect("mutex poisoned");
|
||||
if let Some(handles) = &mut *handles {
|
||||
let Some(removed) = handles.remove(&self.id) else {
|
||||
// There could have been a shutdown inbetween us upgrading the weak and locking the mutex.
|
||||
continue;
|
||||
};
|
||||
assert!(Arc::ptr_eq(&removed, &strong));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -621,8 +474,6 @@ impl<T: Types> HandleInner<T> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Weak;
|
||||
|
||||
use pageserver_api::{
|
||||
key::{rel_block_to_key, Key, DBDIR_KEY},
|
||||
models::ShardParameters,
|
||||
@@ -732,13 +583,39 @@ mod tests {
|
||||
//
|
||||
// fill the cache
|
||||
//
|
||||
assert_eq!(
|
||||
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
|
||||
(2, 1),
|
||||
"strong: shard0, mgr; weak: myself"
|
||||
);
|
||||
|
||||
let handle: Handle<_> = cache
|
||||
.get(timeline_id, ShardSelector::Page(key), &mgr)
|
||||
.await
|
||||
.expect("we have the timeline");
|
||||
let handle_inner_weak = Arc::downgrade(&handle.0);
|
||||
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
|
||||
assert_eq!(
|
||||
(
|
||||
Weak::strong_count(&handle_inner_weak),
|
||||
Weak::weak_count(&handle_inner_weak)
|
||||
),
|
||||
(2, 2),
|
||||
"strong: handle, per_timeline_state, weak: handle_inner_weak, cache"
|
||||
);
|
||||
assert_eq!(cache.map.len(), 1);
|
||||
|
||||
assert_eq!(
|
||||
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
|
||||
(3, 1),
|
||||
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
|
||||
);
|
||||
drop(handle);
|
||||
assert_eq!(
|
||||
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
|
||||
(3, 1),
|
||||
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
|
||||
);
|
||||
|
||||
//
|
||||
// demonstrate that Handle holds up gate closure
|
||||
@@ -763,11 +640,21 @@ mod tests {
|
||||
// SHUTDOWN
|
||||
shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
|
||||
|
||||
assert_eq!(
|
||||
1,
|
||||
Weak::strong_count(&handle_inner_weak),
|
||||
"through local var handle"
|
||||
);
|
||||
assert_eq!(
|
||||
cache.map.len(),
|
||||
1,
|
||||
"this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
|
||||
);
|
||||
assert_eq!(
|
||||
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
|
||||
(3, 1),
|
||||
"strong: handleinner(via handle), shard0, mgr; weak: myself"
|
||||
);
|
||||
|
||||
// this handle is perfectly usable
|
||||
handle.getpage();
|
||||
@@ -791,6 +678,16 @@ mod tests {
|
||||
}
|
||||
|
||||
drop(handle);
|
||||
assert_eq!(
|
||||
0,
|
||||
Weak::strong_count(&handle_inner_weak),
|
||||
"the HandleInner destructor already ran"
|
||||
);
|
||||
assert_eq!(
|
||||
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
|
||||
(2, 1),
|
||||
"strong: shard0, mgr; weak: myself"
|
||||
);
|
||||
|
||||
// closing gate succeeds after dropping handle
|
||||
tokio::select! {
|
||||
@@ -809,8 +706,10 @@ mod tests {
|
||||
assert_eq!(cache.map.len(), 0);
|
||||
|
||||
// ensure all refs to shard0 are gone and we're not leaking anything
|
||||
let myself = Weak::clone(&shard0.myself);
|
||||
drop(shard0);
|
||||
drop(mgr);
|
||||
assert_eq!(Weak::strong_count(&myself), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1049,11 +948,15 @@ mod tests {
|
||||
handle
|
||||
};
|
||||
handle.getpage();
|
||||
used_handles.push(Arc::downgrade(&handle.timeline));
|
||||
used_handles.push(Arc::downgrade(&handle.0));
|
||||
}
|
||||
|
||||
// No handles exist, thus gates are closed and don't require shutdown.
|
||||
// Thus the gate should close immediately, even without shutdown.
|
||||
// No handles exist, thus gates are closed and don't require shutdown
|
||||
assert!(used_handles
|
||||
.iter()
|
||||
.all(|weak| Weak::strong_count(weak) == 0));
|
||||
|
||||
// ... thus the gate should close immediately, even without shutdown
|
||||
tokio::select! {
|
||||
_ = shard0.gate.close() => { }
|
||||
_ = tokio::time::sleep(FOREVER) => {
|
||||
@@ -1061,75 +964,4 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_weak_handles() {
|
||||
crate::tenant::harness::setup_logging();
|
||||
let timeline_id = TimelineId::generate();
|
||||
let shard0 = Arc::new_cyclic(|myself| StubTimeline {
|
||||
gate: Default::default(),
|
||||
id: timeline_id,
|
||||
shard: ShardIdentity::unsharded(),
|
||||
per_timeline_state: PerTimelineState::default(),
|
||||
myself: myself.clone(),
|
||||
});
|
||||
let mgr = StubManager {
|
||||
shards: vec![shard0.clone()],
|
||||
};
|
||||
|
||||
let refcount_start = Arc::strong_count(&shard0);
|
||||
|
||||
let key = DBDIR_KEY;
|
||||
|
||||
let mut cache = Cache::<TestTypes>::default();
|
||||
|
||||
let handle = cache
|
||||
.get(timeline_id, ShardSelector::Page(key), &mgr)
|
||||
.await
|
||||
.expect("we have the timeline");
|
||||
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
|
||||
|
||||
let weak_handle = handle.downgrade();
|
||||
|
||||
drop(handle);
|
||||
|
||||
let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
|
||||
|
||||
// Start shutdown
|
||||
shard0.per_timeline_state.shutdown();
|
||||
|
||||
// Upgrades during shutdown don't work, even if upgraded_handle exists.
|
||||
weak_handle
|
||||
.upgrade()
|
||||
.err()
|
||||
.expect("can't upgrade weak handle as soon as shutdown started");
|
||||
|
||||
// But upgraded_handle is still alive, so the gate won't close.
|
||||
tokio::select! {
|
||||
_ = shard0.gate.close() => {
|
||||
panic!("handle is keeping gate open");
|
||||
}
|
||||
_ = tokio::time::sleep(FOREVER) => { }
|
||||
}
|
||||
|
||||
// Drop the last handle.
|
||||
drop(upgraded_handle);
|
||||
|
||||
// The gate should close now, despite there still being a weak_handle.
|
||||
tokio::select! {
|
||||
_ = shard0.gate.close() => { }
|
||||
_ = tokio::time::sleep(FOREVER) => {
|
||||
panic!("only strong handle is dropped and we shut down per-timeline-state")
|
||||
}
|
||||
}
|
||||
|
||||
// The weak handle still can't be upgraded.
|
||||
weak_handle
|
||||
.upgrade()
|
||||
.err()
|
||||
.expect("still shouldn't be able to upgrade the weak handle");
|
||||
|
||||
// There should be no strong references to the timeline object except the one on "stack".
|
||||
assert_eq!(Arc::strong_count(&shard0), refcount_start);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,8 +34,6 @@ typedef enum
|
||||
T_NeonGetPageRequest,
|
||||
T_NeonDbSizeRequest,
|
||||
T_NeonGetSlruSegmentRequest,
|
||||
/* future tags above this line */
|
||||
T_NeonTestRequest = 99, /* only in cfg(feature = "testing") */
|
||||
|
||||
/* pagestore -> pagestore_client */
|
||||
T_NeonExistsResponse = 100,
|
||||
@@ -44,8 +42,6 @@ typedef enum
|
||||
T_NeonErrorResponse,
|
||||
T_NeonDbSizeResponse,
|
||||
T_NeonGetSlruSegmentResponse,
|
||||
/* future tags above this line */
|
||||
T_NeonTestResponse = 199, /* only in cfg(feature = "testing") */
|
||||
} NeonMessageTag;
|
||||
|
||||
typedef uint64 NeonRequestId;
|
||||
|
||||
@@ -14,7 +14,7 @@ use proxy::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
|
||||
use proxy::auth::{self};
|
||||
use proxy::cancellation::CancellationHandlerMain;
|
||||
use proxy::config::{
|
||||
self, obfuscated_proxy_id, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig
|
||||
self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
|
||||
};
|
||||
use proxy::control_plane::locks::ApiLocks;
|
||||
use proxy::control_plane::messages::{EndpointJwksResponse, JwksSettings};
|
||||
@@ -218,7 +218,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
proxy::metrics::CancellationSource::Local,
|
||||
)),
|
||||
endpoint_rate_limiter,
|
||||
obfuscated_proxy_id(std::process::id(), "local"),
|
||||
);
|
||||
|
||||
match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await {
|
||||
|
||||
@@ -10,7 +10,6 @@ use clap::Arg;
|
||||
use futures::future::Either;
|
||||
use futures::TryFutureExt;
|
||||
use itertools::Itertools;
|
||||
use proxy::config::obfuscated_proxy_id;
|
||||
use proxy::context::RequestContext;
|
||||
use proxy::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use proxy::protocol2::ConnectionInfo;
|
||||
@@ -186,7 +185,6 @@ async fn task_main(
|
||||
},
|
||||
proxy::metrics::Protocol::SniRouter,
|
||||
"sni",
|
||||
obfuscated_proxy_id(std::process::id(), "sni-router"), // just a shim for context
|
||||
);
|
||||
handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use proxy::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
|
||||
use proxy::cancellation::{CancelMap, CancellationHandler};
|
||||
use proxy::config::{
|
||||
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig,
|
||||
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, obfuscated_proxy_id,
|
||||
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2,
|
||||
};
|
||||
use proxy::context::parquet::ParquetUploadArgs;
|
||||
use proxy::http::health_server::AppMetrics;
|
||||
@@ -396,8 +396,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let proxy_id: u16 = obfuscated_proxy_id(std::process::id(), &args.region);
|
||||
|
||||
let cancellation_handler = Arc::new(CancellationHandler::<
|
||||
Option<Arc<Mutex<RedisPublisherClient>>>,
|
||||
>::new(
|
||||
@@ -439,7 +437,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
cancellation_token.clone(),
|
||||
cancellation_handler.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
proxy_id,
|
||||
));
|
||||
}
|
||||
|
||||
@@ -451,7 +448,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
cancellation_token.clone(),
|
||||
cancellation_handler.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
proxy_id,
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -463,7 +459,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
cancellation_handler.clone(),
|
||||
proxy_id,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,24 +80,15 @@ impl ReportableError for CancelError {
|
||||
|
||||
impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
/// Run async action within an ephemeral session identified by [`CancelKeyData`].
|
||||
pub(crate) fn get_session(self: Arc<Self>, proxy_id: u16) -> Session<P> {
|
||||
pub(crate) fn get_session(self: Arc<Self>) -> Session<P> {
|
||||
// we intentionally generate a random "backend pid" and "secret key" here.
|
||||
// we use the corresponding u64 as an identifier for the
|
||||
// actual endpoint+pid+secret for postgres/pgbouncer.
|
||||
//
|
||||
// if we forwarded the backend_pid from postgres to the client, there would be a lot
|
||||
// of overlap between our computes as most pids are small (~100).
|
||||
|
||||
let key = loop {
|
||||
let key_rand: u64 = rand::random::<u64>() & 0x0000_ffff_ffff_ffff;
|
||||
|
||||
let backend_pid = ((proxy_id as u32) << 16) | ((key_rand >> 32) as u32) as u32;
|
||||
let cancel_key = (key_rand as u32) as i32;
|
||||
|
||||
let key = CancelKeyData {
|
||||
backend_pid: (backend_pid as i32),
|
||||
cancel_key,
|
||||
};
|
||||
let key = rand::random();
|
||||
|
||||
// Random key collisions are unlikely to happen here, but they're still possible,
|
||||
// which is why we have to take care not to rewrite an existing key.
|
||||
@@ -460,7 +451,7 @@ mod tests {
|
||||
CancellationSource::FromRedis,
|
||||
));
|
||||
|
||||
let session = cancellation_handler.clone().get_session(123);
|
||||
let session = cancellation_handler.clone().get_session();
|
||||
assert!(cancellation_handler.contains(&session));
|
||||
drop(session);
|
||||
// Check that the session has been dropped.
|
||||
|
||||
@@ -16,8 +16,6 @@ use crate::serverless::GlobalConnPoolOptions;
|
||||
pub use crate::tls::server_config::{configure_tls, TlsConfig};
|
||||
use crate::types::Host;
|
||||
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
pub struct ProxyConfig {
|
||||
pub tls_config: Option<TlsConfig>,
|
||||
pub metric_collection: Option<MetricCollectionConfig>,
|
||||
@@ -418,23 +416,6 @@ impl FromStr for ConcurrencyLockOptions {
|
||||
}
|
||||
}
|
||||
|
||||
fn map_u32_to_u8(value: u32) -> u8 {
|
||||
((value * 31 + 17) % 255) as u8
|
||||
}
|
||||
|
||||
pub fn obfuscated_proxy_id(process_id: u32, region_id: &str) -> u16 {
|
||||
let process_id = map_u32_to_u8(process_id);
|
||||
let hash_region_id = Sha256::digest(region_id.as_bytes());
|
||||
|
||||
const BASE: u64 = 257;
|
||||
let combined_region = hash_region_id.iter().enumerate().fold(0u64, |acc, (i, &byte)| {
|
||||
(acc + (byte as u64 * BASE.pow(i as u32))) % 255
|
||||
});
|
||||
|
||||
let combined_region = (combined_region % 255) as u8;
|
||||
((combined_region as u16) * 257 + (process_id as u16)) % 65535
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -530,12 +511,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_proxy_id_obfuscation() {
|
||||
let process_id = 123;
|
||||
let region_id = "us-west-2";
|
||||
let proxy_id = obfuscated_proxy_id(process_id, region_id);
|
||||
assert_eq!(proxy_id, 0x1f7b);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ pub async fn task_main(
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
proxy_id: u16,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("proxy has shut down");
|
||||
@@ -90,7 +89,6 @@ pub async fn task_main(
|
||||
peer_addr,
|
||||
crate::metrics::Protocol::Tcp,
|
||||
&config.region,
|
||||
proxy_id,
|
||||
);
|
||||
|
||||
let res = handle_client(
|
||||
@@ -224,7 +222,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
node.cancel_closure
|
||||
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
|
||||
let session = cancellation_handler.get_session(ctx.proxy_id());
|
||||
let session = cancellation_handler.get_session();
|
||||
prepare_client_connection(&node, &session, &mut stream).await?;
|
||||
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
|
||||
@@ -46,7 +46,6 @@ struct RequestContextInner {
|
||||
pub(crate) protocol: Protocol,
|
||||
first_packet: chrono::DateTime<Utc>,
|
||||
region: &'static str,
|
||||
pub(crate) proxy_id: u16, // for generating cancel keys per region/process
|
||||
pub(crate) span: Span,
|
||||
|
||||
// filled in as they are discovered
|
||||
@@ -93,7 +92,6 @@ impl Clone for RequestContext {
|
||||
protocol: inner.protocol,
|
||||
first_packet: inner.first_packet,
|
||||
region: inner.region,
|
||||
proxy_id: inner.proxy_id,
|
||||
span: info_span!("background_task"),
|
||||
|
||||
project: inner.project,
|
||||
@@ -126,7 +124,6 @@ impl RequestContext {
|
||||
conn_info: ConnectionInfo,
|
||||
protocol: Protocol,
|
||||
region: &'static str,
|
||||
proxy_id: u16,
|
||||
) -> Self {
|
||||
// TODO: be careful with long lived spans
|
||||
let span = info_span!(
|
||||
@@ -144,7 +141,6 @@ impl RequestContext {
|
||||
protocol,
|
||||
first_packet: Utc::now(),
|
||||
region,
|
||||
proxy_id,
|
||||
span,
|
||||
|
||||
project: None,
|
||||
@@ -176,7 +172,7 @@ impl RequestContext {
|
||||
let ip = IpAddr::from([127, 0, 0, 1]);
|
||||
let addr = SocketAddr::new(ip, 5432);
|
||||
let conn_info = ConnectionInfo { addr, extra: None };
|
||||
RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test", 1)
|
||||
RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test")
|
||||
}
|
||||
|
||||
pub(crate) fn console_application_name(&self) -> String {
|
||||
@@ -338,10 +334,6 @@ impl RequestContext {
|
||||
.latency_timer
|
||||
.success();
|
||||
}
|
||||
|
||||
pub(crate) fn proxy_id(&self) -> u16 {
|
||||
self.0.try_lock().expect("should not deadlock").proxy_id
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LatencyTimerPause<'a> {
|
||||
|
||||
@@ -59,7 +59,6 @@ pub async fn task_main(
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
proxy_id: u16,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("proxy has shut down");
|
||||
@@ -125,7 +124,6 @@ pub async fn task_main(
|
||||
conn_info,
|
||||
crate::metrics::Protocol::Tcp,
|
||||
&config.region,
|
||||
proxy_id,
|
||||
);
|
||||
|
||||
let res = handle_client(
|
||||
@@ -360,7 +358,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
node.cancel_closure
|
||||
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
|
||||
let session = cancellation_handler.get_session(ctx.proxy_id());
|
||||
let session = cancellation_handler.get_session();
|
||||
prepare_client_connection(&node, &session, &mut stream).await?;
|
||||
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
|
||||
@@ -63,7 +63,6 @@ pub async fn task_main(
|
||||
cancellation_token: CancellationToken,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
proxy_id: u16,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("websocket server has shut down");
|
||||
@@ -199,7 +198,6 @@ pub async fn task_main(
|
||||
conn,
|
||||
conn_info,
|
||||
session_id,
|
||||
proxy_id,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
@@ -326,7 +324,6 @@ async fn connection_handler(
|
||||
conn: AsyncRW,
|
||||
conn_info: ConnectionInfo,
|
||||
session_id: uuid::Uuid,
|
||||
proxy_id: u16,
|
||||
) {
|
||||
let session_id = AtomicTake::new(session_id);
|
||||
|
||||
@@ -374,7 +371,6 @@ async fn connection_handler(
|
||||
http_request_token,
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellations,
|
||||
proxy_id,
|
||||
)
|
||||
.in_current_span()
|
||||
.map_ok_or_else(api_error_into_response, |r| r),
|
||||
@@ -423,7 +419,6 @@ async fn request_handler(
|
||||
http_cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellations: TaskTracker,
|
||||
proxy_id: u16,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
let host = request
|
||||
.headers()
|
||||
@@ -441,7 +436,6 @@ async fn request_handler(
|
||||
conn_info,
|
||||
crate::metrics::Protocol::Ws,
|
||||
&config.region,
|
||||
proxy_id,
|
||||
);
|
||||
|
||||
let span = ctx.span();
|
||||
@@ -479,7 +473,6 @@ async fn request_handler(
|
||||
conn_info,
|
||||
crate::metrics::Protocol::Http,
|
||||
&config.region,
|
||||
proxy_id,
|
||||
);
|
||||
let span = ctx.span();
|
||||
|
||||
|
||||
@@ -102,11 +102,6 @@ impl Client {
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
pub async fn utilization(&self) -> Result<reqwest::Response> {
|
||||
let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
|
||||
self.request(Method::GET, uri, ()).await
|
||||
}
|
||||
|
||||
@@ -127,13 +127,6 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn utilization_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let utilization = global_timelines.get_timeline_counts();
|
||||
json_response(StatusCode::OK, utilization)
|
||||
}
|
||||
|
||||
/// List all (not deleted) timelines.
|
||||
/// Note: it is possible to do the same with debug_dump.
|
||||
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -627,7 +620,6 @@ pub fn make_router(
|
||||
failpoints_handler(r, cancel).await
|
||||
})
|
||||
})
|
||||
.get("/v1/uzilization", |r| request_span(r, utilization_handler))
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
|
||||
@@ -13,7 +13,6 @@ use anyhow::{bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::models::SafekeeperUtilization;
|
||||
use safekeeper_api::ServerInfo;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
@@ -417,20 +416,6 @@ impl GlobalTimelines {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns statistics about timeline counts
|
||||
pub fn get_timeline_counts(&self) -> SafekeeperUtilization {
|
||||
let global_lock = self.state.lock().unwrap();
|
||||
let timeline_count = global_lock
|
||||
.timelines
|
||||
.values()
|
||||
.filter(|t| match t {
|
||||
GlobalMapTimeline::CreationInProgress => false,
|
||||
GlobalMapTimeline::Timeline(t) => !t.is_cancelled(),
|
||||
})
|
||||
.count() as u64;
|
||||
SafekeeperUtilization { timeline_count }
|
||||
}
|
||||
|
||||
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
|
||||
/// and that's why it can return cancelled timelines, to retry deleting them.
|
||||
fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
|
||||
|
||||
@@ -1,17 +1,11 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::NodeId;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use super::{Node, Scheduler, Service, TenantShard};
|
||||
use super::Service;
|
||||
|
||||
pub struct ChaosInjector {
|
||||
service: Arc<Service>,
|
||||
@@ -41,86 +35,50 @@ impl ChaosInjector {
|
||||
}
|
||||
}
|
||||
|
||||
/// If a shard has a secondary and attached location, then re-assign the secondary to be
|
||||
/// attached and the attached to be secondary.
|
||||
///
|
||||
/// Only modifies tenants if they're in Active scheduling policy.
|
||||
fn maybe_migrate_to_secondary(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
nodes: &Arc<HashMap<NodeId, Node>>,
|
||||
tenants: &mut BTreeMap<TenantShardId, TenantShard>,
|
||||
scheduler: &mut Scheduler,
|
||||
) {
|
||||
let shard = tenants
|
||||
.get_mut(&tenant_shard_id)
|
||||
.expect("Held lock between choosing ID and this get");
|
||||
|
||||
if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
|
||||
// Skip non-active scheduling policies, so that a shard with a policy like Pause can
|
||||
// be pinned without being disrupted by us.
|
||||
tracing::info!(
|
||||
"Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
|
||||
shard.get_scheduling_policy()
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Pick a secondary to promote
|
||||
let Some(new_location) = shard
|
||||
.intent
|
||||
.get_secondary()
|
||||
.choose(&mut thread_rng())
|
||||
.cloned()
|
||||
else {
|
||||
tracing::info!(
|
||||
"Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(old_location) = *shard.intent.get_attached() else {
|
||||
tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");
|
||||
|
||||
shard.intent.demote_attached(scheduler, old_location);
|
||||
shard.intent.promote_attached(scheduler, new_location);
|
||||
self.service.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
|
||||
async fn inject_chaos(&mut self) {
|
||||
// Pick some shards to interfere with
|
||||
let batch_size = 128;
|
||||
let mut inner = self.service.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = inner.parts_mut();
|
||||
let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
|
||||
|
||||
// Prefer to migrate tenants that are currently outside their home AZ. This avoids the chaos injector
|
||||
// continuously pushing tenants outside their home AZ: instead, we'll tend to cycle between picking some
|
||||
// random tenants to move, and then on next chaos iteration moving them back, then picking some new
|
||||
// random tenants on the next iteration.
|
||||
let mut victims = Vec::with_capacity(batch_size);
|
||||
for shard in tenants.values() {
|
||||
if shard.is_attached_outside_preferred_az(nodes) {
|
||||
victims.push(shard.tenant_shard_id);
|
||||
}
|
||||
|
||||
if victims.len() >= batch_size {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let choose_random = batch_size.saturating_sub(victims.len());
|
||||
tracing::info!("Injecting chaos: found {} shards to migrate back to home AZ, picking {choose_random} random shards to migrate", victims.len());
|
||||
|
||||
let random_victims = tenant_ids.choose_multiple(&mut thread_rng(), choose_random);
|
||||
victims.extend(random_victims);
|
||||
let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
|
||||
|
||||
for victim in victims {
|
||||
self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
|
||||
let shard = tenants
|
||||
.get_mut(victim)
|
||||
.expect("Held lock between choosing ID and this get");
|
||||
|
||||
if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
|
||||
// Skip non-active scheduling policies, so that a shard with a policy like Pause can
|
||||
// be pinned without being disrupted by us.
|
||||
tracing::info!(
|
||||
"Skipping shard {victim}: scheduling policy is {:?}",
|
||||
shard.get_scheduling_policy()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Pick a secondary to promote
|
||||
let Some(new_location) = shard
|
||||
.intent
|
||||
.get_secondary()
|
||||
.choose(&mut thread_rng())
|
||||
.cloned()
|
||||
else {
|
||||
tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(old_location) = *shard.intent.get_attached() else {
|
||||
tracing::info!("Skipping shard {victim}: currently has no attached location");
|
||||
continue;
|
||||
};
|
||||
|
||||
tracing::info!("Injecting chaos: migrate {victim} {old_location}->{new_location}");
|
||||
|
||||
shard.intent.demote_attached(scheduler, old_location);
|
||||
shard.intent.promote_attached(scheduler, new_location);
|
||||
self.service.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1793,23 +1793,6 @@ impl TenantShard {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
|
||||
///
|
||||
/// If the shard does not have a preferred AZ, returns false.
|
||||
pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
|
||||
self.intent
|
||||
.get_attached()
|
||||
.map(|node_id| {
|
||||
Some(
|
||||
nodes
|
||||
.get(&node_id)
|
||||
.expect("referenced node exists")
|
||||
.get_availability_zone_id(),
|
||||
) == self.intent.preferred_az_id.as_ref()
|
||||
})
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TenantShard {
|
||||
|
||||
@@ -1,60 +0,0 @@
|
||||
# NB: there are benchmarks that double-serve as tests inside the `performance` directory.
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
@pytest.mark.timeout(30) # test takes <20s if pageserver impl is correct
|
||||
@pytest.mark.parametrize("kind", ["pageserver-stop", "tenant-detach"])
|
||||
def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path, kind: str):
|
||||
def patch_pageserver_toml(config):
|
||||
config["page_service_pipelining"] = {
|
||||
"mode": "pipelined",
|
||||
"max_batch_size": 32,
|
||||
"execution": "concurrent-futures",
|
||||
}
|
||||
|
||||
neon_env_builder.pageserver_config_override = patch_pageserver_toml
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
log.info("make flush appear slow")
|
||||
|
||||
log.info("sending requests until pageserver accepts no more")
|
||||
# TODO: extract this into a helper, like subprocess_capture,
|
||||
# so that we capture the stderr from the helper somewhere.
|
||||
child = subprocess.Popen(
|
||||
[
|
||||
neon_binpath / "test_helper_slow_client_reads",
|
||||
env.pageserver.connstr(),
|
||||
str(env.initial_tenant),
|
||||
str(env.initial_timeline),
|
||||
],
|
||||
bufsize=0, # unbuffered
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
)
|
||||
assert child.stdout is not None
|
||||
buf = child.stdout.read(1)
|
||||
if len(buf) != 1:
|
||||
raise Exception("unexpected EOF")
|
||||
if buf != b"R":
|
||||
raise Exception(f"unexpected data: {buf!r}")
|
||||
log.info("helper reports pageserver accepts no more requests")
|
||||
log.info(
|
||||
"assuming pageserver connection handle is in a state where TCP has backpressured pageserver=>client response flush() into userspace"
|
||||
)
|
||||
|
||||
if kind == "pageserver-stop":
|
||||
log.info("try to shut down the pageserver cleanly")
|
||||
env.pageserver.stop()
|
||||
elif kind == "tenant-detach":
|
||||
log.info("try to shut down the tenant")
|
||||
env.pageserver.tenant_detach(env.initial_tenant)
|
||||
else:
|
||||
raise ValueError(f"unexpected kind: {kind}")
|
||||
|
||||
log.info("shutdown did not time out, test passed")
|
||||
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: d674efd776...5f3ef16569
2
vendor/revisions.json
vendored
2
vendor/revisions.json
vendored
@@ -5,7 +5,7 @@
|
||||
],
|
||||
"v16": [
|
||||
"16.6",
|
||||
"d674efd776f59d78e8fa1535bd2f95c3e6984fca"
|
||||
"5f3ef1656925c10c90c8fecfb668048d819912e3"
|
||||
],
|
||||
"v15": [
|
||||
"15.10",
|
||||
|
||||
Reference in New Issue
Block a user