diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index 13252037e5..f13b3709f5 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -837,7 +837,30 @@ async fn collect_eviction_candidates(
                 continue;
             }
             let info = tl.get_local_layers_for_disk_usage_eviction().await;
-            debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
+            debug!(
+                tenant_id=%tl.tenant_shard_id.tenant_id,
+                shard_id=%tl.tenant_shard_id.shard_slug(),
+                timeline_id=%tl.timeline_id,
+                "timeline resident layers count: {}", info.resident_layers.len()
+            );
+
+            tenant_candidates.extend(info.resident_layers.into_iter());
+            max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
+
+            if cancel.is_cancelled() {
+                return Ok(EvictionCandidates::Cancelled);
+            }
+        }
+
+        // Also consider layers of timelines being imported for eviction
+        for tl in tenant.list_importing_timelines() {
+            let info = tl.timeline.get_local_layers_for_disk_usage_eviction().await;
+            debug!(
+                tenant_id=%tl.timeline.tenant_shard_id.tenant_id,
+                shard_id=%tl.timeline.tenant_shard_id.shard_slug(),
+                timeline_id=%tl.timeline.timeline_id,
+                "timeline resident layers count: {}", info.resident_layers.len()
+            );
 
             tenant_candidates.extend(info.resident_layers.into_iter());
             max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
 
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index d85d970583..451d266bc0 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -2442,6 +2442,17 @@ impl TenantShard {
             .collect()
     }
 
+    /// Lists the timelines that the tenant is currently importing.
+    /// It's up to callers to omit certain timelines that are not considered ready for use.
+    pub fn list_importing_timelines(&self) -> Vec<Arc<ImportingTimeline>> {
+        self.timelines_importing
+            .lock()
+            .unwrap()
+            .values()
+            .map(Arc::clone)
+            .collect()
+    }
+
     /// Lists timelines the tenant manages, including offloaded ones.
     ///
     /// It's up to callers to omit certain timelines that are not considered ready for use.
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 21d68495f7..fd65000379 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -1348,6 +1348,21 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
+    pub(crate) fn schedule_unlinking_of_layers_from_index_part<I>(
+        self: &Arc<Self>,
+        names: I,
+    ) -> Result<(), NotInitialized>
+    where
+        I: IntoIterator<Item = LayerName>,
+    {
+        let mut guard = self.upload_queue.lock().unwrap();
+        let upload_queue = guard.initialized_mut()?;
+
+        self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
+
+        Ok(())
+    }
+
     /// Update the remote index file, removing the to-be-deleted files from the index,
     /// allowing scheduling of actual deletions later.
     fn schedule_unlinking_of_layers_from_index_part0<I>(
diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs
index bdb34ec3a3..f19a4b3e9c 100644
--- a/pageserver/src/tenant/timeline/import_pgdata.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata.rs
@@ -8,6 +8,7 @@ use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::info;
 use utils::lsn::Lsn;
+use utils::pausable_failpoint;
 use utils::sync::gate::Gate;
 
 use super::{Timeline, TimelineDeleteProgress};
@@ -110,6 +111,8 @@ pub async fn doit(
         .schedule_index_upload_for_file_changes()?;
     timeline.remote_client.wait_completion().await?;
 
+    pausable_failpoint!("import-timeline-pre-success-notify-pausable");
+
     // Communicate that shard is done.
     // Ensure at-least-once delivery of the upcall to storage controller
     // before we mark the task as done and never come here again.
diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
index 0d87a2f135..9743aa3f26 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
@@ -982,6 +982,15 @@ impl ChunkProcessingJob {
             .cloned();
         match existing_layer {
             Some(existing) => {
+                // Unlink the remote layer from the index without scheduling its deletion.
+                // When `existing_layer` drops, [`LayerInner::drop`] will schedule its deletion from
+                // remote storage, but that assumes that the layer was unlinked from the index first.
+                timeline
+                    .remote_client
+                    .schedule_unlinking_of_layers_from_index_part(std::iter::once(
+                        existing.layer_desc().layer_name(),
+                    ))?;
+
                 guard.open_mut()?.rewrite_layers(
                     &[(existing.clone(), resident_layer.clone())],
                     &[],
diff --git a/test_runner/fixtures/fast_import.py b/test_runner/fixtures/fast_import.py
index f9e5f9c1db..bd6dc2583b 100644
--- a/test_runner/fixtures/fast_import.py
+++ b/test_runner/fixtures/fast_import.py
@@ -1,3 +1,4 @@
+import json
 import os
 import shutil
 import subprocess
@@ -11,6 +12,7 @@ from _pytest.config import Config
 
 from fixtures.log_helper import log
 from fixtures.neon_cli import AbstractNeonCli
+from fixtures.neon_fixtures import Endpoint, VanillaPostgres
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import MockS3Server
 
@@ -161,3 +163,57 @@ def fast_import(
             f.write(fi.cmd.stderr)
 
     log.info("Written logs to %s", test_output_dir)
+
+
+def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
+    """
+    Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
+    """
+    assert not vanilla_pg.is_running()
+
+    path.mkdir()
+    # what cplane writes before scheduling fast_import
+    specpath = path / "spec.json"
+    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
+    # what fast_import writes
+    vanilla_pg.pgdatadir.rename(path / "pgdata")
+    statusdir = path / "status"
+    statusdir.mkdir()
+    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
+    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
+
+
+def populate_vanilla_pg(vanilla_pg: VanillaPostgres, target_relblock_size: int) -> int:
+    assert vanilla_pg.is_running()
+
+    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
+    # fillfactor so we don't need to produce that much data
+    # 900 byte per row is > 10% => 1 row per page
+    vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
+
+    nrows = 0
+    while True:
+        relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
+        log.info(
+            f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
+        )
+        if relblock_size >= target_relblock_size:
+            break
+        addrows = int((target_relblock_size - relblock_size) // 8192)
+        assert addrows >= 1, "forward progress"
+        vanilla_pg.safe_psql(
+            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
+        )
+        nrows += addrows
+
+    return nrows
+
+
+def validate_import_from_vanilla_pg(endpoint: Endpoint, nrows: int):
+    assert endpoint.safe_psql_many(
+        [
+            "set effective_io_concurrency=32;",
+            "SET statement_timeout='300s';",
+            "select count(*), sum(data::bigint)::bigint from t",
+        ]
+    ) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index eedeb4f696..ab4885ce6b 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2829,6 +2829,11 @@ class NeonPageserver(PgProtocol, LogUtils):
         if self.running:
             self.http_client().configure_failpoints([(name, action)])
 
+    def clear_persistent_failpoint(self, name: str):
+        del self._persistent_failpoints[name]
+        if self.running:
+            self.http_client().configure_failpoints([(name, "off")])
+
     def timeline_dir(
         self,
         tenant_shard_id: TenantId | TenantShardId,
diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py
index b29610e021..1420dc59a1 100644
--- a/test_runner/regress/test_disk_usage_eviction.py
+++ b/test_runner/regress/test_disk_usage_eviction.py
@@ -1,31 +1,41 @@
 from __future__ import annotations
 
 import enum
+import json
 import time
 from collections import Counter
 from dataclasses import dataclass
 from enum import StrEnum
+from threading import Event
 from typing import TYPE_CHECKING
 
 import pytest
 from fixtures.common_types import Lsn, TenantId, TimelineId
+from fixtures.fast_import import mock_import_bucket, populate_vanilla_pg
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
     NeonPageserver,
     PgBin,
+    VanillaPostgres,
     wait_for_last_flush_lsn,
 )
+from fixtures.pageserver.http import (
+    ImportPgdataIdemptencyKey,
+)
 from fixtures.pageserver.utils import wait_for_upload_queue_empty
 from fixtures.remote_storage import RemoteStorageKind
-from fixtures.utils import human_bytes, wait_until
+from fixtures.utils import human_bytes, run_only_on_default_postgres, wait_until
+from werkzeug.wrappers.response import Response
 
 if TYPE_CHECKING:
     from collections.abc import Iterable
     from typing import Any
 
     from fixtures.pageserver.http import PageserverHttpClient
+    from pytest_httpserver import HTTPServer
+    from werkzeug.wrappers.request import Request
 
 
 GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
@@ -164,6 +174,7 @@ class EvictionEnv:
         min_avail_bytes,
         mock_behavior,
         eviction_order: EvictionOrder,
+        wait_logical_size: bool = True,
     ):
         """
         Starts pageserver up with mocked statvfs setup. The startup is
@@ -201,11 +212,12 @@ class EvictionEnv:
         pageserver.start()
 
         # we now do initial logical size calculation on startup, which on debug builds can fight with disk usage based eviction
-        for tenant_id, timeline_id in self.timelines:
-            tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
-            # Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
-            if tenant_ps is not None:
-                tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)
+        if wait_logical_size:
+            for tenant_id, timeline_id in self.timelines:
+                tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
+                # Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
+                if tenant_ps is not None:
+                    tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)
 
         def statvfs_called():
             pageserver.assert_log_contains(".*running mocked statvfs.*")
@@ -882,3 +894,121 @@ def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
     assert total_size - post_eviction_total_size >= evict_bytes, (
         "we requested at least evict_bytes worth of free space"
     )
+
+
+@run_only_on_default_postgres(reason="PG version is irrelevant here")
+def test_import_timeline_disk_pressure_eviction(
+    neon_env_builder: NeonEnvBuilder,
+    vanilla_pg: VanillaPostgres,
+    make_httpserver: HTTPServer,
+    pg_bin: PgBin,
+):
+    """
+    Check that layers of a timeline which is still being imported are considered for disk usage based eviction.
+    """
+    # Set up mock control plane HTTP server to listen for import completions
+    import_completion_signaled = Event()
+
+    def handler(request: Request) -> Response:
+        log.info(f"control plane /import_complete request: {request.json}")
+        import_completion_signaled.set()
+        return Response(json.dumps({}), status=200)
+
+    cplane_mgmt_api_server = make_httpserver
+    cplane_mgmt_api_server.expect_request(
+        "/storage/api/v1/import_complete", method="PUT"
+    ).respond_with_handler(handler)
+
+    # Plug the cplane mock in
+    neon_env_builder.control_plane_hooks_api = (
+        f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
+    )
+
+    # The import will specify a local filesystem path mocking remote storage
+    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+
+    vanilla_pg.start()
+    target_relblock_size = 1024 * 1024 * 128
+    populate_vanilla_pg(vanilla_pg, target_relblock_size)
+    vanilla_pg.stop()
+
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
+    mock_import_bucket(vanilla_pg, importbucket_path)
+
+    tenant_id = TenantId.generate()
+    timeline_id = TimelineId.generate()
+    idempotency = ImportPgdataIdemptencyKey.random()
+
+    eviction_env = EvictionEnv(
+        timelines=[(tenant_id, timeline_id)],
+        neon_env=env,
+        pageserver_http=env.pageserver.http_client(),
+        layer_size=5 * 1024 * 1024,  # Doesn't apply here
+        pg_bin=pg_bin,  # Not used here
+        pgbench_init_lsns={},  # Not used here
+    )
+
+    # Pause before delivering the final notification to storcon.
+    # This keeps the import in progress.
+    failpoint_name = "import-timeline-pre-success-notify-pausable"
+    env.pageserver.add_persistent_failpoint(failpoint_name, "pause")
+
+    env.storage_controller.tenant_create(tenant_id)
+    env.storage_controller.timeline_create(
+        tenant_id,
+        {
+            "new_timeline_id": str(timeline_id),
+            "import_pgdata": {
+                "idempotency_key": str(idempotency),
+                "location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
+            },
+        },
+    )
+
+    def hit_failpoint():
+        log.info("Checking log for pattern...")
+        try:
+            assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*")
+        except Exception:
+            log.exception("Failed to find pattern in log")
+            raise
+
+    wait_until(hit_failpoint)
+    assert not import_completion_signaled.is_set()
+
+    env.pageserver.stop()
+
+    total_size, _, _ = eviction_env.timelines_du(env.pageserver)
+    blocksize = 512
+    total_blocks = (total_size + (blocksize - 1)) // blocksize
+
+    eviction_env.pageserver_start_with_disk_usage_eviction(
+        env.pageserver,
+        period="1s",
+        max_usage_pct=33,
+        min_avail_bytes=0,
+        mock_behavior={
+            "type": "Success",
+            "blocksize": blocksize,
+            "total_blocks": total_blocks,
+            # Only count layer files towards used bytes in the mock_statvfs.
+            # This avoids accounting for metadata files & tenant conf in the tests.
+            "name_filter": ".*__.*",
+        },
+        eviction_order=EvictionOrder.RELATIVE_ORDER_SPARE,
+        wait_logical_size=False,
+    )
+
+    wait_until(lambda: env.pageserver.assert_log_contains(".*disk usage pressure relieved"))
+
+    env.pageserver.clear_persistent_failpoint(failpoint_name)
+
+    def cplane_notified():
+        assert import_completion_signaled.is_set()
+
+    wait_until(cplane_notified)
+
+    env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py
index 262ec9b06c..ba60c3caa6 100644
--- a/test_runner/regress/test_import_pgdata.py
+++ b/test_runner/regress/test_import_pgdata.py
@@ -12,7 +12,12 @@ import psycopg2
 import psycopg2.errors
 import pytest
 from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
-from fixtures.fast_import import FastImport
+from fixtures.fast_import import (
+    FastImport,
+    mock_import_bucket,
+    populate_vanilla_pg,
+    validate_import_from_vanilla_pg,
+)
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
@@ -60,24 +65,6 @@ smoke_params = [
 ]
 
 
-def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
-    """
-    Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
-    """
-    assert not vanilla_pg.is_running()
-
-    path.mkdir()
-    # what cplane writes before scheduling fast_import
-    specpath = path / "spec.json"
-    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
-    # what fast_import writes
-    vanilla_pg.pgdatadir.rename(path / "pgdata")
-    statusdir = path / "status"
-    statusdir.mkdir()
-    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
-    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
-
-
 @skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
 @pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
 def test_pgdata_import_smoke(
@@ -132,10 +119,6 @@ def test_pgdata_import_smoke(
     # Put data in vanilla pg
     #
 
-    vanilla_pg.start()
-    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
-
-    log.info("create relblock data")
     if rel_block_size == RelBlockSize.ONE_STRIPE_SIZE:
         target_relblock_size = stripe_size * 8192
     elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
@@ -146,45 +129,8 @@ def test_pgdata_import_smoke(
     else:
         raise ValueError
 
-    # fillfactor so we don't need to produce that much data
-    # 900 byte per row is > 10% => 1 row per page
-    vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
-
-    nrows = 0
-    while True:
-        relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
-        log.info(
-            f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
-        )
-        if relblock_size >= target_relblock_size:
-            break
-        addrows = int((target_relblock_size - relblock_size) // 8192)
-        assert addrows >= 1, "forward progress"
-        vanilla_pg.safe_psql(
-            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
-        )
-        nrows += addrows
-    expect_nrows = nrows
-    expect_sum = (
-        (nrows) * (nrows + 1) // 2
-    )  # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n
-
-    def validate_vanilla_equivalence(ep):
-        # TODO: would be nicer to just compare pgdump
-
-        # Enable IO concurrency for batching on large sequential scan, to avoid making
-        # this test unnecessarily onerous on CPU. Especially on debug mode, it's still
-        # pretty onerous though, so increase statement_timeout to avoid timeouts.
-        assert ep.safe_psql_many(
-            [
-                "set effective_io_concurrency=32;",
-                "SET statement_timeout='300s';",
-                "select count(*), sum(data::bigint)::bigint from t",
-            ]
-        ) == [[], [], [(expect_nrows, expect_sum)]]
-
-    validate_vanilla_equivalence(vanilla_pg)
-
+    vanilla_pg.start()
+    rows_inserted = populate_vanilla_pg(vanilla_pg, target_relblock_size)
     vanilla_pg.stop()
 
     #
@@ -275,14 +221,14 @@ def test_pgdata_import_smoke(
         config_lines=ep_config,
     )
 
-    validate_vanilla_equivalence(ro_endpoint)
+    validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)
 
     # ensure the import survives restarts
     ro_endpoint.stop()
    env.pageserver.stop(immediate=True)
     env.pageserver.start()
     ro_endpoint.start()
-    validate_vanilla_equivalence(ro_endpoint)
+    validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)
 
     #
     # validate the layer files in each shard only have the shard-specific data
@@ -322,7 +268,7 @@ def test_pgdata_import_smoke(
 
     child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
     child_workload.validate()
-    validate_vanilla_equivalence(child_workload.endpoint())
+    validate_import_from_vanilla_pg(child_workload.endpoint(), rows_inserted)
 
     # ... at the initdb lsn
     _ = env.create_branch(
@@ -337,7 +283,7 @@ def test_pgdata_import_smoke(
         tenant_id=tenant_id,
         config_lines=ep_config,
     )
-    validate_vanilla_equivalence(br_initdb_endpoint)
+    validate_import_from_vanilla_pg(br_initdb_endpoint, rows_inserted)
 
     with pytest.raises(psycopg2.errors.UndefinedTable):
         br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
@@ -578,23 +524,8 @@ def test_import_chaos(
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
 
     vanilla_pg.start()
-    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
-    vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
-    nrows = 0
-    while True:
-        relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
-        log.info(
-            f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
-        )
-        if relblock_size >= TARGET_RELBOCK_SIZE:
-            break
-        addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
-        assert addrows >= 1, "forward progress"
-        vanilla_pg.safe_psql(
-            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
-        )
-        nrows += addrows
+    inserted_rows = populate_vanilla_pg(vanilla_pg, TARGET_RELBOCK_SIZE)
 
     vanilla_pg.stop()
@@ -762,13 +693,7 @@ def test_import_chaos(
     endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
 
     # Validate the imported data is legit
-    assert endpoint.safe_psql_many(
-        [
-            "set effective_io_concurrency=32;",
-            "SET statement_timeout='300s';",
-            "select count(*), sum(data::bigint)::bigint from t",
-        ]
-    ) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
+    validate_import_from_vanilla_pg(endpoint, inserted_rows)
 
     endpoint.stop()