mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
pageserver: apply shard filtering to blocks ingested during initdb (#7319)
## Problem Ingest filtering wasn't being applied to timeline creations, so a timeline created on a sharded tenant would use 20MB+ on each shard (each shard got a full copy). This didn't break anything, but is inefficient and leaves the system in a harder-to-validate state where shards initially have some data that they will eventually drop during compaction. Closes: https://github.com/neondatabase/neon/issues/6649 ## Summary of changes - in `import_rel`, filter block-by-block with is_key_local - During test_sharding_smoke, check that per-shard physical sizes are as expected - Also extend the test to check deletion works as expected (this was an outstanding tech debt task)
This commit is contained in:
@@ -8,6 +8,7 @@ use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
use tokio_tar::Archive;
|
||||
use tracing::*;
|
||||
@@ -170,7 +171,10 @@ async fn import_rel(
|
||||
let r = reader.read_exact(&mut buf).await;
|
||||
match r {
|
||||
Ok(_) => {
|
||||
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
|
||||
let key = rel_block_to_key(rel, blknum);
|
||||
if modification.tline.get_shard_identity().is_key_local(&key) {
|
||||
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: UnexpectedEof is expected
|
||||
|
||||
@@ -81,9 +81,13 @@ class Workload:
|
||||
|
||||
return self._endpoint
|
||||
|
||||
def __del__(self):
|
||||
def stop(self):
|
||||
if self._endpoint is not None:
|
||||
self._endpoint.stop()
|
||||
self._endpoint = None
|
||||
|
||||
def __del__(self):
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
if self._endpoint is not None:
|
||||
|
||||
@@ -10,11 +10,13 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
S3Scrubber,
|
||||
StorageControllerApiException,
|
||||
last_flush_lsn_upload,
|
||||
tenant_get_shards,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
@@ -69,6 +71,15 @@ def test_sharding_smoke(
|
||||
log.info(f"sizes = {sizes}")
|
||||
return sizes
|
||||
|
||||
# The imported initdb for timeline creation should
|
||||
# not be fully imported on every shard. We use a 1MB strripe size so expect
|
||||
# pretty good distribution: no one shard should have more than half the data
|
||||
sizes = get_sizes()
|
||||
physical_initdb_total = sum(sizes.values())
|
||||
expect_initdb_size = 20 * 1024 * 1024
|
||||
assert physical_initdb_total > expect_initdb_size
|
||||
assert all(s < expect_initdb_size // 2 for s in sizes.values())
|
||||
|
||||
# Test that timeline creation works on a sharded tenant
|
||||
timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id)
|
||||
|
||||
@@ -101,6 +112,38 @@ def test_sharding_smoke(
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Validate that deleting a sharded tenant removes all files in the prefix
|
||||
|
||||
# Before deleting, stop the client and check we have some objects to delete
|
||||
workload.stop()
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
# Check the scrubber isn't confused by sharded content, then disable
|
||||
# it during teardown because we'll have deleted by then
|
||||
S3Scrubber(neon_env_builder).scan_metadata()
|
||||
neon_env_builder.scrub_on_exit = False
|
||||
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
assert_prefix_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
def test_sharding_split_unsharded(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
|
||||
Reference in New Issue
Block a user