test(detach_ancestor): ensure L0 compaction in history is ok (#7813)

detaching a timeline from its ancestor can leave the resulting timeline
with more L0 layers than the compaction threshold. most of the time, the
detached timeline has already made progress, so the next L0 -> L1
compaction happens near the original branch point and not near the
last_record_lsn.

add a test to ensure that inheriting the historical L0s does not change
fullbackup. additionally:
- add `wait_until_uploaded` to the test-only timeline checkpoint and
compact HTTP endpoints. with `?wait_until_uploaded=true` the endpoints
wait until the remote client has completed its uploads (see the usage
sketch below).
- for delta layers, describe L0-ness with the `/layer` endpoint

Cc: #6994
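
for illustration, a hedged sketch (not part of this commit) of exercising
both additions through the pageserver management API with plain `requests`;
the listen address, endpoint paths and JSON field names are assumptions
inferred from the changes below, and the endpoints remain test-only:

import requests

PAGESERVER = "http://localhost:9898"  # assumed management API address
TENANT, TIMELINE = "<tenant_id>", "<timeline_id>"

# flush and compact; with wait_until_uploaded the request returns only
# once the remote client has finished uploading
requests.put(
    f"{PAGESERVER}/v1/tenant/{TENANT}/timeline/{TIMELINE}/checkpoint",
    params={"wait_until_uploaded": "true"},
).raise_for_status()

# the /layer endpoint now reports L0-ness for delta layers
layer_map = requests.get(
    f"{PAGESERVER}/v1/tenant/{TENANT}/timeline/{TIMELINE}/layer"
).json()
l0_deltas = [
    layer
    for layer in layer_map["historic_layers"]
    if layer["kind"] == "Delta" and layer.get("l0")
]
print(f"{len(l0_deltas)} L0 delta layers")
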
Joonas Koivunen
2024-05-21 20:08:43 +03:00
committed by GitHub
parent 353afe4fe7
commit a8a88ba7bc
12 changed files with 200 additions and 20 deletions

View File

@@ -789,6 +789,8 @@ pub enum HistoricLayerInfo {
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
l0: bool,
},
Image {
layer_file_name: String,

View File

@@ -1736,6 +1736,8 @@ async fn timeline_compact_handler(
if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
flags |= CompactFlags::ForceImageLayerCreation;
}
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -1744,6 +1746,9 @@ async fn timeline_compact_handler(
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
}
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
@@ -1768,6 +1773,8 @@ async fn timeline_checkpoint_handler(
if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
flags |= CompactFlags::ForceImageLayerCreation;
}
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -1781,6 +1788,10 @@ async fn timeline_checkpoint_handler(
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
}
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))

View File

@@ -1264,6 +1264,7 @@ impl LayerInner {
lsn_end: lsn_range.end,
remote: !resident,
access_stats,
l0: crate::tenant::layer_map::LayerMap::is_l0(self.layer_desc()),
}
} else {
let lsn = self.desc.image_layer_lsn();

View File

@@ -347,37 +347,33 @@ impl<'de> serde::de::Visitor<'de> for LayerNameVisitor {
mod test {
use super::*;
#[test]
fn image_layer_parse() -> anyhow::Result<()> {
fn image_layer_parse() {
let expected = LayerName::Image(ImageLayerName {
key_range: Key::from_i128(0)
..Key::from_hex("000000067F00000001000004DF0000000006").unwrap(),
lsn: Lsn::from_hex("00000000014FED58").unwrap(),
});
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-v1-00000001").map_err(|s| anyhow::anyhow!(s))?;
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-v1-00000001").unwrap();
assert_eq!(parsed, expected,);
// Omitting generation suffix is valid
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58").map_err(|s| anyhow::anyhow!(s))?;
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58").unwrap();
assert_eq!(parsed, expected,);
Ok(())
}
#[test]
fn delta_layer_parse() -> anyhow::Result<()> {
fn delta_layer_parse() {
let expected = LayerName::Delta(DeltaLayerName {
key_range: Key::from_i128(0)
..Key::from_hex("000000067F00000001000004DF0000000006").unwrap(),
lsn_range: Lsn::from_hex("00000000014FED58").unwrap()
..Lsn::from_hex("000000000154C481").unwrap(),
});
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481-v1-00000001").map_err(|s| anyhow::anyhow!(s))?;
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481-v1-00000001").unwrap();
assert_eq!(parsed, expected);
// Omitting generation suffix is valid
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481").map_err(|s| anyhow::anyhow!(s))?;
let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481").unwrap();
assert_eq!(parsed, expected);
Ok(())
}
}

View File

@@ -56,20 +56,30 @@ class InMemoryLayerInfo:
class HistoricLayerInfo:
kind: str
layer_file_name: str
layer_file_size: Optional[int]
layer_file_size: int
lsn_start: str
lsn_end: Optional[str]
remote: bool
# None for image layers, true if pageserver thinks this is an L0 delta layer
l0: Optional[bool]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
# instead of parsing the key range, let's keep the definition of "L0" in pageserver
l0_ness = d.get("l0")
assert l0_ness is None or isinstance(l0_ness, bool)
size = d["layer_file_size"]
assert isinstance(size, int)
return HistoricLayerInfo(
kind=d["kind"],
layer_file_name=d["layer_file_name"],
layer_file_size=d.get("layer_file_size"),
layer_file_size=size,
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
remote=d["remote"],
l0=l0_ness,
)
@@ -583,6 +593,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
timeline_id: TimelineId,
force_repartition=False,
force_image_layer_creation=False,
wait_until_uploaded=False,
):
self.is_testing_enabled_or_skip()
query = {}
@@ -590,6 +601,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["force_repartition"] = "true"
if force_image_layer_creation:
query["force_image_layer_creation"] = "true"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"
log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
@@ -656,6 +669,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
timeline_id: TimelineId,
force_repartition=False,
force_image_layer_creation=False,
wait_until_uploaded=False,
):
self.is_testing_enabled_or_skip()
query = {}
@@ -663,6 +677,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["force_repartition"] = "true"
if force_image_layer_creation:
query["force_image_layer_creation"] = "true"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
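
for reference, a hedged sketch of combining the updated fixture surface
shown above; the helper names are illustrative, while the methods,
arguments and fields mirror this diff:

def count_l0_deltas(client, tenant_id, timeline_id) -> int:
    # l0 is None for image layers and a bool for delta layers
    info = client.layer_map_info(tenant_id, timeline_id)
    return sum(1 for layer in info.delta_layers() if layer.l0)

def flush_and_upload(client, tenant_id, timeline_id) -> None:
    # returns only after the remote client has finished uploading
    client.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)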

View File

@@ -165,7 +165,6 @@ def test_sharding_compaction(
image_layer_sizes[layer.layer_file_name] = layer.layer_file_size
# Pageserver should assert rather than emit an empty layer file, but double check here
assert layer.layer_file_size is not None
assert layer.layer_file_size > 0
shard_has_image_layers.append(len(image_layer_sizes) > 1)
@@ -178,7 +177,7 @@ def test_sharding_compaction(
#
# We only do this check with tiny stripes, because large stripes may not give all shards enough
# data to have statistically significant image layers
avg_size = sum(v for v in image_layer_sizes.values()) / len(image_layer_sizes) # type: ignore
avg_size = sum(v for v in image_layer_sizes.values()) / len(image_layer_sizes)
log.info(f"Shard {shard_id} average image layer size: {avg_size}")
assert avg_size > compaction_target_size / 2

View File

@@ -272,14 +272,14 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
resident_physical_size_metric == 0
), "ensure that resident_physical_size metric is zero"
assert resident_physical_size_metric == sum(
layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote
layer.layer_file_size for layer in info.historic_layers if not layer.remote
), "ensure that resident_physical_size metric corresponds to layer map dump"
remote_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_remote_physical_size"
)
assert remote_physical_size_metric == sum(
layer.layer_file_size or 0 for layer in info.historic_layers if layer.remote
layer.layer_file_size for layer in info.historic_layers if layer.remote
), "ensure that remote_physical_size metric corresponds to layer map dump"
log.info("before runnning GC, ensure that remote_physical size is zero")

View File

@@ -540,7 +540,6 @@ def test_compaction_downloads_on_demand_without_image_creation(neon_env_builder:
for layer in layers.historic_layers:
log.info(f"pre-compact: {layer}")
assert layer.layer_file_size is not None, "we must know layer file sizes"
layer_sizes += layer.layer_file_size
pageserver_http.evict_layer(tenant_id, timeline_id, layer.layer_file_name)

View File

@@ -287,7 +287,7 @@ def test_total_size_limit(neon_env_builder: NeonEnvBuilder):
total_historic_bytes += sum(
layer.layer_file_size
for layer in layer_map.historic_layers
if layer.layer_file_size is not None and Lsn(layer.lsn_start) > initdb_lsn
if Lsn(layer.lsn_start) > initdb_lsn
)
total_ephemeral_layers += len(layer_map.in_memory_layers)

View File

@@ -632,7 +632,6 @@ def test_sharding_ingest_layer_sizes(
historic_layers = sorted(layer_map.historic_layers, key=lambda layer: layer.lsn_start)
for layer in historic_layers:
assert layer.layer_file_size is not None
if layer.layer_file_size < expect_layer_size // 2:
classification = "Small"
small_layer_count += 1

View File

@@ -482,6 +482,163 @@ def test_detached_receives_flushes_while_being_detached(neon_env_builder: NeonEn
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
def test_compaction_induced_by_detaches_in_history(
neon_env_builder: NeonEnvBuilder, test_output_dir, pg_distrib_dir, pg_bin: PgBin
):
"""
Assuming the tree of timelines:
root
|- child1
   |- ...
      |- wanted_detached_child
Each detach can add N more L0s per level; this is effectively unbounded,
because compaction can be arbitrarily delayed (or a detach can happen right
before one starts). If "wanted_detached_child" has already made progress
and compacted L1s, we want to make sure that "compaction in the history"
does not leave the timeline broken.
"""
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
env = neon_env_builder.init_start(
initial_tenant_conf={
# we want to create layers manually so we don't branch on arbitrary
# Lsn, but we also do not want to compact L0 -> L1.
"compaction_threshold": "99999",
"compaction_period": "0s",
# shouldn't matter, but just in case
"gc_period": "0s",
}
)
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
def delta_layers(timeline_id: TimelineId):
# shorthand for more readable formatting
return client.layer_map_info(env.initial_tenant, timeline_id).delta_layers()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("create table integers (i bigint not null);")
ep.safe_psql("insert into integers (i) values (42)")
branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
assert len(delta_layers(env.initial_timeline)) == 2
more_good_numbers = range(0, 3)
branches: List[Tuple[str, TimelineId]] = [("main", env.initial_timeline)]
for num in more_good_numbers:
branch_name = f"br-{len(branches)}"
branch_timeline_id = env.neon_cli.create_branch(
branch_name,
ancestor_branch_name=branches[-1][0],
tenant_id=env.initial_tenant,
ancestor_start_lsn=branch_lsn,
)
branches.append((branch_name, branch_timeline_id))
with env.endpoints.create_start(branches[-1][0], tenant_id=env.initial_tenant) as ep:
ep.safe_psql(
f"insert into integers (i) select i from generate_series({num}, {num + 100}) as s(i)"
)
branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
assert len(delta_layers(branch_timeline_id)) == 1
# now fill in the final, most growing timeline
branch_name, branch_timeline_id = branches[-1]
with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
ep.safe_psql("insert into integers (i) select i from generate_series(50, 500) s(i)")
last_suffix = None
for suffix in range(0, 4):
ep.safe_psql(f"create table other_table_{suffix} as select * from integers")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
last_suffix = suffix
assert last_suffix is not None
assert len(delta_layers(branch_timeline_id)) == 5
client.patch_tenant_config_client_side(
env.initial_tenant, {"compaction_threshold": 5}, None
)
client.timeline_compact(env.initial_tenant, branch_timeline_id)
# one more layer
ep.safe_psql(f"create table other_table_{last_suffix + 1} as select * from integers")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
# we need to wait here, because the detaches will do implicit tenant restart,
# and we could get unexpected layer counts
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id, wait_until_uploaded=True)
assert len(list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))) == 1
skip_main = branches[1:]
branch_lsn = client.timeline_detail(env.initial_tenant, branch_timeline_id)["ancestor_lsn"]
# take the fullbackup before and after inheriting the new L0s
fullbackup_before = test_output_dir / "fullbackup-before.tar"
cmd = [
"psql",
"--no-psqlrc",
env.pageserver.connstr(),
"-c",
f"fullbackup {env.initial_tenant} {branch_timeline_id} {branch_lsn}",
"-o",
str(fullbackup_before),
]
pg_bin.run_capture(cmd, env=psql_env)
for _, timeline_id in skip_main:
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert reparented == set(), "we have no earlier branches at any level"
post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
assert len(post_detach_l0s) == 5, "should have inherited 4 L0s, 5 in total"
# checkpoint also runs compaction, which now decides to act because there
# are, in total, a threshold's worth of L0s, even if they are not adjacent
# in Lsn space:
#
#  inherited     flushed during this checkpoint
#   \\\\        /
#   1234 X     5  ---> lsn
#        |
#        l1 layers from "fill in the final, most growing timeline"
#
# branch_lsn is between 4 and the first X.
client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
post_compact_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
assert len(post_compact_l0s) == 1, "only the consecutive inherited L0s should be compacted"
fullbackup_after = test_output_dir / "fullbackup_after.tar"
cmd = [
"psql",
"--no-psqlrc",
env.pageserver.connstr(),
"-c",
f"fullbackup {env.initial_tenant} {branch_timeline_id} {branch_lsn}",
"-o",
str(fullbackup_after),
]
pg_bin.run_capture(cmd, env=psql_env)
# we don't need to skip any files, because zenith.signal will be identical
tar_cmp(fullbackup_before, fullbackup_after, set())
# TODO:
# - after starting the operation, tenant is deleted
# - after starting the operation, pageserver is shutdown, restarted

View File

@@ -656,7 +656,7 @@ def get_physical_size_values(
client = env.pageserver.http_client()
res.layer_map_file_size_sum = sum(
layer.layer_file_size or 0
layer.layer_file_size
for layer in client.layer_map_info(tenant_id, timeline_id).historic_layers
)