diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 39390d7647..227bc19d67 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -1398,6 +1398,8 @@ pub enum PagestreamFeMessage {
     GetPage(PagestreamGetPageRequest),
     DbSize(PagestreamDbSizeRequest),
     GetSlruSegment(PagestreamGetSlruSegmentRequest),
+    #[cfg(feature = "testing")]
+    Test(PagestreamTestRequest),
 }
 
 // Wrapped in libpq CopyData
@@ -1409,6 +1411,8 @@ pub enum PagestreamBeMessage {
     Error(PagestreamErrorResponse),
     DbSize(PagestreamDbSizeResponse),
     GetSlruSegment(PagestreamGetSlruSegmentResponse),
+    #[cfg(feature = "testing")]
+    Test(PagestreamTestResponse),
 }
 
 // Keep in sync with `pagestore_client.h`
@@ -1420,6 +1424,9 @@ enum PagestreamBeMessageTag {
     Error = 103,
     DbSize = 104,
     GetSlruSegment = 105,
+    /// Test message discriminant is unstable
+    #[cfg(feature = "testing")]
+    Test = 106,
 }
 
 impl TryFrom<u8> for PagestreamBeMessageTag {
     type Error = u8;
@@ -1431,6 +1438,8 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
             103 => Ok(PagestreamBeMessageTag::Error),
             104 => Ok(PagestreamBeMessageTag::DbSize),
             105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
+            #[cfg(feature = "testing")]
+            106 => Ok(PagestreamBeMessageTag::Test),
             _ => Err(value),
         }
     }
@@ -1548,6 +1557,20 @@ pub struct PagestreamDbSizeResponse {
     pub db_size: i64,
 }
 
+#[cfg(feature = "testing")]
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct PagestreamTestRequest {
+    pub hdr: PagestreamRequest,
+    pub batch_key: u64,
+    pub message: String,
+}
+
+#[cfg(feature = "testing")]
+#[derive(Debug)]
+pub struct PagestreamTestResponse {
+    pub req: PagestreamTestRequest,
+}
+
 // This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
 // that require pageserver-internal types. It is sufficient to get the total size.
 #[derive(Serialize, Deserialize, Debug)]
@@ -1616,6 +1639,16 @@ impl PagestreamFeMessage {
                 bytes.put_u8(req.kind);
                 bytes.put_u32(req.segno);
             }
+
+            Self::Test(req) => {
+                bytes.put_u8(5);
+                bytes.put_u64(req.hdr.reqid);
+                bytes.put_u64(req.hdr.request_lsn.0);
+                bytes.put_u64(req.hdr.not_modified_since.0);
+                bytes.put_u64(req.batch_key);
+                bytes.put_u64(req.message.as_bytes().len() as u64);
+                bytes.put_slice(req.message.as_bytes());
+            }
         }
 
         bytes.into()
@@ -1703,6 +1736,21 @@ impl PagestreamFeMessage {
                     segno: body.read_u32::<BigEndian>()?,
                 },
             )),
+            #[cfg(feature = "testing")]
+            5 => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
+                hdr: PagestreamRequest {
+                    reqid,
+                    request_lsn,
+                    not_modified_since,
+                },
+                batch_key: body.read_u64::<BigEndian>()?,
+                message: {
+                    let len = body.read_u64::<BigEndian>()?;
+                    let mut buf = vec![0; len as usize];
+                    body.read_exact(&mut buf)?;
+                    String::from_utf8(buf)?
+                },
+            })),
             _ => bail!("unknown smgr message tag: {:?}", msg_tag),
         }
     }
@@ -1746,6 +1794,13 @@ impl PagestreamBeMessage {
                     bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
                     bytes.put(&resp.segment[..]);
                 }
+
+                Self::Test(resp) => {
+                    bytes.put_u8(Tag::Test as u8);
+                    bytes.put_u64(resp.req.batch_key);
+                    bytes.put_u64(resp.req.message.as_bytes().len() as u64);
+                    bytes.put_slice(resp.req.message.as_bytes());
+                }
             }
         }
         PagestreamProtocolVersion::V3 => {
@@ -1814,6 +1869,16 @@ impl PagestreamBeMessage {
                     bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
                     bytes.put(&resp.segment[..]);
                 }
+
+                Self::Test(resp) => {
+                    bytes.put_u8(Tag::Test as u8);
+                    bytes.put_u64(resp.req.hdr.reqid);
+                    bytes.put_u64(resp.req.hdr.request_lsn.0);
+                    bytes.put_u64(resp.req.hdr.not_modified_since.0);
+                    bytes.put_u64(resp.req.batch_key);
+                    bytes.put_u64(resp.req.message.as_bytes().len() as u64);
+                    bytes.put_slice(resp.req.message.as_bytes());
+                }
             }
         }
     }
@@ -1956,6 +2021,28 @@ impl PagestreamBeMessage {
                     segment: segment.into(),
                 })
             }
+            #[cfg(feature = "testing")]
+            Tag::Test => {
+                let reqid = buf.read_u64::<BigEndian>()?;
+                let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
+                let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
+                let batch_key = buf.read_u64::<BigEndian>()?;
+                let len = buf.read_u64::<BigEndian>()?;
+                let mut msg = vec![0; len as usize];
+                buf.read_exact(&mut msg)?;
+                let message = String::from_utf8(msg)?;
+                Self::Test(PagestreamTestResponse {
+                    req: PagestreamTestRequest {
+                        hdr: PagestreamRequest {
+                            reqid,
+                            request_lsn,
+                            not_modified_since,
+                        },
+                        batch_key,
+                        message,
+                    },
+                })
+            }
         };
         let remaining = buf.into_inner();
         if !remaining.is_empty() {
@@ -1975,6 +2062,8 @@ impl PagestreamBeMessage {
             Self::Error(_) => "Error",
             Self::DbSize(_) => "DbSize",
             Self::GetSlruSegment(_) => "GetSlruSegment",
+            #[cfg(feature = "testing")]
+            Self::Test(_) => "Test",
         }
     }
 }
diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs
index 207ec4166c..3827e19aa4 100644
--- a/pageserver/client/src/page_service.rs
+++ b/pageserver/client/src/page_service.rs
@@ -1,6 +1,9 @@
-use std::pin::Pin;
+use std::sync::{Arc, Mutex};
 
-use futures::SinkExt;
+use futures::{
+    stream::{SplitSink, SplitStream},
+    SinkExt, StreamExt,
+};
 use pageserver_api::{
     models::{
         PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
@@ -10,7 +13,6 @@ use pageserver_api::{
 };
 use tokio::task::JoinHandle;
 use tokio_postgres::CopyOutStream;
-use tokio_stream::StreamExt;
 use tokio_util::sync::CancellationToken;
 use utils::{
     id::{TenantId, TimelineId},
@@ -62,15 +64,28 @@ impl Client {
             .client
             .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
             .await?;
+        let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
         let Client {
             cancel_on_client_drop,
             conn_task,
             client: _,
         } = self;
+        let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
+            ConnTaskRunning {
+                cancel_on_client_drop,
+                conn_task,
+            },
+        )));
         Ok(PagestreamClient {
-            copy_both: Box::pin(copy_both),
-            conn_task,
-            cancel_on_client_drop,
+            sink: PagestreamSender {
+                shared: shared.clone(),
+                sink,
+            },
+            stream: PagestreamReceiver {
+                shared: shared.clone(),
+                stream,
+            },
+            shared,
         })
     }
 
@@ -97,7 +112,26 @@ impl Client {
 
 /// Create using [`Client::pagestream`].
 pub struct PagestreamClient {
-    copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
+    shared: Arc<Mutex<PagestreamShared>>,
+    sink: PagestreamSender,
+    stream: PagestreamReceiver,
+}
+
+pub struct PagestreamSender {
+    shared: Arc<Mutex<PagestreamShared>>,
+    sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
+}
+
+pub struct PagestreamReceiver {
+    shared: Arc<Mutex<PagestreamShared>>,
+    stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
+}
+
+enum PagestreamShared {
+    ConnTaskRunning(ConnTaskRunning),
+    ConnTaskCancelledJoinHandleReturnedOrDropped,
+}
+struct ConnTaskRunning {
     cancel_on_client_drop: Option<CancellationToken>,
     conn_task: JoinHandle<()>,
 }
@@ -110,11 +144,11 @@ pub struct RelTagBlockNo {
 impl PagestreamClient {
     pub async fn shutdown(self) {
         let Self {
-            copy_both,
-            cancel_on_client_drop: cancel_conn_task,
-            conn_task,
-        } = self;
-        // The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
+            shared,
+            sink,
+            stream,
+        } = { self };
+        // The `copy_both` split into `sink` and `stream` contains internal channel sender, the receiver of which is polled by `conn_task`.
         // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
         // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
         //
        // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
        // => https://github.com/neondatabase/neon/issues/6390
-        let _ = cancel_conn_task.unwrap();
+        let ConnTaskRunning {
+            cancel_on_client_drop,
+            conn_task,
+        } = {
+            let mut guard = shared.lock().unwrap();
+            match std::mem::replace(
+                &mut *guard,
+                PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
+            ) {
+                PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
+                PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
+            }
+        };
+        let _ = cancel_on_client_drop.unwrap();
         conn_task.await.unwrap();
-        drop(copy_both);
+
+        // Now drop the split copy_both.
+        drop(sink);
+        drop(stream);
+    }
+
+    pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
+        let Self {
+            shared,
+            sink,
+            stream,
+        } = self;
+        (sink, stream)
     }
 
     pub async fn getpage(
         &mut self,
         req: PagestreamGetPageRequest,
     ) -> anyhow::Result<PagestreamGetPageResponse> {
-        let req = PagestreamFeMessage::GetPage(req);
-        let req: bytes::Bytes = req.serialize();
-        // let mut req = tokio_util::io::ReaderStream::new(&req);
-        let mut req = tokio_stream::once(Ok(req));
+        self.getpage_send(req).await?;
+        self.getpage_recv().await
+    }
 
-        self.copy_both.send_all(&mut req).await?;
+    pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
+        self.sink.getpage_send(req).await
+    }
 
-        let next: Option<Result<bytes::Bytes, tokio_postgres::Error>> = self.copy_both.next().await;
+    pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
+        self.stream.getpage_recv().await
+    }
+}
+
+impl PagestreamSender {
+    // TODO: maybe make this impl Sink instead for better composability?
+    pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
+        let msg = msg.serialize();
+        self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
+        Ok(())
+    }
+
+    pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
+        self.send(PagestreamFeMessage::GetPage(req)).await
+    }
+}
+
+impl PagestreamReceiver {
+    pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
+        let next: Option<Result<bytes::Bytes, tokio_postgres::Error>> = self.stream.next().await;
         let next: bytes::Bytes = next.unwrap()?;
+        Ok(PagestreamBeMessage::deserialize(next)?)
+    }
 
-        let msg = PagestreamBeMessage::deserialize(next)?;
-        match msg {
+    pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
+        let next: PagestreamBeMessage = self.recv().await?;
+        match next {
             PagestreamBeMessage::GetPage(p) => Ok(p),
             PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
             PagestreamBeMessage::Exists(_)
             | PagestreamBeMessage::Nblocks(_)
             | PagestreamBeMessage::DbSize(_)
-            | PagestreamBeMessage::GetSlruSegment(_) => {
+            | PagestreamBeMessage::GetSlruSegment(_)
+            | PagestreamBeMessage::Test(_) => {
                 anyhow::bail!(
                     "unexpected be message kind in response to getpage request: {}",
-                    msg.kind()
+                    next.kind()
                 )
             }
         }
diff --git a/pageserver/src/bin/test_helper_slow_client_reads.rs b/pageserver/src/bin/test_helper_slow_client_reads.rs
new file mode 100644
index 0000000000..6fad937bf5
--- /dev/null
+++ b/pageserver/src/bin/test_helper_slow_client_reads.rs
@@ -0,0 +1,63 @@
+use std::{
+    io::{stdin, stdout, Read, Write},
+    time::Duration,
+};
+
+use clap::Parser;
+use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest, PagestreamTestRequest};
+use utils::{
+    id::{TenantId, TimelineId},
+    lsn::Lsn,
+    shard::TenantShardId,
+};
+
+#[derive(clap::Parser)]
+struct Args {
+    connstr: String,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let Args {
+        connstr,
+        tenant_id,
+        timeline_id,
+    } = Args::parse();
+    let client = pageserver_client::page_service::Client::new(connstr).await?;
+    let client = client.pagestream(tenant_id, timeline_id).await?;
+    let (mut sender, mut receiver) = client.split();
+
+    eprintln!("filling the pipe");
+    let mut msg = 0;
+    loop {
+        msg += 1;
+        let fut = sender.send(pageserver_api::models::PagestreamFeMessage::Test(
+            PagestreamTestRequest {
+                hdr: PagestreamRequest {
+                    reqid: 0,
+                    request_lsn: Lsn(23),
+                    not_modified_since: Lsn(23),
+                },
+                batch_key: 42,
+                message: format!("message {}", msg),
+            },
+        ));
+        let Ok(res) = tokio::time::timeout(Duration::from_secs(1), fut).await else {
+            eprintln!("pipe seems full");
+            break;
+        };
+        let _: () = res?;
+    }
+
+    stdout().write(b"R")?;
+    stdout().flush()?;
+
+    let mut buf = [0u8; 1];
+    stdin().read_exact(&mut buf)?;
+
+    eprintln!("termination signal received, exiting");
+
+    anyhow::Ok(())
+}
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index d478aa87fa..9b877fc368 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1472,6 +1472,8 @@ pub enum SmgrQueryType {
     GetPageAtLsn,
     GetDbSize,
     GetSlruSegment,
+    #[cfg(feature = "testing")]
+    Test,
 }
 
 pub(crate) struct SmgrQueryTimePerTimeline {
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 8b4bab3385..301f248517 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -555,6 +555,12 @@ struct BatchedGetPageRequest {
     timer: SmgrOpTimer,
 }
 
+#[cfg(feature = "testing")]
+struct BatchedTestRequest {
+    req: models::PagestreamTestRequest,
+    timer: SmgrOpTimer,
+}
+
 enum BatchedFeMessage {
     Exists {
         span: Span,
@@ -586,6 +592,12 @@ enum BatchedFeMessage {
         shard: timeline::handle::Handle<TenantManagerTypes>,
         req: models::PagestreamGetSlruSegmentRequest,
     },
+    #[cfg(feature = "testing")]
+    Test {
+        span: Span,
+        shard: timeline::handle::Handle<TenantManagerTypes>,
+        requests: Vec<BatchedTestRequest>,
+    },
     RespondError {
         span: Span,
         error: BatchedPageStreamError,
@@ -606,6 +618,11 @@ impl BatchedFeMessage {
                     page.timer.observe_execution_start(at);
                 }
             }
+            BatchedFeMessage::Test { requests, .. } => {
+                for req in requests {
+                    req.timer.observe_execution_start(at);
+                }
+            }
             BatchedFeMessage::RespondError { .. } => {}
         }
     }
@@ -866,6 +883,22 @@ impl PageServerHandler {
                     pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
                 }
            }
+            #[cfg(feature = "testing")]
+            PagestreamFeMessage::Test(req) => {
+                let span = tracing::info_span!(parent: parent_span, "handle_test_request");
+                let shard = timeline_handles
+                    .get(tenant_id, timeline_id, ShardSelector::Zero)
+                    .instrument(span.clone()) // sets `shard_id` field
+                    .await?;
+                let timer =
+                    record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
+                        .await?;
+                BatchedFeMessage::Test {
+                    span,
+                    shard,
+                    requests: vec![BatchedTestRequest { req, timer }],
+                }
+            }
         };
         Ok(Some(batched_msg))
     }
@@ -928,6 +961,45 @@ impl PageServerHandler {
                 accum_pages.extend(this_pages);
                 Ok(())
             }
+            (
+                Ok(BatchedFeMessage::Test {
+                    shard: accum_shard,
+                    requests: accum_requests,
+                    ..
+                }),
+                BatchedFeMessage::Test {
+                    shard: this_shard,
+                    requests: this_requests,
+                    ..
+                },
+            ) if (|| {
+                assert!(this_requests.len() == 1);
+                if accum_requests.len() >= max_batch_size.get() {
+                    trace!(%max_batch_size, "stopping batching because of batch size");
+                    assert_eq!(accum_requests.len(), max_batch_size.get());
+                    return false;
+                }
+                if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
+                    != (this_shard.tenant_shard_id, this_shard.timeline_id)
+                {
+                    trace!("stopping batching because timeline object mismatch");
+                    // TODO: we _could_ batch & execute each shard separately (and in parallel).
+                    // But the current logic for keeping responses in order does not support that.
+                    return false;
+                }
+                let this_batch_key = this_requests[0].req.batch_key;
+                let accum_batch_key = accum_requests[0].req.batch_key;
+                if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
+                    trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
+                    return false;
+                }
+                true
+            })() =>
+            {
+                // ok to batch
+                accum_requests.extend(this_requests);
+                Ok(())
+            }
             // something batched already but this message is unbatchable
             (_, this_msg) => {
                 // by default, don't continue batching
@@ -1054,6 +1126,27 @@ impl PageServerHandler {
                     span,
                 )
             }
+            #[cfg(feature = "testing")]
+            BatchedFeMessage::Test {
+                span,
+                shard,
+                requests,
+            } => {
+                fail::fail_point!("ps::handle-pagerequest-message::test");
+                (
+                    {
+                        let npages = requests.len();
+                        trace!(npages, "handling test request");
+                        let res = self
+                            .handle_test_request_batch(&shard, requests, ctx)
+                            .instrument(span.clone())
+                            .await;
+                        assert_eq!(res.len(), npages);
+                        res
+                    },
+                    span,
+                )
+            }
             BatchedFeMessage::RespondError { span, error } => {
                 // We've already decided to respond with an error, so we don't need to
                 // call the handler.
@@ -1780,6 +1873,54 @@ impl PageServerHandler {
         ))
     }
 
+    #[cfg(feature = "testing")]
+    #[instrument(skip_all, fields(shard_id))]
+    async fn handle_test_request_batch(
+        &mut self,
+        timeline: &Timeline,
+        requests: Vec<BatchedTestRequest>,
+        ctx: &RequestContext,
+    ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
+        use pageserver_api::models::PagestreamTestResponse;
+
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
+
+        // real requests would do something with the timeline
+        let mut results = Vec::with_capacity(requests.len());
+        for req in requests.iter() {
+            tokio::task::yield_now().await;
+
+            results.push({
+                if timeline.cancel.is_cancelled() {
+                    Err(PageStreamError::Shutdown)
+                } else {
+                    Ok(PagestreamTestResponse { req: req.req.clone() })
+                }
+            });
+        }
+
+        // TODO: avoid creating the new Vec here
+        Vec::from_iter(
+            requests
+                .into_iter()
+                .zip(results.into_iter())
+                .map(|(req, res)| {
+                    res.map(|page| {
+                        (
+                            PagestreamBeMessage::Test(models::PagestreamTestResponse {
+                                req: req.req.clone(),
+                            }),
+                            req.timer,
+                        )
+                    })
+                    .map_err(|e| BatchedPageStreamError {
+                        err: PageStreamError::from(e),
+                        req: req.req.hdr,
+                    })
+                }),
+        )
+    }
+
     /// Note on "fullbackup":
     /// Full basebackups should only be used for debugging purposes.
     /// Originally, it was introduced to enable breaking storage format changes,
diff --git a/test_runner/regress/test_page_service_batching_regressions.py b/test_runner/regress/test_page_service_batching_regressions.py
index 4d456cad79..e4312644b4 100644
--- a/test_runner/regress/test_page_service_batching_regressions.py
+++ b/test_runner/regress/test_page_service_batching_regressions.py
@@ -1,19 +1,13 @@
 # NB: there are benchmarks that double-serve as tests inside the `performance` directory.
 
-import threading
-import time
+import subprocess
+from pathlib import Path
 
-import requests.exceptions
-
-import fixtures
-from fixtures.common_types import NodeId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException
+from fixtures.neon_fixtures import NeonEnvBuilder
 
 
-def test_slow_flush(neon_env_builder: NeonEnvBuilder):
-    tablesize_mib = 500
-
+def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path):
     def patch_pageserver_toml(config):
         config["page_service_pipelining"] = {
             "mode": "pipelined",
@@ -22,84 +16,30 @@ def test_slow_flush(neon_env_builder: NeonEnvBuilder):
         }
 
     neon_env_builder.pageserver_config_override = patch_pageserver_toml
-    neon_env_builder.num_pageservers = 2
     env = neon_env_builder.init_start()
 
-    ep = env.endpoints.create_start(
-        "main",
-        config_lines=[
-            "max_parallel_workers_per_gather=0",  # disable parallel backends
-            "effective_io_concurrency=100",  # give plenty of opportunity for pipelining
-            "neon.readahead_buffer_size=128",  # this is the default value at time of writing
-            "shared_buffers=128MB",  # keep lower than tablesize_mib
-            # debug
-            "log_statement=all",
+    log.info("make flush appear slow")
+    ps_http = env.pageserver.http_client()
+    ps_http.configure_failpoints(("page_service:flush:pre", "return(10000000)"))
+
+    log.info("filling pipe")
+    child = subprocess.Popen(
+        [
+            neon_binpath / "test_helper_slow_client_reads",
+            env.pageserver.connstr(),
+            str(env.initial_tenant),
+            str(env.initial_timeline),
         ],
+        bufsize=0,  # unbuffered
+        stdin=subprocess.PIPE,
+        stdout=subprocess.PIPE,
     )
+    buf = child.stdout.read(1)
+    if len(buf) != 1:
+        raise Exception("unexpected EOF")
+    if buf != b"R":
+        raise Exception(f"unexpected data: {buf!r}")
+    log.info("helper reports pipe filled")
 
-    conn = ep.connect()
-    cur = conn.cursor()
-
-    cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
-    cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
-
-    log.info("Filling the table")
-    cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
-    tablesize = tablesize_mib * 1024 * 1024
-    npages = tablesize // (8 * 1024)
-    cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
-
-    cur.close()
-    conn.close()
-
-    def workload(stop: threading.Event, max_iters=None):
-        iters = 0
-        while stop.is_set() is False and (max_iters == None or iters < max_iters):
-            log.info("Seqscan %d", iters)
-            conn = ep.connect()
-            cur = conn.cursor()
-            cur.execute(
-                "select clear_buffer_cache()"
-            )  # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
-            cur.execute("select sum(data::bigint) from t")
-            assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
-            iters += 1
-        log.info("workload done")
-
-    stop = threading.Event()
-
-    log.info("calibrating workload duration")
-    workload(stop, 1)
-    before = time.time()
-    workload(stop, 1)
-    after = time.time()
-    duration = after - before
-    log.info("duration: %f", duration)
-    assert(duration > 3)
-
-    log.info("begin")
-    threading.Thread(target=workload, args=[stop]).start()
-
-    # make flush appear slow
-    ps_http = [p.http_client() for p in env.pageservers]
-    ps_http[0].configure_failpoints(("page_service:flush:pre", "return(10000000)"))
-    ps_http[1].configure_failpoints(("page_service:flush:pre", "return(10000000)"))
-
-    time.sleep(1)
-
-    # try to shut down the tenant
-    for i in range(1, 10):
-        log.info(f"start migration {i}")
-        try:
-            env.storage_controller.tenant_shard_migrate(env.initial_tenant, (i % 2)+1)
-        except StorageControllerApiException as e:
-            log.info(f"shard migrate request failed: {e}")
-        while True:
-            node_id = NodeId(env.storage_controller.tenant_describe(env.initial_tenant)["node_id"])
-            if node_id == NodeId(i % 2)+1:
-                break
-            log.info(f"waiting for migration to complete")
-            time.sleep(1)
-        log.info(f"migration done")
-        time.sleep(1)
-
+    log.info("try to shut down the tenant")
+    env.pageserver.tenant_detach(env.initial_tenant)