it repros

Christian Schwarz
2025-01-14 22:16:27 +01:00
parent 9a02bc0cfd
commit 45e08d0aa5
6 changed files with 429 additions and 110 deletions

View File

@@ -1398,6 +1398,8 @@ pub enum PagestreamFeMessage {
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
#[cfg(feature = "testing")]
Test(PagestreamTestRequest),
}
// Wrapped in libpq CopyData
@@ -1409,6 +1411,8 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
#[cfg(feature = "testing")]
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -1420,6 +1424,9 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/// Test message discriminant is unstable
#[cfg(feature = "testing")]
Test = 106,
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
@@ -1431,6 +1438,8 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
106 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
@@ -1548,6 +1557,20 @@ pub struct PagestreamDbSizeResponse {
pub db_size: i64,
}
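// Testing-only message pair. `batch_key` lets a test control which adjacent
// requests the server may batch together (see the batching rule in
// page_service.rs), and `message` is an arbitrary payload echoed back in the
// response.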
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
pub hdr: PagestreamRequest,
pub batch_key: u64,
pub message: String,
}
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
pub req: PagestreamTestRequest,
}
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
// that require pageserver-internal types. It is sufficient to get the total size.
#[derive(Serialize, Deserialize, Debug)]
@@ -1616,6 +1639,16 @@ impl PagestreamFeMessage {
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
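// Testing-only wire format: tag byte 5, the common request header
// (reqid, request_lsn, not_modified_since), the batch key, then the
// message as a u64 length prefix followed by the UTF-8 bytes.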
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(5);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
bytes.put_u64(req.message.as_bytes().len() as u64);
bytes.put_slice(req.message.as_bytes());
}
}
bytes.into()
@@ -1703,6 +1736,21 @@ impl PagestreamFeMessage {
segno: body.read_u32::<BigEndian>()?,
},
)),
#[cfg(feature = "testing")]
5 => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key: body.read_u64::<BigEndian>()?,
message: {
let len = body.read_u64::<BigEndian>()?;
let mut buf = vec![0; len as usize];
body.read_exact(&mut buf)?;
String::from_utf8(buf)?
},
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -1746,6 +1794,13 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
bytes.put_u64(resp.req.message.as_bytes().len() as u64);
bytes.put_slice(resp.req.message.as_bytes());
}
}
}
PagestreamProtocolVersion::V3 => {
@@ -1814,6 +1869,16 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
bytes.put_u64(resp.req.message.as_bytes().len() as u64);
bytes.put_slice(resp.req.message.as_bytes());
}
}
}
}
@@ -1956,6 +2021,28 @@ impl PagestreamBeMessage {
segment: segment.into(),
})
}
#[cfg(feature = "testing")]
Tag::Test => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let batch_key = buf.read_u64::<BigEndian>()?;
let len = buf.read_u64::<BigEndian>()?;
let mut msg = vec![0; len as usize];
buf.read_exact(&mut msg)?;
let message = String::from_utf8(msg)?;
Self::Test(PagestreamTestResponse {
req: PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key,
message,
},
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
@@ -1975,6 +2062,8 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
#[cfg(feature = "testing")]
Self::Test(_) => "Test",
}
}
}
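For reference, a self-contained sketch of the Test request wire layout, mirroring the `Self::Test` serialize arm above (the helper name is hypothetical; the `bytes` crate writes big-endian, matching the `read_u64::<BigEndian>` calls on the parse side):

use bytes::{BufMut, BytesMut};

// Hypothetical helper: tag byte 5, request header, batch key,
// then a u64 length-prefixed UTF-8 message.
fn encode_test_request(
reqid: u64,
request_lsn: u64,
not_modified_since: u64,
batch_key: u64,
message: &str,
) -> bytes::Bytes {
let mut buf = BytesMut::new();
buf.put_u8(5); // PagestreamFeMessage::Test discriminant
buf.put_u64(reqid);
buf.put_u64(request_lsn);
buf.put_u64(not_modified_since);
buf.put_u64(batch_key);
buf.put_u64(message.len() as u64);
buf.put_slice(message.as_bytes());
buf.freeze()
}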

View File

@@ -1,6 +1,9 @@
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
@@ -10,7 +13,6 @@ use pageserver_api::{
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
@@ -62,15 +64,28 @@ impl Client {
.client
.copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
.await?;
let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
let Client {
cancel_on_client_drop,
conn_task,
client: _,
} = self;
let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
ConnTaskRunning {
cancel_on_client_drop,
conn_task,
},
)));
Ok(PagestreamClient {
sink: PagestreamSender {
shared: shared.clone(),
sink,
},
stream: PagestreamReceiver {
shared: shared.clone(),
stream,
},
shared,
})
}
@@ -97,7 +112,26 @@ impl Client {
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
shared: Arc<Mutex<PagestreamShared>>,
sink: PagestreamSender,
stream: PagestreamReceiver,
}
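// Both halves (and the unsplit client) hold an `Arc` to the shared conn-task
// state; the `DropGuard` inside cancels the connection task once the last of
// them is dropped, while `shutdown()` takes the state out explicitly and
// joins the task.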
pub struct PagestreamSender {
shared: Arc<Mutex<PagestreamShared>>,
sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
}
pub struct PagestreamReceiver {
shared: Arc<Mutex<PagestreamShared>>,
stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
}
enum PagestreamShared {
ConnTaskRunning(ConnTaskRunning),
ConnTaskCancelledJoinHandleReturnedOrDropped,
}
struct ConnTaskRunning {
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
@@ -110,11 +144,11 @@ pub struct RelTagBlockNo {
impl PagestreamClient {
pub async fn shutdown(self) {
let Self {
shared,
sink,
stream,
} = { self };
// The `copy_both` split into `sink` and `stream` contains an internal channel sender, the receiver of which is polled by `conn_task`.
// When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
// (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
//
@@ -131,36 +165,86 @@ impl PagestreamClient {
//
// NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
// => https://github.com/neondatabase/neon/issues/6390
let ConnTaskRunning {
cancel_on_client_drop,
conn_task,
} = {
let mut guard = shared.lock().unwrap();
match std::mem::replace(
&mut *guard,
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
) {
PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
}
};
let _ = cancel_on_client_drop.unwrap();
conn_task.await.unwrap();
// Now drop the split copy_both.
drop(sink);
drop(stream);
}
pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
let Self {
shared: _,
sink,
stream,
} = self;
(sink, stream)
}
pub async fn getpage(
&mut self,
req: PagestreamGetPageRequest,
) -> anyhow::Result<PagestreamGetPageResponse> {
self.getpage_send(req).await?;
self.getpage_recv().await
}
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
self.sink.getpage_send(req).await
}
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
self.stream.getpage_recv().await
}
}
impl PagestreamSender {
// TODO: maybe make this impl Sink instead for better composability?
pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let msg = msg.serialize();
self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
Ok(())
}
pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
self.send(PagestreamFeMessage::GetPage(req)).await
}
}
impl PagestreamReceiver {
pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
let next: bytes::Bytes = next.unwrap()?;
Ok(PagestreamBeMessage::deserialize(next)?)
}
pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
let next: PagestreamBeMessage = self.recv().await?;
match next {
PagestreamBeMessage::GetPage(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::Test(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
next.kind()
)
}
}

View File

@@ -0,0 +1,63 @@
use std::{
io::{stdin, stdout, Read, Write},
time::Duration,
};
use clap::Parser;
use pageserver_api::models::{PagestreamRequest, PagestreamTestRequest};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
#[derive(clap::Parser)]
struct Args {
connstr: String,
tenant_id: TenantId,
timeline_id: TimelineId,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let Args {
connstr,
tenant_id,
timeline_id,
} = Args::parse();
let client = pageserver_client::page_service::Client::new(connstr).await?;
let client = client.pagestream(tenant_id, timeline_id).await?;
let (mut sender, _receiver) = client.split(); // keep the receiver half alive but never poll it
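// Protocol with the parent test process: fill the pagestream pipe until a
// send stalls for >1s, write "R" to stdout to signal the pipe is full, then
// block on stdin for a termination byte. The receiver half is deliberately
// never polled: this process plays the slow client.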
eprintln!("filling the pipe");
let mut msg = 0;
loop {
msg += 1;
let fut = sender.send(pageserver_api::models::PagestreamFeMessage::Test(
PagestreamTestRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(23),
not_modified_since: Lsn(23),
},
batch_key: 42,
message: format!("message {}", msg),
},
));
let Ok(res) = tokio::time::timeout(Duration::from_secs(1), fut).await else {
eprintln!("pipe seems full");
break;
};
let _: () = res?;
}
stdout().write_all(b"R")?;
stdout().flush()?;
let mut buf = [0u8; 1];
stdin().read_exact(&mut buf)?;
eprintln!("termination signal received, exiting");
anyhow::Ok(())
}

View File

@@ -1472,6 +1472,8 @@ pub enum SmgrQueryType {
GetPageAtLsn,
GetDbSize,
GetSlruSegment,
#[cfg(feature = "testing")]
Test,
}
pub(crate) struct SmgrQueryTimePerTimeline {

View File

@@ -555,6 +555,12 @@ struct BatchedGetPageRequest {
timer: SmgrOpTimer,
}
#[cfg(feature = "testing")]
struct BatchedTestRequest {
req: models::PagestreamTestRequest,
timer: SmgrOpTimer,
}
enum BatchedFeMessage {
Exists {
span: Span,
@@ -586,6 +592,12 @@ enum BatchedFeMessage {
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
#[cfg(feature = "testing")]
Test {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
requests: Vec<BatchedTestRequest>,
},
RespondError {
span: Span,
error: BatchedPageStreamError,
@@ -606,6 +618,11 @@ impl BatchedFeMessage {
page.timer.observe_execution_start(at);
}
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test { requests, .. } => {
for req in requests {
req.timer.observe_execution_start(at);
}
}
BatchedFeMessage::RespondError { .. } => {}
}
}
@@ -866,6 +883,22 @@ impl PageServerHandler {
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
}
}
#[cfg(feature = "testing")]
PagestreamFeMessage::Test(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer =
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
.await?;
BatchedFeMessage::Test {
span,
shard,
requests: vec![BatchedTestRequest { req, timer }],
}
}
};
Ok(Some(batched_msg))
}
@@ -928,6 +961,45 @@ impl PageServerHandler {
accum_pages.extend(this_pages);
Ok(())
}
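// Test requests batch the same way getpage requests do, except the test
// controls batch boundaries explicitly: only adjacent requests with the same
// `batch_key` on the same shard are merged, up to `max_batch_size`.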
#[cfg(feature = "testing")]
(
Ok(BatchedFeMessage::Test {
shard: accum_shard,
requests: accum_requests,
..
}),
BatchedFeMessage::Test {
shard: this_shard,
requests: this_requests,
..
},
) if (|| {
assert_eq!(this_requests.len(), 1);
if accum_requests.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_requests.len(), max_batch_size.get());
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
let this_batch_key = this_requests[0].req.batch_key;
let accum_batch_key = accum_requests[0].req.batch_key;
if this_batch_key != accum_batch_key {
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
return false;
}
true
})() =>
{
// ok to batch
accum_requests.extend(this_requests);
Ok(())
}
// something batched already but this message is unbatchable
(_, this_msg) => {
// by default, don't continue batching
@@ -1054,6 +1126,27 @@ impl PageServerHandler {
span,
)
}
#[cfg(feature = "testing")]
BatchedFeMessage::Test {
span,
shard,
requests,
} => {
fail::fail_point!("ps::handle-pagerequest-message::test");
(
{
let nrequests = requests.len();
trace!(nrequests, "handling test request");
let res = self
.handle_test_request_batch(&shard, requests, ctx)
.instrument(span.clone())
.await;
assert_eq!(res.len(), nrequests);
res
},
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
@@ -1780,6 +1873,54 @@ impl PageServerHandler {
))
}
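// Testing-only handler: echoes each request back as its response, yielding
// to the scheduler between requests and returning `Shutdown` once the
// timeline is cancelled, so tests can exercise the batching and cancellation
// paths without touching real pages.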
#[cfg(feature = "testing")]
#[instrument(skip_all, fields(shard_id))]
async fn handle_test_request_batch(
&mut self,
timeline: &Timeline,
requests: Vec<BatchedTestRequest>,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
use pageserver_api::models::PagestreamTestResponse;
let _latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
// real requests would hold this guard while doing something with the timeline
let mut results = Vec::with_capacity(requests.len());
for req in requests.iter() {
tokio::task::yield_now().await;
results.push({
if timeline.cancel.is_cancelled() {
Err(PageStreamError::Shutdown)
} else {
Ok(PagestreamTestResponse { req: req.req.clone() })
}
});
}
// TODO: avoid creating the new Vec here
Vec::from_iter(
requests
.into_iter()
.zip(results.into_iter())
.map(|(req, res)| {
res.map(|resp| (PagestreamBeMessage::Test(resp), req.timer))
.map_err(|e| BatchedPageStreamError {
err: e,
req: req.req.hdr,
})
}),
)
}
/// Note on "fullbackup":
/// Full basebackups should only be used for debugging purposes.
/// Originally, it was introduced to enable breaking storage format changes,

View File

@@ -1,19 +1,13 @@
# NB: there are benchmarks that double-serve as tests inside the `performance` directory.
import subprocess
from pathlib import Path

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder


def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path):
def patch_pageserver_toml(config):
config["page_service_pipelining"] = {
"mode": "pipelined",
@@ -22,84 +16,30 @@ def test_slow_flush(neon_env_builder: NeonEnvBuilder):
}
neon_env_builder.pageserver_config_override = patch_pageserver_toml
env = neon_env_builder.init_start()
log.info("make flush appear slow")
ps_http = env.pageserver.http_client()
ps_http.configure_failpoints(("page_service:flush:pre", "return(10000000)"))
log.info("filling pipe")
child = subprocess.Popen(
[
neon_binpath / "test_helper_slow_client_reads",
env.pageserver.connstr(),
str(env.initial_tenant),
str(env.initial_timeline),
],
bufsize=0, # unbuffered
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
buf = child.stdout.read(1)
if len(buf) != 1:
raise Exception("unexpected EOF")
if buf != b"R":
raise Exception(f"unexpected data: {buf!r}")
log.info("helper reports pipe filled")
log.info("try to shut down the tenant")
env.pageserver.tenant_detach(env.initial_tenant)