Mirror of https://github.com/neondatabase/neon.git
synced 2025-12-26 07:39:58 +00:00
pageserver: hook importing timelines up into disk usage eviction (#12038)
## Problem

Disk usage eviction isn't sensitive to layers of imported timelines.

## Summary of changes

Hook importing timelines up into eviction and add a test for it.

I don't think we need any special eviction logic for this. These layers will all be visible and their access time will be their creation time. Hence, we'll remove covered layers first and get to the imported layers if there's still disk pressure.
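The ordering argument above can be illustrated with a minimal sketch (hypothetical types and names, not the actual pageserver candidate machinery): covered layers sort ahead of visible ones, and within each class older access times go first, so imported layers — visible, with access time equal to creation time — are only reached once evicting covered layers has not relieved the pressure.

```rust
use std::time::SystemTime;

// Hypothetical stand-ins for illustration only; the real pageserver types differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Visibility {
    Covered, // covered layers are evicted first
    Visible, // imported layers fall here
}

struct Candidate {
    visibility: Visibility,
    last_access: SystemTime, // for an imported layer this is its creation time
}

// Order candidates as described above: covered before visible, older access first.
fn eviction_order(candidates: &mut [Candidate]) {
    candidates.sort_by_key(|c| (c.visibility, c.last_access));
}

fn main() {
    let now = SystemTime::now();
    let mut candidates = vec![
        Candidate { visibility: Visibility::Visible, last_access: now }, // freshly imported layer
        Candidate { visibility: Visibility::Covered, last_access: now }, // covered layer
    ];
    eviction_order(&mut candidates);
    // Under disk pressure the covered layer would be dropped before the imported one.
    assert_eq!(candidates[0].visibility, Visibility::Covered);
}
```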
@@ -837,7 +837,30 @@ async fn collect_eviction_candidates(
                continue;
            }
            let info = tl.get_local_layers_for_disk_usage_eviction().await;
            debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
            debug!(
                tenant_id=%tl.tenant_shard_id.tenant_id,
                shard_id=%tl.tenant_shard_id.shard_slug(),
                timeline_id=%tl.timeline_id,
                "timeline resident layers count: {}", info.resident_layers.len()
            );

            tenant_candidates.extend(info.resident_layers.into_iter());
            max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));

            if cancel.is_cancelled() {
                return Ok(EvictionCandidates::Cancelled);
            }
        }

        // Also consider layers of timelines being imported for eviction
        for tl in tenant.list_importing_timelines() {
            let info = tl.timeline.get_local_layers_for_disk_usage_eviction().await;
            debug!(
                tenant_id=%tl.timeline.tenant_shard_id.tenant_id,
                shard_id=%tl.timeline.tenant_shard_id.shard_slug(),
                timeline_id=%tl.timeline.timeline_id,
                "timeline resident layers count: {}", info.resident_layers.len()
            );

            tenant_candidates.extend(info.resident_layers.into_iter());
            max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
@@ -2442,6 +2442,17 @@ impl TenantShard {
            .collect()
    }

    /// Lists timelines the tenant contains.
    /// It's up to callers to omit certain timelines that are not considered ready for use.
    pub fn list_importing_timelines(&self) -> Vec<Arc<ImportingTimeline>> {
        self.timelines_importing
            .lock()
            .unwrap()
            .values()
            .map(Arc::clone)
            .collect()
    }

    /// Lists timelines the tenant manages, including offloaded ones.
    ///
    /// It's up to callers to omit certain timelines that are not considered ready for use.
@@ -1348,6 +1348,21 @@ impl RemoteTimelineClient {
        Ok(())
    }

    pub(crate) fn schedule_unlinking_of_layers_from_index_part<I>(
        self: &Arc<Self>,
        names: I,
    ) -> Result<(), NotInitialized>
    where
        I: IntoIterator<Item = LayerName>,
    {
        let mut guard = self.upload_queue.lock().unwrap();
        let upload_queue = guard.initialized_mut()?;

        self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);

        Ok(())
    }

    /// Update the remote index file, removing the to-be-deleted files from the index,
    /// allowing scheduling of actual deletions later.
    fn schedule_unlinking_of_layers_from_index_part0<I>(
@@ -8,6 +8,7 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;

use super::{Timeline, TimelineDeleteProgress};
@@ -110,6 +111,8 @@ pub async fn doit(
        .schedule_index_upload_for_file_changes()?;
    timeline.remote_client.wait_completion().await?;

    pausable_failpoint!("import-timeline-pre-success-notify-pausable");

    // Communicate that shard is done.
    // Ensure at-least-once delivery of the upcall to storage controller
    // before we mark the task as done and never come here again.
@@ -982,6 +982,15 @@ impl ChunkProcessingJob {
            .cloned();
        match existing_layer {
            Some(existing) => {
                // Unlink the remote layer from the index without scheduling its deletion.
                // When `existing_layer` drops [`LayerInner::drop`] will schedule its deletion from
                // remote storage, but that assumes that the layer was unlinked from the index first.
                timeline
                    .remote_client
                    .schedule_unlinking_of_layers_from_index_part(std::iter::once(
                        existing.layer_desc().layer_name(),
                    ))?;

                guard.open_mut()?.rewrite_layers(
                    &[(existing.clone(), resident_layer.clone())],
                    &[],
@@ -1,3 +1,4 @@
import json
import os
import shutil
import subprocess
@@ -11,6 +12,7 @@ from _pytest.config import Config

from fixtures.log_helper import log
from fixtures.neon_cli import AbstractNeonCli
from fixtures.neon_fixtures import Endpoint, VanillaPostgres
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import MockS3Server
@@ -161,3 +163,57 @@ def fast_import(
        f.write(fi.cmd.stderr)

    log.info("Written logs to %s", test_output_dir)


def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
    """
    Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
    """
    assert not vanilla_pg.is_running()

    path.mkdir()
    # what cplane writes before scheduling fast_import
    specpath = path / "spec.json"
    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
    # what fast_import writes
    vanilla_pg.pgdatadir.rename(path / "pgdata")
    statusdir = path / "status"
    statusdir.mkdir()
    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))


def populate_vanilla_pg(vanilla_pg: VanillaPostgres, target_relblock_size: int) -> int:
    assert vanilla_pg.is_running()

    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
    # fillfactor so we don't need to produce that much data
    # 900 byte per row is > 10% => 1 row per page
    vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")

    nrows = 0
    while True:
        relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
        log.info(
            f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
        )
        if relblock_size >= target_relblock_size:
            break
        addrows = int((target_relblock_size - relblock_size) // 8192)
        assert addrows >= 1, "forward progress"
        vanilla_pg.safe_psql(
            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
        )
        nrows += addrows

    return nrows


def validate_import_from_vanilla_pg(endpoint: Endpoint, nrows: int):
    assert endpoint.safe_psql_many(
        [
            "set effective_io_concurrency=32;",
            "SET statement_timeout='300s';",
            "select count(*), sum(data::bigint)::bigint from t",
        ]
    ) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
@@ -2829,6 +2829,11 @@ class NeonPageserver(PgProtocol, LogUtils):
        if self.running:
            self.http_client().configure_failpoints([(name, action)])

    def clear_persistent_failpoint(self, name: str):
        del self._persistent_failpoints[name]
        if self.running:
            self.http_client().configure_failpoints([(name, "off")])

    def timeline_dir(
        self,
        tenant_shard_id: TenantId | TenantShardId,
@@ -1,31 +1,41 @@
from __future__ import annotations

import enum
import json
import time
from collections import Counter
from dataclasses import dataclass
from enum import StrEnum
from threading import Event
from typing import TYPE_CHECKING

import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.fast_import import mock_import_bucket, populate_vanilla_pg
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnv,
    NeonEnvBuilder,
    NeonPageserver,
    PgBin,
    VanillaPostgres,
    wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import (
    ImportPgdataIdemptencyKey,
)
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import human_bytes, wait_until
from fixtures.utils import human_bytes, run_only_on_default_postgres, wait_until
from werkzeug.wrappers.response import Response

if TYPE_CHECKING:
    from collections.abc import Iterable
    from typing import Any

    from fixtures.pageserver.http import PageserverHttpClient
    from pytest_httpserver import HTTPServer
    from werkzeug.wrappers.request import Request


GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
@@ -164,6 +174,7 @@ class EvictionEnv:
        min_avail_bytes,
        mock_behavior,
        eviction_order: EvictionOrder,
        wait_logical_size: bool = True,
    ):
        """
        Starts pageserver up with mocked statvfs setup. The startup is
@@ -201,11 +212,12 @@ class EvictionEnv:
        pageserver.start()

        # we now do initial logical size calculation on startup, which on debug builds can fight with disk usage based eviction
        for tenant_id, timeline_id in self.timelines:
            tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
            # Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
            if tenant_ps is not None:
                tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)
        if wait_logical_size:
            for tenant_id, timeline_id in self.timelines:
                tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
                # Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
                if tenant_ps is not None:
                    tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)

        def statvfs_called():
            pageserver.assert_log_contains(".*running mocked statvfs.*")
@@ -882,3 +894,121 @@ def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
    assert total_size - post_eviction_total_size >= evict_bytes, (
        "we requested at least evict_bytes worth of free space"
    )


@run_only_on_default_postgres(reason="PG version is irrelevant here")
def test_import_timeline_disk_pressure_eviction(
    neon_env_builder: NeonEnvBuilder,
    vanilla_pg: VanillaPostgres,
    make_httpserver: HTTPServer,
    pg_bin: PgBin,
):
"""
|
||||
TODO
|
||||
"""
|
||||
    # Set up mock control plane HTTP server to listen for import completions
    import_completion_signaled = Event()

    def handler(request: Request) -> Response:
        log.info(f"control plane /import_complete request: {request.json}")
        import_completion_signaled.set()
        return Response(json.dumps({}), status=200)

    cplane_mgmt_api_server = make_httpserver
    cplane_mgmt_api_server.expect_request(
        "/storage/api/v1/import_complete", method="PUT"
    ).respond_with_handler(handler)

    # Plug the cplane mock in
    neon_env_builder.control_plane_hooks_api = (
        f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
    )

    # The import will specify a local filesystem path mocking remote storage
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    vanilla_pg.start()
    target_relblock_size = 1024 * 1024 * 128
    populate_vanilla_pg(vanilla_pg, target_relblock_size)
    vanilla_pg.stop()

    env = neon_env_builder.init_configs()
    env.start()

    importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
    mock_import_bucket(vanilla_pg, importbucket_path)

    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()
    idempotency = ImportPgdataIdemptencyKey.random()

    eviction_env = EvictionEnv(
        timelines=[(tenant_id, timeline_id)],
        neon_env=env,
        pageserver_http=env.pageserver.http_client(),
        layer_size=5 * 1024 * 1024,  # Doesn't apply here
        pg_bin=pg_bin,  # Not used here
        pgbench_init_lsns={},  # Not used here
    )

    # Pause before delivering the final notification to storcon.
    # This keeps the import in progress.
    failpoint_name = "import-timeline-pre-success-notify-pausable"
    env.pageserver.add_persistent_failpoint(failpoint_name, "pause")

    env.storage_controller.tenant_create(tenant_id)
    env.storage_controller.timeline_create(
        tenant_id,
        {
            "new_timeline_id": str(timeline_id),
            "import_pgdata": {
                "idempotency_key": str(idempotency),
                "location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
            },
        },
    )

    def hit_failpoint():
        log.info("Checking log for pattern...")
        try:
            assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*")
        except Exception:
            log.exception("Failed to find pattern in log")
            raise

    wait_until(hit_failpoint)
    assert not import_completion_signaled.is_set()

    env.pageserver.stop()

    total_size, _, _ = eviction_env.timelines_du(env.pageserver)
    blocksize = 512
    total_blocks = (total_size + (blocksize - 1)) // blocksize

    eviction_env.pageserver_start_with_disk_usage_eviction(
        env.pageserver,
        period="1s",
        max_usage_pct=33,
        min_avail_bytes=0,
        mock_behavior={
            "type": "Success",
            "blocksize": blocksize,
            "total_blocks": total_blocks,
            # Only count layer files towards used bytes in the mock_statvfs.
            # This avoids accounting for metadata files & tenant conf in the tests.
            "name_filter": ".*__.*",
        },
        eviction_order=EvictionOrder.RELATIVE_ORDER_SPARE,
        wait_logical_size=False,
    )

    wait_until(lambda: env.pageserver.assert_log_contains(".*disk usage pressure relieved"))

    env.pageserver.clear_persistent_failpoint(failpoint_name)

    def cplane_notified():
        assert import_completion_signaled.is_set()

    wait_until(cplane_notified)

    env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
@@ -12,7 +12,12 @@ import psycopg2
import psycopg2.errors
import pytest
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.fast_import import FastImport
from fixtures.fast_import import (
    FastImport,
    mock_import_bucket,
    populate_vanilla_pg,
    validate_import_from_vanilla_pg,
)
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
@@ -60,24 +65,6 @@ smoke_params = [
]


def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
    """
    Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
    """
    assert not vanilla_pg.is_running()

    path.mkdir()
    # what cplane writes before scheduling fast_import
    specpath = path / "spec.json"
    specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
    # what fast_import writes
    vanilla_pg.pgdatadir.rename(path / "pgdata")
    statusdir = path / "status"
    statusdir.mkdir()
    (statusdir / "pgdata").write_text(json.dumps({"done": True}))
    (statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))


@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
@@ -132,10 +119,6 @@ def test_pgdata_import_smoke(
    # Put data in vanilla pg
    #

    vanilla_pg.start()
    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")

    log.info("create relblock data")
    if rel_block_size == RelBlockSize.ONE_STRIPE_SIZE:
        target_relblock_size = stripe_size * 8192
    elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
@@ -146,45 +129,8 @@ def test_pgdata_import_smoke(
    else:
        raise ValueError

    # fillfactor so we don't need to produce that much data
    # 900 byte per row is > 10% => 1 row per page
    vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")

    nrows = 0
    while True:
        relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
        log.info(
            f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
        )
        if relblock_size >= target_relblock_size:
            break
        addrows = int((target_relblock_size - relblock_size) // 8192)
        assert addrows >= 1, "forward progress"
        vanilla_pg.safe_psql(
            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
        )
        nrows += addrows
    expect_nrows = nrows
    expect_sum = (
        (nrows) * (nrows + 1) // 2
    )  # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n

    def validate_vanilla_equivalence(ep):
        # TODO: would be nicer to just compare pgdump

        # Enable IO concurrency for batching on large sequential scan, to avoid making
        # this test unnecessarily onerous on CPU. Especially on debug mode, it's still
        # pretty onerous though, so increase statement_timeout to avoid timeouts.
        assert ep.safe_psql_many(
            [
                "set effective_io_concurrency=32;",
                "SET statement_timeout='300s';",
                "select count(*), sum(data::bigint)::bigint from t",
            ]
        ) == [[], [], [(expect_nrows, expect_sum)]]

    validate_vanilla_equivalence(vanilla_pg)

    vanilla_pg.start()
    rows_inserted = populate_vanilla_pg(vanilla_pg, target_relblock_size)
    vanilla_pg.stop()

    #
@@ -275,14 +221,14 @@ def test_pgdata_import_smoke(
        config_lines=ep_config,
    )

    validate_vanilla_equivalence(ro_endpoint)
    validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)

    # ensure the import survives restarts
    ro_endpoint.stop()
    env.pageserver.stop(immediate=True)
    env.pageserver.start()
    ro_endpoint.start()
    validate_vanilla_equivalence(ro_endpoint)
    validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)

    #
    # validate the layer files in each shard only have the shard-specific data
@@ -322,7 +268,7 @@ def test_pgdata_import_smoke(
    child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
    child_workload.validate()

    validate_vanilla_equivalence(child_workload.endpoint())
    validate_import_from_vanilla_pg(child_workload.endpoint(), rows_inserted)

    # ... at the initdb lsn
    _ = env.create_branch(
@@ -337,7 +283,7 @@ def test_pgdata_import_smoke(
        tenant_id=tenant_id,
        config_lines=ep_config,
    )
    validate_vanilla_equivalence(br_initdb_endpoint)
    validate_import_from_vanilla_pg(br_initdb_endpoint, rows_inserted)
    with pytest.raises(psycopg2.errors.UndefinedTable):
        br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
@@ -578,23 +524,8 @@ def test_import_chaos(
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    vanilla_pg.start()
    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
    vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")

    nrows = 0
    while True:
        relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
        log.info(
            f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
        )
        if relblock_size >= TARGET_RELBOCK_SIZE:
            break
        addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
        assert addrows >= 1, "forward progress"
        vanilla_pg.safe_psql(
            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
        )
        nrows += addrows
    inserted_rows = populate_vanilla_pg(vanilla_pg, TARGET_RELBOCK_SIZE)

    vanilla_pg.stop()
@@ -762,13 +693,7 @@ def test_import_chaos(
    endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)

    # Validate the imported data is legit
    assert endpoint.safe_psql_many(
        [
            "set effective_io_concurrency=32;",
            "SET statement_timeout='300s';",
            "select count(*), sum(data::bigint)::bigint from t",
        ]
    ) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
    validate_import_from_vanilla_pg(endpoint, inserted_rows)

    endpoint.stop()