WIP: page_service: add basic testcase for merging

The steps in the test work when run manually against neon_local + psql,
but for some reason they don't work inside the test.

Asked the compute team on Slack for help:
https://neondb.slack.com/archives/C04DGM6SMTM/p1731952688386789
Christian Schwarz
2024-11-18 20:25:44 +01:00
parent 9b6af2bcad
commit 0689965282
2 changed files with 79 additions and 1 deletion

pageserver/src/page_service.rs

@@ -611,6 +611,7 @@ impl PageServerHandler {
)
}
#[instrument(skip_all, level = tracing::Level::TRACE)]
async fn read_batch_from_connection<IO>(
&mut self,
pgb: &mut PostgresBackend<IO>,
@@ -621,6 +622,8 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let mut batch = self.next_batch.take();
let mut batch_started_at: Option<std::time::Instant> = None;
@@ -641,7 +644,8 @@ impl PageServerHandler {
msg
}
_ = sleep_fut => {
- assert!(batch.is_some());
+ assert!(batch.is_some(), "batch_started_at => sleep_fut = futures::future::pending()");
trace!("batch timeout");
break None;
}
};
@@ -787,12 +791,14 @@ impl PageServerHandler {
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
@@ -800,6 +806,7 @@ impl PageServerHandler {
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if *accum_lsn != this_lsn {
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
return false;
}
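// no stop condition hit: merge this request into the current batch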
true
@@ -922,6 +929,7 @@ impl PageServerHandler {
(
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,


@@ -0,0 +1,70 @@
import psycopg2.extras
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
def test_getpage_merge_smoke(neon_env_builder: NeonEnvBuilder):
"""
Do a bunch of sequential scans and ensure that the pageserver does some merging.
"""
def patch_config_toml(ps_cfg):
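# Assumption based on this commit's pageserver changes: the batch timeout is how
# long the page service holds a getpage request open so that subsequent requests
# can be merged into one vectored get.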
ps_cfg["server_side_batch_timeout"] = "100ms"
neon_env_builder.pageserver_config_override = patch_config_toml
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
with endpoint.connect() as conn:
with conn.cursor() as cur:
# cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
cur.execute("SET effective_io_concurrency=100")
# cur.execute("SET neon.readahead_buffer_size=128")
# cur.execute("SET neon.flush_output_after=1")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
log.info("Filling the table")
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
cur.execute("INSERT INTO t SELECT generate_series(1, 1024)")
# TODO: can we force postgres to do sequential scans?
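# One untested option (standard Postgres planner GUCs): t has no index, so a
# seqscan is the only possible plan anyway; to be explicit we could disable the
# alternatives:
# cur.execute("SET enable_indexscan=off")
# cur.execute("SET enable_bitmapscan=off")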
def get_metrics():
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("select value from neon_perf_counters where metric='getpage_wait_seconds_count';")
compute = cur.fetchall()
pageserver_metrics = ps_http.get_metrics()
pageserver = {
"getpage_count": pageserver_metrics.query_one("pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}),
"vectored_get_count": pageserver_metrics.query_one("pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}),
}
return {
"pageserver": pageserver,
"compute": compute
}
log.info("Doing bunch of seqscans")
for i in range(4):
log.info("Seqscan %d", i)
if i == 1:
# iteration 0 warmed up all the metrics; start measuring from here
before = get_metrics()
cur.execute("select clear_buffer_cache()") # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
cur.execute("select sum(data::bigint) from t")
assert cur.fetchall()[0][0] == sum(range(1, 1024 + 1))
after = get_metrics()
import pdb; pdb.set_trace()
# TODO: assert that getpage counts roughly match between compute and ps
# TODO: assert that batching occurs by asserting that vectored get count is significantly less than getpage count
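# A sketch of what these assertions could look like (assumes query_one() results
# expose .value and that a 2x merge factor counts as "significant"; both are
# assumptions, not part of this commit):
# getpage_delta = after["pageserver"]["getpage_count"].value - before["pageserver"]["getpage_count"].value
# vectored_delta = after["pageserver"]["vectored_get_count"].value - before["pageserver"]["vectored_get_count"].value
# assert vectored_delta <= getpage_delta / 2, "expected merging to significantly reduce vectored get calls"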