cleanups to make CI pass (well, fail because the bug isn't fixed yet)

This commit is contained in:
Christian Schwarz
2025-01-14 22:45:09 +01:00
parent 1f7d173235
commit 53b05c4ba0
7 changed files with 35 additions and 12 deletions

View File

@@ -1641,15 +1641,16 @@ impl PagestreamFeMessage {
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(5);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
bytes.put_u64(req.message.as_bytes().len() as u64);
bytes.put_slice(req.message.as_bytes());
let message = req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
@@ -1797,11 +1798,13 @@ impl PagestreamBeMessage {
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
bytes.put_u64(resp.req.message.as_bytes().len() as u64);
bytes.put_slice(resp.req.message.as_bytes());
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
@@ -1872,14 +1875,16 @@ impl PagestreamBeMessage {
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
bytes.put_u64(resp.req.message.as_bytes().len() as u64);
bytes.put_slice(resp.req.message.as_bytes());
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}

View File

@@ -113,3 +113,7 @@ harness = false
[[bench]]
name = "upload_queue"
harness = false
[[bin]]
name = "test_helper_slow_client_reads"
required-features = [ "testing" ]

View File

@@ -4,6 +4,9 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
testing = [ "pageserver_api/testing" ]
[dependencies]
pageserver_api.workspace = true
thiserror.workspace = true

View File

@@ -228,10 +228,11 @@ impl PagestreamSender {
}
impl PagestreamReceiver {
// TODO: maybe make this impl Stream instead for better composability?
pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
let next: bytes::Bytes = next.unwrap()?;
Ok(PagestreamBeMessage::deserialize(next)?)
PagestreamBeMessage::deserialize(next)
}
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
@@ -242,8 +243,14 @@ impl PagestreamReceiver {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::Test(_) => {
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
next.kind()
)
}
#[cfg(feature = "testing")]
PagestreamBeMessage::Test(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
next.kind()

View File

@@ -50,7 +50,8 @@ async fn main() -> anyhow::Result<()> {
let _: () = res?;
}
stdout().write(b"R")?;
let n = stdout().write(b"R")?;
assert_eq!(n, 1);
stdout().flush()?;
eprintln!("waiting for signal to tell us to exit");

View File

@@ -617,6 +617,7 @@ impl BatchedFeMessage {
page.timer.observe_execution_start(at);
}
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test { requests, .. } => {
for req in requests {
req.timer.observe_execution_start(at);
@@ -960,6 +961,7 @@ impl PageServerHandler {
accum_pages.extend(this_pages);
Ok(())
}
#[cfg(feature = "testing")]
(
Ok(BatchedFeMessage::Test {
shard: accum_shard,
@@ -1885,7 +1887,7 @@ impl PageServerHandler {
results.push({
if timeline.cancel.is_cancelled() {
Err(PageStreamError::Shutdown)
Err(PageReconstructError::Cancelled)
} else {
Ok(())
}

View File

@@ -32,6 +32,7 @@ def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
assert child.stdout is not None
buf = child.stdout.read(1)
if len(buf) != 1:
raise Exception("unexpected EOF")