fix: allow layer flushes more often (#7927)

As seen with the pgvector 0.7.0 index builds, we can receive large
batches of images, leading to very large L0 layers in the range of 1GB.
These large layers are produced because we are only able to roll the
layer after we have witnessed two different Lsns in a single
`DataDirModification::commit`. As the single Lsn batches of images can
span over multiple `DataDirModification` lifespans, we will rarely get
to write two different Lsns in a single `put_batch` currently.

The solution is to remember the TimelineWriterState instead of eagerly
forgetting it until we really open the next layer or someone else
flushes (while holding the write_guard).

Additional changes are test fixes to avoid "initdb image layer
optimization" or ignoring initdb layers for assertion.

Cc: #7197 because small `checkpoint_distance` will now trigger the
"initdb image layer optimization"
This commit is contained in:
Joonas Koivunen
2024-06-10 16:50:17 +03:00
committed by GitHub
parent 5a7e285c2c
commit b52e31c1a4
9 changed files with 245 additions and 37 deletions

View File

@@ -52,7 +52,7 @@ pub struct InMemoryLayer {
/// Frozen layers have an exclusive end LSN.
/// Writes are only allowed when this is `None`.
end_lsn: OnceLock<Lsn>,
pub(crate) end_lsn: OnceLock<Lsn>,
/// Used for traversal path. Cached representation of the in-memory layer before frozen.
local_path_str: Arc<str>,

View File

@@ -322,6 +322,8 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
///
/// The state is cleared upon freezing.
write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,
/// Used to avoid multiple `flush_loop` tasks running
@@ -1578,7 +1580,7 @@ impl Timeline {
// an ephemeral layer open forever when idle. It also freezes layers if the global limit on
// ephemeral layer bytes has been breached.
pub(super) async fn maybe_freeze_ephemeral_layer(&self) {
let Ok(_write_guard) = self.write_lock.try_lock() else {
let Ok(mut write_guard) = self.write_lock.try_lock() else {
// If the write lock is held, there is an active wal receiver: rolling open layers
// is their responsibility while they hold this lock.
return;
@@ -1672,6 +1674,7 @@ impl Timeline {
.await;
}
}
write_guard.take();
self.flush_frozen_layers();
}
}
@@ -2036,11 +2039,11 @@ impl Timeline {
true
} else if distance > 0 && opened_at.elapsed() >= self.get_checkpoint_timeout() {
info!(
"Will roll layer at {} with layer size {} due to time since first write to the layer ({:?})",
projected_lsn,
layer_size,
opened_at.elapsed()
);
"Will roll layer at {} with layer size {} due to time since first write to the layer ({:?})",
projected_lsn,
layer_size,
opened_at.elapsed()
);
true
} else {
@@ -3653,7 +3656,10 @@ impl Timeline {
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().await)
let mut g = self.write_lock.lock().await;
// remove the reference to an open layer
g.take();
Some(g)
};
let to_lsn = self.get_last_record_lsn();
@@ -5541,6 +5547,9 @@ impl Timeline {
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
/// Tracking writes ingestion does to a particular in-memory layer.
///
/// Cleared upon freezing a layer.
struct TimelineWriterState {
open_layer: Arc<InMemoryLayer>,
current_size: u64,
@@ -5581,12 +5590,6 @@ impl Deref for TimelineWriter<'_> {
}
}
impl Drop for TimelineWriter<'_> {
fn drop(&mut self) {
self.write_guard.take();
}
}
#[derive(PartialEq)]
enum OpenLayerAction {
Roll,
@@ -5692,6 +5695,17 @@ impl<'a> TimelineWriter<'a> {
return OpenLayerAction::Open;
};
if state.cached_last_freeze_at < self.tl.last_freeze_at.load() {
// TODO(#7993): branch is needed before refactoring the many places of freezing for the
// possibility `state` having a "dangling" reference to an already frozen in-memory
// layer.
assert!(
state.open_layer.end_lsn.get().is_some(),
"our open_layer must be outdated"
);
return OpenLayerAction::Open;
}
if state.prev_lsn == Some(lsn) {
// Rolling mid LSN is not supported by downstream code.
// Hence, only roll at LSN boundaries.

View File

@@ -313,7 +313,7 @@ def assert_prefix_empty(
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5322/6207777020/index.html#suites/3556ed71f2d69272a7014df6dcb02317/53b5c368b5a68865
# this seems like a mock_s3 issue
log.warning(
f"contrading ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
)
keys = 0
elif keys != 0 and len(objects) == 0:

View File

@@ -582,3 +582,20 @@ class PropagatingThread(threading.Thread):
if self.exc:
raise self.exc
return self.ret
def human_bytes(amt: float) -> str:
"""
Render a bytes amount into nice IEC bytes string.
"""
suffixes = ["", "Ki", "Mi", "Gi"]
last = suffixes[-1]
for name in suffixes:
if amt < 1024 or name == last:
return f"{int(round(amt))} {name}B"
amt = amt / 1024
raise RuntimeError("unreachable")

View File

@@ -17,7 +17,7 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import wait_until
from fixtures.utils import human_bytes, wait_until
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
@@ -218,19 +218,6 @@ def count_layers_per_tenant(
return dict(ret)
def human_bytes(amt: float) -> str:
suffixes = ["", "Ki", "Mi", "Gi"]
last = suffixes[-1]
for name in suffixes:
if amt < 1024 or name == last:
return f"{int(round(amt))} {name}B"
amt = amt / 1024
raise RuntimeError("unreachable")
def _eviction_env(
request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int
) -> EvictionEnv:
@@ -294,7 +281,7 @@ def pgbench_init_tenant(
"gc_period": "0s",
"compaction_period": "0s",
"checkpoint_distance": f"{layer_size}",
"image_creation_threshold": "100",
"image_creation_threshold": "999999",
"compaction_target_size": f"{layer_size}",
}
)
@@ -668,11 +655,10 @@ def test_fast_growing_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, or
finish_tenant_creation(env, tenant_id, timeline_id, min_expected_layers)
tenant_layers = count_layers_per_tenant(env.pageserver, map(lambda x: x[0], timelines))
(total_on_disk, _, _) = poor_mans_du(env, map(lambda x: x[0], timelines), env.pageserver, False)
(total_on_disk, _, _) = poor_mans_du(env, map(lambda x: x[0], timelines), env.pageserver, True)
# cut 10 percent
response = env.pageserver.http_client().disk_usage_eviction_run(
{"evict_bytes": total_on_disk // 10, "eviction_order": order.config()}
{"evict_bytes": total_on_disk // 5, "eviction_order": order.config()}
)
log.info(f"{response}")

View File

@@ -0,0 +1,151 @@
from dataclasses import dataclass
from typing import Iterable, List, Union
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.pageserver.http import HistoricLayerInfo, LayerMapInfo
from fixtures.utils import human_bytes
def test_ingesting_large_batches_of_images(neon_env_builder: NeonEnvBuilder, build_type: str):
"""
Build a non-small GIN index which includes similarly batched up images in WAL stream as does pgvector
to show that we no longer create oversized layers.
"""
if build_type == "debug":
pytest.skip("debug run is unnecessarily slow")
minimum_initdb_size = 20 * 1024**2
checkpoint_distance = 32 * 1024**2
minimum_good_layer_size = checkpoint_distance * 0.9
minimum_too_large_layer_size = 2 * checkpoint_distance
# index size: 99MiB
rows = 2_500_000
# bucket lower limits
buckets = [0, minimum_initdb_size, minimum_good_layer_size, minimum_too_large_layer_size]
assert (
minimum_initdb_size < minimum_good_layer_size
), "keep checkpoint_distance higher than the initdb size (find it by experimenting)"
env = neon_env_builder.init_start(
initial_tenant_conf={
"checkpoint_distance": f"{checkpoint_distance}",
"compaction_target_size": f"{checkpoint_distance}",
# this test is primarly interested in L0 sizes but we'll compact after ingestion to ensure sizes are good even then
"compaction_period": "0s",
"gc_period": "0s",
"compaction_threshold": "255",
"image_creation_threshold": "99999",
}
)
# build a larger than 3*checkpoint_distance sized gin index.
# gin index building exhibits the same behaviour as the pgvector with the two phase build
with env.endpoints.create_start("main") as ep, ep.cursor() as cur:
cur.execute(
f"create table int_array_test as select array_agg(g) as int_array from generate_series(1, {rows}) g group by g / 10;"
)
cur.execute(
"create index int_array_test_gin_index on int_array_test using gin (int_array);"
)
cur.execute("select pg_table_size('int_array_test_gin_index')")
size = cur.fetchone()
assert size is not None
assert isinstance(size[0], int)
log.info(f"gin index size: {human_bytes(size[0])}")
assert (
size[0] > checkpoint_distance * 3
), f"gin index is not large enough: {human_bytes(size[0])}"
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
ps_http = env.pageserver.http_client()
ps_http.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
infos = ps_http.layer_map_info(env.initial_tenant, env.initial_timeline)
assert len(infos.in_memory_layers) == 0, "should had flushed open layers"
post_ingest = histogram_historic_layers(infos, buckets)
# describe first, assert later for easier debugging
log.info("non-cumulative layer size distribution after ingestion:")
print_layer_size_histogram(post_ingest)
# since all we have are L0s, we should be getting nice L1s and images out of them now
ps_http.patch_tenant_config_client_side(
env.initial_tenant,
{
"compaction_threshold": 1,
"image_creation_threshold": 1,
},
)
ps_http.timeline_compact(env.initial_tenant, env.initial_timeline, True, True)
infos = ps_http.layer_map_info(env.initial_tenant, env.initial_timeline)
assert len(infos.in_memory_layers) == 0, "no new inmem layers expected"
post_compact = histogram_historic_layers(infos, buckets)
log.info("non-cumulative layer size distribution after compaction:")
print_layer_size_histogram(post_compact)
assert (
post_ingest.counts[3] == 0
), f"there should be no layers larger than 2*checkpoint_distance ({human_bytes(2*checkpoint_distance)})"
assert post_ingest.counts[1] == 1, "expect one smaller layer for initdb"
assert (
post_ingest.counts[0] <= 1
), "expect at most one tiny layer from shutting down the endpoint"
# just make sure we don't have trouble splitting the layers apart
assert post_compact.counts[3] == 0
@dataclass
class Histogram:
buckets: List[Union[int, float]]
counts: List[int]
sums: List[int]
def histogram_historic_layers(
infos: LayerMapInfo, minimum_sizes: List[Union[int, float]]
) -> Histogram:
def log_layer(layer: HistoricLayerInfo) -> HistoricLayerInfo:
log.info(
f"{layer.layer_file_name} {human_bytes(layer.layer_file_size)} ({layer.layer_file_size} bytes)"
)
return layer
layers = map(log_layer, infos.historic_layers)
sizes = (x.layer_file_size for x in layers)
return histogram(sizes, minimum_sizes)
def histogram(sizes: Iterable[int], minimum_sizes: List[Union[int, float]]) -> Histogram:
assert all(minimum_sizes[i] < minimum_sizes[i + 1] for i in range(len(minimum_sizes) - 1))
buckets = list(enumerate(minimum_sizes))
counts = [0 for _ in buckets]
sums = [0 for _ in buckets]
for size in sizes:
found = False
for index, min_size in reversed(buckets):
if size >= min_size:
counts[index] += 1
sums[index] += size
found = True
break
assert found
return Histogram(minimum_sizes, counts, sums)
def print_layer_size_histogram(h: Histogram):
for index, min_size in enumerate(h.buckets):
log.info(
f">= {human_bytes(min_size)}: {h.counts[index]} layers total {human_bytes(h.sums[index])}"
)

View File

@@ -563,6 +563,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
)
),
)
workload.stop()
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):

View File

@@ -2,6 +2,7 @@ import time
from datetime import datetime, timezone
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
@@ -32,7 +33,12 @@ def test_tenant_s3_restore(
assert remote_storage, "remote storage not configured"
enable_remote_storage_versioning(remote_storage)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
# change it back after initdb, recovery doesn't work if the two
# index_part.json uploads happen at same second or too close to each other.
initial_tenant_conf = MANY_SMALL_LAYERS_TENANT_CONFIG
del initial_tenant_conf["checkpoint_distance"]
env = neon_env_builder.init_start(initial_tenant_conf)
env.pageserver.allowed_errors.extend(
[
# The deletion queue will complain when it encounters simulated S3 errors
@@ -43,14 +49,16 @@ def test_tenant_s3_restore(
)
ps_http = env.pageserver.http_client()
tenant_id = env.initial_tenant
# now lets create the small layers
ps_http.set_tenant_config(tenant_id, MANY_SMALL_LAYERS_TENANT_CONFIG)
# Default tenant and the one we created
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
# create two timelines one being the parent of another, both with non-trivial data
parent = None
parent = "main"
last_flush_lsns = []
for timeline in ["first", "second"]:
@@ -64,6 +72,7 @@ def test_tenant_s3_restore(
last_flush_lsns.append(last_flush_lsn)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
log.info(f"{timeline} timeline {timeline_id} {last_flush_lsn=}")
parent = timeline
# These sleeps are important because they fend off differences in clocks between us and S3
@@ -108,6 +117,9 @@ def test_tenant_s3_restore(
ps_http.tenant_attach(tenant_id, generation=generation)
env.pageserver.quiesce_tenants()
for tline in ps_http.timeline_list(env.initial_tenant):
log.info(f"timeline detail: {tline}")
for i, timeline in enumerate(["first", "second"]):
with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
endpoint.safe_psql(f"SELECT * FROM created_{timeline};")

View File

@@ -697,6 +697,9 @@ def test_sharding_ingest_layer_sizes(
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{expect_layer_size}",
"compaction_target_size": f"{expect_layer_size}",
# aim to reduce flakyness, we are not doing explicit checkpointing
"compaction_period": "0s",
"gc_period": "0s",
}
shard_count = 4
neon_env_builder.num_pageservers = shard_count
@@ -712,6 +715,23 @@ def test_sharding_ingest_layer_sizes(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# ignore the initdb layer(s) for the purposes of the size comparison as a initdb image layer optimization
# will produce a lot more smaller layers.
initial_layers_per_shard = {}
log.info("initdb distribution (not asserted on):")
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layers = (
env.get_pageserver(shard["node_id"]).http_client().layer_map_info(shard_id, timeline_id)
)
for layer in layers.historic_layers:
log.info(
f"layer[{pageserver.id}]: {layer.layer_file_name} (size {layer.layer_file_size})"
)
initial_layers_per_shard[shard_id] = set(layers.historic_layers)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(4096, upload=False)
@@ -733,7 +753,13 @@ def test_sharding_ingest_layer_sizes(
historic_layers = sorted(layer_map.historic_layers, key=lambda layer: layer.lsn_start)
initial_layers = initial_layers_per_shard[shard_id]
for layer in historic_layers:
if layer in initial_layers:
# ignore the initdb image layers for the size histogram
continue
if layer.layer_file_size < expect_layer_size // 2:
classification = "Small"
small_layer_count += 1
@@ -763,7 +789,8 @@ def test_sharding_ingest_layer_sizes(
pass
else:
# General case:
assert float(small_layer_count) / float(ok_layer_count) < 0.25
# old limit was 0.25 but pg14 is right at the limit with 7/28
assert float(small_layer_count) / float(ok_layer_count) < 0.3
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count