From 53b05c4ba03633dd7e96e5c420f7148f203fc57a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 14 Jan 2025 22:45:09 +0100 Subject: [PATCH] cleanups to make CI pass (well, fail because the bug isn't fixed yet) --- libs/pageserver_api/src/models.rs | 19 ++++++++++++------- pageserver/Cargo.toml | 4 ++++ pageserver/client/Cargo.toml | 3 +++ pageserver/client/src/page_service.rs | 13 ++++++++++--- .../src/bin/test_helper_slow_client_reads.rs | 3 ++- pageserver/src/page_service.rs | 4 +++- .../test_page_service_batching_regressions.py | 1 + 7 files changed, 35 insertions(+), 12 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 2e6949e6ce..7e08b2825a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1641,15 +1641,16 @@ impl PagestreamFeMessage { bytes.put_u8(req.kind); bytes.put_u32(req.segno); } - + #[cfg(feature = "testing")] Self::Test(req) => { bytes.put_u8(5); bytes.put_u64(req.hdr.reqid); bytes.put_u64(req.hdr.request_lsn.0); bytes.put_u64(req.hdr.not_modified_since.0); bytes.put_u64(req.batch_key); - bytes.put_u64(req.message.as_bytes().len() as u64); - bytes.put_slice(req.message.as_bytes()); + let message = req.message.as_bytes(); + bytes.put_u64(message.len() as u64); + bytes.put_slice(message); } } @@ -1797,11 +1798,13 @@ impl PagestreamBeMessage { bytes.put(&resp.segment[..]); } + #[cfg(feature = "testing")] Self::Test(resp) => { bytes.put_u8(Tag::Test as u8); bytes.put_u64(resp.req.batch_key); - bytes.put_u64(resp.req.message.as_bytes().len() as u64); - bytes.put_slice(resp.req.message.as_bytes()); + let message = resp.req.message.as_bytes(); + bytes.put_u64(message.len() as u64); + bytes.put_slice(message); } } } @@ -1872,14 +1875,16 @@ impl PagestreamBeMessage { bytes.put(&resp.segment[..]); } + #[cfg(feature = "testing")] Self::Test(resp) => { bytes.put_u8(Tag::Test as u8); bytes.put_u64(resp.req.hdr.reqid); bytes.put_u64(resp.req.hdr.request_lsn.0); bytes.put_u64(resp.req.hdr.not_modified_since.0); bytes.put_u64(resp.req.batch_key); - bytes.put_u64(resp.req.message.as_bytes().len() as u64); - bytes.put_slice(resp.req.message.as_bytes()); + let message = resp.req.message.as_bytes(); + bytes.put_u64(message.len() as u64); + bytes.put_slice(message); } } } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 8547746d94..0fe3b8337b 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -113,3 +113,7 @@ harness = false [[bench]] name = "upload_queue" harness = false + +[[bin]] +name = "test_helper_slow_client_reads" +required-features = [ "testing" ] diff --git a/pageserver/client/Cargo.toml b/pageserver/client/Cargo.toml index d9b36bf3d4..f582d307a7 100644 --- a/pageserver/client/Cargo.toml +++ b/pageserver/client/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +testing = [ "pageserver_api/testing" ] + [dependencies] pageserver_api.workspace = true thiserror.workspace = true diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index f312b888b6..27280912b4 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -228,10 +228,11 @@ impl PagestreamSender { } impl PagestreamReceiver { + // TODO: maybe make this impl Stream instead for better composability? pub async fn recv(&mut self) -> anyhow::Result { let next: Option> = self.stream.next().await; let next: bytes::Bytes = next.unwrap()?; - Ok(PagestreamBeMessage::deserialize(next)?) + PagestreamBeMessage::deserialize(next) } pub async fn getpage_recv(&mut self) -> anyhow::Result { @@ -242,8 +243,14 @@ impl PagestreamReceiver { PagestreamBeMessage::Exists(_) | PagestreamBeMessage::Nblocks(_) | PagestreamBeMessage::DbSize(_) - | PagestreamBeMessage::GetSlruSegment(_) - | PagestreamBeMessage::Test(_) => { + | PagestreamBeMessage::GetSlruSegment(_) => { + anyhow::bail!( + "unexpected be message kind in response to getpage request: {}", + next.kind() + ) + } + #[cfg(feature = "testing")] + PagestreamBeMessage::Test(_) => { anyhow::bail!( "unexpected be message kind in response to getpage request: {}", next.kind() diff --git a/pageserver/src/bin/test_helper_slow_client_reads.rs b/pageserver/src/bin/test_helper_slow_client_reads.rs index 9a812e436d..c1ce332b6c 100644 --- a/pageserver/src/bin/test_helper_slow_client_reads.rs +++ b/pageserver/src/bin/test_helper_slow_client_reads.rs @@ -50,7 +50,8 @@ async fn main() -> anyhow::Result<()> { let _: () = res?; } - stdout().write(b"R")?; + let n = stdout().write(b"R")?; + assert_eq!(n, 1); stdout().flush()?; eprintln!("waiting for signal to tell us to exit"); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 96d1a826a9..f4a7517bbd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -617,6 +617,7 @@ impl BatchedFeMessage { page.timer.observe_execution_start(at); } } + #[cfg(feature = "testing")] BatchedFeMessage::Test { requests, .. } => { for req in requests { req.timer.observe_execution_start(at); @@ -960,6 +961,7 @@ impl PageServerHandler { accum_pages.extend(this_pages); Ok(()) } + #[cfg(feature = "testing")] ( Ok(BatchedFeMessage::Test { shard: accum_shard, @@ -1885,7 +1887,7 @@ impl PageServerHandler { results.push({ if timeline.cancel.is_cancelled() { - Err(PageStreamError::Shutdown) + Err(PageReconstructError::Cancelled) } else { Ok(()) } diff --git a/test_runner/regress/test_page_service_batching_regressions.py b/test_runner/regress/test_page_service_batching_regressions.py index f781dd4850..46afc4cc3d 100644 --- a/test_runner/regress/test_page_service_batching_regressions.py +++ b/test_runner/regress/test_page_service_batching_regressions.py @@ -32,6 +32,7 @@ def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path): stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) + assert child.stdout is not None buf = child.stdout.read(1) if len(buf) != 1: raise Exception("unexpected EOF")