Compare commits

...

17 Commits

Author SHA1 Message Date
Christian Schwarz
173f18832c fixup 2025-01-15 00:24:59 +01:00
Christian Schwarz
dedd524d7e refinements 2025-01-15 00:21:28 +01:00
Christian Schwarz
a8f9b564be fix cd pageserver && cargo clippy --features testing build 2025-01-14 23:50:22 +01:00
Christian Schwarz
5450e54dab bump ci 2025-01-14 22:47:16 +01:00
Christian Schwarz
53b05c4ba0 cleanups to make CI pass (well, fail because the bug isn't fixed yet) 2025-01-14 22:45:09 +01:00
Christian Schwarz
1f7d173235 Merge remote-tracking branch 'origin/main' into problame/hung-shutdown/demo-hypothesis 2025-01-14 22:33:20 +01:00
Christian Schwarz
8454e19a0f address warnings and such 2025-01-14 22:28:08 +01:00
Christian Schwarz
45e08d0aa5 it repros 2025-01-14 22:16:27 +01:00
Christian Schwarz
9a02bc0cfd try to repro root cause hypothesis for https://github.com/neondatabase/neon/issues/10309
This approach here doesn't work because it slows down all the responses.

the workload() thread gets stuck in auth, prob with zero pipeline depth

  0x00007fa28fe48e63 in epoll_wait () from /lib/x86_64-linux-gnu/libc.so.6
  (gdb) bt
  #0  0x00007fa28fe48e63 in epoll_wait () from /lib/x86_64-linux-gnu/libc.so.6
  #1  0x0000561a285bf44e in WaitEventSetWaitBlock (set=0x561a292723e8, cur_timeout=9999, occurred_events=0x7ffd1f11c970, nevents=1)
      at /home/christian/src/neon//vendor/postgres-v16/src/backend/storage/ipc/latch.c:1535
  #2  0x0000561a285bf338 in WaitEventSetWait (set=0x561a292723e8, timeout=9999, occurred_events=0x7ffd1f11c970, nevents=1, wait_event_info=117440512)
      at /home/christian/src/neon//vendor/postgres-v16/src/backend/storage/ipc/latch.c:1481
  #3  0x00007fa2904a7345 in call_PQgetCopyData (shard_no=0, buffer=0x7ffd1f11cad0) at /home/christian/src/neon//pgxn/neon/libpagestore.c:703
  #4  0x00007fa2904a7aec in pageserver_receive (shard_no=0) at /home/christian/src/neon//pgxn/neon/libpagestore.c:899
  #5  0x00007fa2904af471 in prefetch_read (slot=0x561a292863b0) at /home/christian/src/neon//pgxn/neon/pagestore_smgr.c:644
  #6  0x00007fa2904af26b in prefetch_wait_for (ring_index=0) at /home/christian/src/neon//pgxn/neon/pagestore_smgr.c:596
  #7  0x00007fa2904b489d in neon_read_at_lsnv (rinfo=..., forkNum=MAIN_FORKNUM, base_blockno=0, request_lsns=0x7ffd1f11cd60, buffers=0x7ffd1f11cd30, nblocks=1, mask=0x0)
      at /home/christian/src/neon//pgxn/neon/pagestore_smgr.c:3024
  #8  0x00007fa2904b4f34 in neon_read_at_lsn (rinfo=..., forkNum=MAIN_FORKNUM, blkno=0, request_lsns=..., buffer=0x7fa28b969000) at /home/christian/src/neon//pgxn/neon/pagestore_smgr.c:3104
  #9  0x00007fa2904b515d in neon_read (reln=0x561a292ef448, forkNum=MAIN_FORKNUM, blkno=0, buffer=0x7fa28b969000) at /home/christian/src/neon//pgxn/neon/pagestore_smgr.c:3146
  #10 0x0000561a285f1ed5 in smgrread (reln=0x561a292ef448, forknum=MAIN_FORKNUM, blocknum=0, buffer=0x7fa28b969000) at /home/christian/src/neon//vendor/postgres-v16/src/backend/storage/smgr/smgr.c:567
  #11 0x0000561a285a528a in ReadBuffer_common (smgr=0x561a292ef448, relpersistence=112 'p', forkNum=MAIN_FORKNUM, blockNum=0, mode=RBM_NORMAL, strategy=0x0, hit=0x7ffd1f11cf1b)
      at /home/christian/src/neon//vendor/postgres-v16/src/backend/storage/buffer/bufmgr.c:1134
  #12 0x0000561a285a47e3 in ReadBufferExtended (reln=0x7fa28ce1c888, forkNum=MAIN_FORKNUM, blockNum=0, mode=RBM_NORMAL, strategy=0x0)
      at /home/christian/src/neon//vendor/postgres-v16/src/backend/storage/buffer/bufmgr.c:781
  #13 0x0000561a285a46ad in ReadBuffer (reln=0x7fa28ce1c888, blockNum=0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/storage/buffer/bufmgr.c:715
  #14 0x0000561a2811d511 in _bt_getbuf (rel=0x7fa28ce1c888, blkno=0, access=1) at /home/christian/src/neon//vendor/postgres-v16/src/backend/access/nbtree/nbtpage.c:852
  #15 0x0000561a2811d1b2 in _bt_metaversion (rel=0x7fa28ce1c888, heapkeyspace=0x7ffd1f11d9f0, allequalimage=0x7ffd1f11d9f1) at /home/christian/src/neon//vendor/postgres-v16/src/backend/access/nbtree/nbtpage.c:747
  #16 0x0000561a28126220 in _bt_first (scan=0x561a292d0348, dir=ForwardScanDirection) at /home/christian/src/neon//vendor/postgres-v16/src/backend/access/nbtree/nbtsearch.c:1465
  #17 0x0000561a28121a07 in btgettuple (scan=0x561a292d0348, dir=ForwardScanDirection) at /home/christian/src/neon//vendor/postgres-v16/src/backend/access/nbtree/nbtree.c:246
  #18 0x0000561a28111afa in index_getnext_tid (scan=0x561a292d0348, direction=ForwardScanDirection) at /home/christian/src/neon//vendor/postgres-v16/src/backend/access/index/indexam.c:583
  #19 0x0000561a28111d14 in index_getnext_slot (scan=0x561a292d0348, direction=ForwardScanDirection, slot=0x561a292d01a8) at /home/christian/src/neon//vendor/postgres-v16/src/backend/access/index/indexam.c:675
  #20 0x0000561a2810fbcc in systable_getnext (sysscan=0x561a292d0158) at /home/christian/src/neon//vendor/postgres-v16/src/backend/access/index/genam.c:512
  #21 0x0000561a287a1ee1 in SearchCatCacheMiss (cache=0x561a292a0f80, nkeys=1, hashValue=3028463561, hashIndex=1, v1=94670359561576, v2=0, v3=0, v4=0)
      at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/cache/catcache.c:1440
  #22 0x0000561a287a1d8a in SearchCatCacheInternal (cache=0x561a292a0f80, nkeys=1, v1=94670359561576, v2=0, v3=0, v4=0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/cache/catcache.c:1360
  #23 0x0000561a287a1a4f in SearchCatCache (cache=0x561a292a0f80, v1=94670359561576, v2=0, v3=0, v4=0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/cache/catcache.c:1214
  #24 0x0000561a287be060 in SearchSysCache (cacheId=10, key1=94670359561576, key2=0, key3=0, key4=0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/cache/syscache.c:817
  #25 0x0000561a287be66f in GetSysCacheOid (cacheId=10, oidcol=1, key1=94670359561576, key2=0, key3=0, key4=0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/cache/syscache.c:1055
  #26 0x0000561a286319a5 in get_role_oid (rolname=0x561a29270568 "cloud_admin", missing_ok=true) at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/adt/acl.c:5251
  #27 0x0000561a283d42ca in check_hba (port=0x561a29268de0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/libpq/hba.c:2493
  #28 0x0000561a283d5537 in hba_getauthmethod (port=0x561a29268de0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/libpq/hba.c:3067
  #29 0x0000561a283c6fd7 in ClientAuthentication (port=0x561a29268de0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/libpq/auth.c:395
  #30 0x0000561a287dc943 in PerformAuthentication (port=0x561a29268de0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/init/postinit.c:247
  #31 0x0000561a287dd9cd in InitPostgres (in_dbname=0x561a29270588 "postgres", dboid=0, username=0x561a29270568 "cloud_admin", useroid=0, load_session_libraries=true, override_allow_connections=false,
      out_dbname=0x0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/utils/init/postinit.c:929
  #32 0x0000561a285fa10b in PostgresMain (dbname=0x561a29270588 "postgres", username=0x561a29270568 "cloud_admin") at /home/christian/src/neon//vendor/postgres-v16/src/backend/tcop/postgres.c:4293
  #33 0x0000561a28524ce4 in BackendRun (port=0x561a29268de0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/postmaster/postmaster.c:4465
  #34 0x0000561a285245da in BackendStartup (port=0x561a29268de0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/postmaster/postmaster.c:4193
  #35 0x0000561a285209c4 in ServerLoop () at /home/christian/src/neon//vendor/postgres-v16/src/backend/postmaster/postmaster.c:1782
  #36 0x0000561a2852030f in PostmasterMain (argc=3, argv=0x561a291c5fc0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/postmaster/postmaster.c:1466
  #37 0x0000561a283dd987 in main (argc=3, argv=0x561a291c5fc0) at /home/christian/src/neon//vendor/postgres-v16/src/backend/main/main.c:238
2025-01-14 20:42:01 +01:00
Christian Schwarz
9e1cd986d7 address "why mut" nits; https://github.com/neondatabase/neon/pull/10353#discussion_r1913685991 https://github.com/neondatabase/neon/pull/10353#discussion_r1913683065 https://github.com/neondatabase/neon/pull/10353#discussion_r1913683392 2025-01-14 15:30:33 +01:00
Christian Schwarz
47544dcc0b simplify SmgrOpTimerState variant names, and add some doc comments; https://github.com/neondatabase/neon/pull/10353#discussion_r1913676569 and https://github.com/neondatabase/neon/pull/10353#discussion_r1913676824 2025-01-14 15:28:09 +01:00
Christian Schwarz
ee851d1127 Merge remote-tracking branch 'origin/main' into problame/throttle-before-batching 2025-01-10 20:38:08 +01:00
Christian Schwarz
d6a2b62cfb grand refactor of SmgrOpTimer states 2025-01-10 20:29:44 +01:00
Christian Schwarz
ad5120197c self-review 2025-01-10 20:28:20 +01:00
Christian Schwarz
4d496a29c2 following up to the last commit, the observation points that we use to calculate the various latency metrics are different, adjust for that 2025-01-10 20:26:09 +01:00
Christian Schwarz
8793e28ccb throttling cancel-sensitivity 2025-01-10 20:25:24 +01:00
Christian Schwarz
aa8da1e621 move throttle into pagestream_read_message 2025-01-10 15:35:07 +01:00
8 changed files with 485 additions and 24 deletions

View File

@@ -1400,6 +1400,8 @@ pub enum PagestreamFeMessage {
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
#[cfg(feature = "testing")]
Test(PagestreamTestRequest),
}
// Wrapped in libpq CopyData
@@ -1411,6 +1413,8 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
#[cfg(feature = "testing")]
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -1422,6 +1426,9 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/// Test message discrimimant is unstable
#[cfg(feature = "testing")]
Test = 106,
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
@@ -1433,6 +1440,8 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
106 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
@@ -1550,6 +1559,20 @@ pub struct PagestreamDbSizeResponse {
pub db_size: i64,
}
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
pub hdr: PagestreamRequest,
pub batch_key: u64,
pub message: String,
}
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
pub req: PagestreamTestRequest,
}
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
// that require pageserver-internal types. It is sufficient to get the total size.
#[derive(Serialize, Deserialize, Debug)]
@@ -1618,6 +1641,17 @@ impl PagestreamFeMessage {
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(5);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
let message = req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
bytes.into()
@@ -1705,6 +1739,21 @@ impl PagestreamFeMessage {
segno: body.read_u32::<BigEndian>()?,
},
)),
#[cfg(feature = "testing")]
5 => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key: body.read_u64::<BigEndian>()?,
message: {
let len = body.read_u64::<BigEndian>()?;
let mut buf = vec![0; len as usize];
body.read_exact(&mut buf)?;
String::from_utf8(buf)?
},
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -1748,6 +1797,15 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
PagestreamProtocolVersion::V3 => {
@@ -1816,6 +1874,18 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
}
@@ -1958,6 +2028,28 @@ impl PagestreamBeMessage {
segment: segment.into(),
})
}
#[cfg(feature = "testing")]
Tag::Test => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let batch_key = buf.read_u64::<BigEndian>()?;
let len = buf.read_u64::<BigEndian>()?;
let mut msg = vec![0; len as usize];
buf.read_exact(&mut msg)?;
let message = String::from_utf8(msg)?;
Self::Test(PagestreamTestResponse {
req: PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key,
message,
},
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
@@ -1977,6 +2069,8 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
#[cfg(feature = "testing")]
Self::Test(_) => "Test",
}
}
}

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing"]
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
[dependencies]
anyhow.workspace = true
@@ -113,3 +113,7 @@ harness = false
[[bench]]
name = "upload_queue"
harness = false
[[bin]]
name = "test_helper_slow_client_reads"
required-features = [ "testing" ]

View File

@@ -4,6 +4,9 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
testing = [ "pageserver_api/testing" ]
[dependencies]
pageserver_api.workspace = true
thiserror.workspace = true

View File

@@ -1,6 +1,9 @@
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use futures::SinkExt;
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
@@ -10,7 +13,6 @@ use pageserver_api::{
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
@@ -62,15 +64,28 @@ impl Client {
.client
.copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
.await?;
let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
let Client {
cancel_on_client_drop,
conn_task,
client: _,
} = self;
let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
ConnTaskRunning {
cancel_on_client_drop,
conn_task,
},
)));
Ok(PagestreamClient {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop,
sink: PagestreamSender {
shared: shared.clone(),
sink,
},
stream: PagestreamReceiver {
shared: shared.clone(),
stream,
},
shared,
})
}
@@ -97,7 +112,28 @@ impl Client {
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
shared: Arc<Mutex<PagestreamShared>>,
sink: PagestreamSender,
stream: PagestreamReceiver,
}
pub struct PagestreamSender {
#[allow(dead_code)]
shared: Arc<Mutex<PagestreamShared>>,
sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
}
pub struct PagestreamReceiver {
#[allow(dead_code)]
shared: Arc<Mutex<PagestreamShared>>,
stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
}
enum PagestreamShared {
ConnTaskRunning(ConnTaskRunning),
ConnTaskCancelledJoinHandleReturnedOrDropped,
}
struct ConnTaskRunning {
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
@@ -110,11 +146,11 @@ pub struct RelTagBlockNo {
impl PagestreamClient {
pub async fn shutdown(self) {
let Self {
copy_both,
cancel_on_client_drop: cancel_conn_task,
conn_task,
} = self;
// The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
shared,
sink,
stream,
} = { self };
// The `copy_both` split into `sink` and `stream` contains internal channel sender, the receiver of which is polled by `conn_task`.
// When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
// (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
//
@@ -131,27 +167,77 @@ impl PagestreamClient {
//
// NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
// => https://github.com/neondatabase/neon/issues/6390
let _ = cancel_conn_task.unwrap();
let ConnTaskRunning {
cancel_on_client_drop,
conn_task,
} = {
let mut guard = shared.lock().unwrap();
match std::mem::replace(
&mut *guard,
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
) {
PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
}
};
let _ = cancel_on_client_drop.unwrap();
conn_task.await.unwrap();
drop(copy_both);
// Now drop the split copy_both.
drop(sink);
drop(stream);
}
pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
let Self {
shared: _,
sink,
stream,
} = self;
(sink, stream)
}
pub async fn getpage(
&mut self,
req: PagestreamGetPageRequest,
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.getpage_send(req).await?;
self.getpage_recv().await
}
self.copy_both.send_all(&mut req).await?;
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
self.sink.getpage_send(req).await
}
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
self.stream.getpage_recv().await
}
}
impl PagestreamSender {
// TODO: maybe make this impl Sink instead for better composability?
pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let msg = msg.serialize();
self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
Ok(())
}
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
self.send(PagestreamFeMessage::GetPage(req)).await
}
}
impl PagestreamReceiver {
// TODO: maybe make this impl Stream instead for better composability?
pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
let next: bytes::Bytes = next.unwrap()?;
PagestreamBeMessage::deserialize(next)
}
let msg = PagestreamBeMessage::deserialize(next)?;
match msg {
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
let next: PagestreamBeMessage = self.recv().await?;
match next {
PagestreamBeMessage::GetPage(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::Exists(_)
@@ -160,7 +246,14 @@ impl PagestreamClient {
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
msg.kind()
next.kind()
)
}
#[cfg(feature = "testing")]
PagestreamBeMessage::Test(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
next.kind()
)
}
}

View File

@@ -0,0 +1,65 @@
use std::{
io::{stdin, stdout, Read, Write},
time::Duration,
};
use clap::Parser;
use pageserver_api::models::{PagestreamRequest, PagestreamTestRequest};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
#[derive(clap::Parser)]
struct Args {
connstr: String,
tenant_id: TenantId,
timeline_id: TimelineId,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let Args {
connstr,
tenant_id,
timeline_id,
} = Args::parse();
let client = pageserver_client::page_service::Client::new(connstr).await?;
let client = client.pagestream(tenant_id, timeline_id).await?;
let (mut sender, _receiver) = client.split();
eprintln!("filling the pipe");
let mut msg = 0;
loop {
msg += 1;
let fut = sender.send(pageserver_api::models::PagestreamFeMessage::Test(
PagestreamTestRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(23),
not_modified_since: Lsn(23),
},
batch_key: 42,
message: format!("message {}", msg),
},
));
let Ok(res) = tokio::time::timeout(Duration::from_secs(10), fut).await else {
eprintln!("pipe seems full");
break;
};
let _: () = res?;
}
let n = stdout().write(b"R")?;
assert_eq!(n, 1);
stdout().flush()?;
eprintln!("waiting for signal to tell us to exit");
let mut buf = [0u8; 1];
stdin().read_exact(&mut buf)?;
eprintln!("termination signal received, exiting");
anyhow::Ok(())
}

View File

@@ -1463,6 +1463,8 @@ pub enum SmgrQueryType {
GetPageAtLsn,
GetDbSize,
GetSlruSegment,
#[cfg(feature = "testing")]
Test,
}
pub(crate) struct SmgrQueryTimePerTimeline {

View File

@@ -554,6 +554,12 @@ struct BatchedGetPageRequest {
timer: SmgrOpTimer,
}
#[cfg(feature = "testing")]
struct BatchedTestRequest {
req: models::PagestreamTestRequest,
timer: SmgrOpTimer,
}
enum BatchedFeMessage {
Exists {
span: Span,
@@ -585,6 +591,12 @@ enum BatchedFeMessage {
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
#[cfg(feature = "testing")]
Test {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
requests: Vec<BatchedTestRequest>,
},
RespondError {
span: Span,
error: BatchedPageStreamError,
@@ -605,6 +617,12 @@ impl BatchedFeMessage {
page.timer.observe_execution_start(at);
}
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test { requests, .. } => {
for req in requests {
req.timer.observe_execution_start(at);
}
}
BatchedFeMessage::RespondError { .. } => {}
}
}
@@ -865,6 +883,22 @@ impl PageServerHandler {
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
}
}
#[cfg(feature = "testing")]
PagestreamFeMessage::Test(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer =
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
.await?;
BatchedFeMessage::Test {
span,
shard,
requests: vec![BatchedTestRequest { req, timer }],
}
}
};
Ok(Some(batched_msg))
}
@@ -927,6 +961,46 @@ impl PageServerHandler {
accum_pages.extend(this_pages);
Ok(())
}
#[cfg(feature = "testing")]
(
Ok(BatchedFeMessage::Test {
shard: accum_shard,
requests: accum_requests,
..
}),
BatchedFeMessage::Test {
shard: this_shard,
requests: this_requests,
..
},
) if (|| {
assert!(this_requests.len() == 1);
if accum_requests.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_requests.len(), max_batch_size.get());
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
let this_batch_key = this_requests[0].req.batch_key;
let accum_batch_key = accum_requests[0].req.batch_key;
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
return false;
}
true
})() =>
{
// ok to batch
accum_requests.extend(this_requests);
Ok(())
}
// something batched already but this message is unbatchable
(_, this_msg) => {
// by default, don't continue batching
@@ -1053,6 +1127,27 @@ impl PageServerHandler {
span,
)
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test {
span,
shard,
requests,
} => {
fail::fail_point!("ps::handle-pagerequest-message::test");
(
{
let npages = requests.len();
trace!(npages, "handling getpage request");
let res = self
.handle_test_request_batch(&shard, requests, ctx)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
@@ -1776,6 +1871,51 @@ impl PageServerHandler {
))
}
// NB: this impl mimics what we do for batched getpage requests.
#[cfg(feature = "testing")]
#[instrument(skip_all, fields(shard_id))]
async fn handle_test_request_batch(
&mut self,
timeline: &Timeline,
requests: Vec<BatchedTestRequest>,
_ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
// real requests would do something with the timeline
let mut results = Vec::with_capacity(requests.len());
for _req in requests.iter() {
tokio::task::yield_now().await;
results.push({
if timeline.cancel.is_cancelled() {
Err(PageReconstructError::Cancelled)
} else {
Ok(())
}
});
}
// TODO: avoid creating the new Vec here
Vec::from_iter(
requests
.into_iter()
.zip(results.into_iter())
.map(|(req, res)| {
res.map(|()| {
(
PagestreamBeMessage::Test(models::PagestreamTestResponse {
req: req.req.clone(),
}),
req.timer,
)
})
.map_err(|e| BatchedPageStreamError {
err: PageStreamError::from(e),
req: req.req.hdr,
})
}),
)
}
/// Note on "fullbackup":
/// Full basebackups should only be used for debugging purposes.
/// Originally, it was introduced to enable breaking storage format changes,

View File

@@ -0,0 +1,60 @@
# NB: there are benchmarks that double-serve as tests inside the `performance` directory.
import subprocess
from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(30) # test takes <20s if pageserver impl is correct
@pytest.mark.parametrize("kind", ["pageserver-stop", "tenant-detach"])
def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path, kind: str):
def patch_pageserver_toml(config):
config["page_service_pipelining"] = {
"mode": "pipelined",
"max_batch_size": 32,
"execution": "concurrent-futures",
}
neon_env_builder.pageserver_config_override = patch_pageserver_toml
env = neon_env_builder.init_start()
log.info("make flush appear slow")
log.info("sending requests until pageserver accepts no more")
# TODO: extract this into a helper, like subprocess_capture,
# so that we capture the stderr from the helper somewhere.
child = subprocess.Popen(
[
neon_binpath / "test_helper_slow_client_reads",
env.pageserver.connstr(),
str(env.initial_tenant),
str(env.initial_timeline),
],
bufsize=0, # unbuffered
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
assert child.stdout is not None
buf = child.stdout.read(1)
if len(buf) != 1:
raise Exception("unexpected EOF")
if buf != b"R":
raise Exception(f"unexpected data: {buf!r}")
log.info("helper reports pageserver accepts no more requests")
log.info(
"assuming pageserver connection handle is in a state where TCP has backpressured pageserver=>client response flush() into userspace"
)
if kind == "pageserver-stop":
log.info("try to shut down the pageserver cleanly")
env.pageserver.stop()
elif kind == "tenant-detach":
log.info("try to shut down the tenant")
env.pageserver.tenant_detach(env.initial_tenant)
else:
raise ValueError(f"unexpected kind: {kind}")
log.info("shutdown did not time out, test passed")