From 06899652826ac71a2eb2e693bbacbf369790d477 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 18 Nov 2024 20:25:44 +0100
Subject: [PATCH] WIP: page_service: add basic testcase for merging

The steps work when performed manually with neon_local + psql, but for
some reason they don't work in the test.

Asked the compute team on Slack for help:
https://neondb.slack.com/archives/C04DGM6SMTM/p1731952688386789
---
 pageserver/src/page_service.rs                   | 10 ++-
 .../regress/test_pageserver_getpage_merge.py     | 70 +++++++++++++++++++
 2 files changed, 79 insertions(+), 1 deletion(-)
 create mode 100644 test_runner/regress/test_pageserver_getpage_merge.py

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index a429dff1fd..e9eb4bfe65 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -611,6 +611,7 @@ impl PageServerHandler {
         )
     }
 
+    #[instrument(skip_all, level = tracing::Level::TRACE)]
     async fn read_batch_from_connection<IO>(
         &mut self,
         pgb: &mut PostgresBackend<IO>,
@@ -621,6 +622,8 @@ impl PageServerHandler {
     )
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     {
+        debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
+
         let mut batch = self.next_batch.take();
         let mut batch_started_at: Option<Instant> = None;
@@ -641,7 +644,8 @@ impl PageServerHandler {
                     msg
                 }
                 _ = sleep_fut => {
-                    assert!(batch.is_some());
+                    assert!(batch.is_some(), "batch_started_at => sleep_fut = futures::future::pending()");
+                    trace!("batch timeout");
                     break None;
                 }
             };
@@ -787,12 +791,14 @@ impl PageServerHandler {
                ) if async {
                    assert_eq!(this_pages.len(), 1);
                    if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
+                        trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
                        assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
                        return false;
                    }
                    if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
                        != (this_shard.tenant_shard_id, this_shard.timeline_id)
                    {
+                        trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
                        // TODO: we _could_ batch & execute each shard seperately (and in parallel).
                        // But the current logic for keeping responses in order does not support that.
                        return false;
@@ -800,6 +806,7 @@ impl PageServerHandler {
                    // the vectored get currently only supports a single LSN, so, bounce as soon
                    // as the effective request_lsn changes
                    if *accum_lsn != this_lsn {
+                        trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
                        return false;
                    }
                    true
@@ -922,6 +929,7 @@ impl PageServerHandler {
                (
                    {
                        let npages = pages.len();
+                        trace!(npages, "handling getpage request");
                        let res = self
                            .handle_get_page_at_lsn_request_batched(
                                &shard,
diff --git a/test_runner/regress/test_pageserver_getpage_merge.py b/test_runner/regress/test_pageserver_getpage_merge.py
new file mode 100644
index 0000000000..cdfc808d9d
--- /dev/null
+++ b/test_runner/regress/test_pageserver_getpage_merge.py
@@ -0,0 +1,70 @@
+
+import psycopg2.extras
+from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.log_helper import log
+
+
+def test_getpage_merge_smoke(neon_env_builder: NeonEnvBuilder):
+    """
+    Do a bunch of sequential scans and ensure that the pageserver does some merging.
+    """
+
+    def patch_config_toml(ps_cfg):
+        ps_cfg["server_side_batch_timeout"] = "100ms"
+
+    neon_env_builder.pageserver_config_override = patch_config_toml
+    env = neon_env_builder.init_start()
+
+    ps_http = env.pageserver.http_client()
+
+    endpoint = env.endpoints.create_start("main")
+
+    with endpoint.connect() as conn:
+        with conn.cursor() as cur:
+            # cur.execute("SET max_parallel_workers_per_gather=0")  # disable parallel backends
+            cur.execute("SET effective_io_concurrency=100")
+            # cur.execute("SET neon.readahead_buffer_size=128")
+            # cur.execute("SET neon.flush_output_after=1")
+
+            cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
+            cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
+
+            log.info("Filling the table")
+            cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
+            cur.execute("INSERT INTO t SELECT generate_series(1, 1024)")
+            # TODO: can we force postgres to do sequential scans?
+
+            def get_metrics():
+                with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
+                    cur.execute("select value from neon_perf_counters where metric='getpage_wait_seconds_count';")
+                    compute = cur.fetchall()
+
+                pageserver_metrics = ps_http.get_metrics()
+                pageserver = {
+                    "getpage_count": pageserver_metrics.query_one("pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}),
+                    "vectored_get_count": pageserver_metrics.query_one("pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}),
+                }
+                return {
+                    "pageserver": pageserver,
+                    "compute": compute,
+                }
+
+            log.info("Doing a bunch of seqscans")
+
+            for i in range(4):
+                log.info("Seqscan %d", i)
+                if i == 1:
+                    # round zero is only for warming up all the metrics; take the baseline after it
+                    before = get_metrics()
+                cur.execute("select clear_buffer_cache()")  # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
+                cur.execute("select sum(data::bigint) from t")
+                assert cur.fetchall()[0][0] == sum(range(1, 1024 + 1))
+
+            after = get_metrics()
+
+
+            import pdb; pdb.set_trace()
+            # TODO: assert that getpage counts roughly match between compute and pageserver
+            # TODO: assert that batching occurs by asserting that the vectored get count is significantly less than the getpage count