diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 451ee1a13c..05a444d738 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -789,6 +789,8 @@ pub enum HistoricLayerInfo {
         lsn_end: Lsn,
         remote: bool,
         access_stats: LayerAccessStats,
+
+        l0: bool,
     },
     Image {
         layer_file_name: String,
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 0eab6510ca..ec3b1141f3 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1736,6 +1736,8 @@ async fn timeline_compact_handler(
     if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
         flags |= CompactFlags::ForceImageLayerCreation;
     }
+    let wait_until_uploaded =
+        parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
 
     async {
         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -1744,6 +1746,9 @@ async fn timeline_compact_handler(
             .compact(&cancel, flags, &ctx)
             .await
             .map_err(|e| ApiError::InternalServerError(e.into()))?;
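+        // compaction only schedules the uploads of any new layers; if
+        // requested, wait for them to reach remote storage before responding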
+        if wait_until_uploaded {
+            timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
+        }
         json_response(StatusCode::OK, ())
     }
     .instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
@@ -1768,6 +1773,8 @@ async fn timeline_checkpoint_handler(
     if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
         flags |= CompactFlags::ForceImageLayerCreation;
     }
+    let wait_until_uploaded =
+        parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
 
     async {
         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -1781,6 +1788,10 @@ async fn timeline_checkpoint_handler(
             .await
             .map_err(|e| ApiError::InternalServerError(e.into()))?;
 
+        if wait_until_uploaded {
+            timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
+        }
+
         json_response(StatusCode::OK, ())
     }
     .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index 97b349f635..45d61ce048 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -1264,6 +1264,7 @@ impl LayerInner {
                 lsn_end: lsn_range.end,
                 remote: !resident,
                 access_stats,
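+                // "L0" is the layer map's concept: it is derived from the
+                // layer's key range, not stored in the layer itself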
+                l0: crate::tenant::layer_map::LayerMap::is_l0(self.layer_desc()),
             }
         } else {
             let lsn = self.desc.image_layer_lsn();
diff --git a/pageserver/src/tenant/storage_layer/layer_name.rs b/pageserver/src/tenant/storage_layer/layer_name.rs
index c733404693..da26e1eeb7 100644
--- a/pageserver/src/tenant/storage_layer/layer_name.rs
+++ b/pageserver/src/tenant/storage_layer/layer_name.rs
@@ -347,37 +347,33 @@ impl<'de> serde::de::Visitor<'de> for LayerNameVisitor {
 mod test {
     use super::*;
     #[test]
-    fn image_layer_parse() -> anyhow::Result<()> {
+    fn image_layer_parse() {
         let expected = LayerName::Image(ImageLayerName {
             key_range: Key::from_i128(0)
                 ..Key::from_hex("000000067F00000001000004DF0000000006").unwrap(),
             lsn: Lsn::from_hex("00000000014FED58").unwrap(),
         });
-        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-v1-00000001").map_err(|s| anyhow::anyhow!(s))?;
+        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-v1-00000001").unwrap();
         assert_eq!(parsed, expected,);
         // Omitting generation suffix is valid
-        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58").map_err(|s| anyhow::anyhow!(s))?;
+        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58").unwrap();
         assert_eq!(parsed, expected,);
-
-        Ok(())
     }
 
     #[test]
-    fn delta_layer_parse() -> anyhow::Result<()> {
+    fn delta_layer_parse() {
         let expected = LayerName::Delta(DeltaLayerName {
             key_range: Key::from_i128(0)
                 ..Key::from_hex("000000067F00000001000004DF0000000006").unwrap(),
             lsn_range: Lsn::from_hex("00000000014FED58").unwrap()
                 ..Lsn::from_hex("000000000154C481").unwrap(),
         });
-        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481-v1-00000001").map_err(|s| anyhow::anyhow!(s))?;
+        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481-v1-00000001").unwrap();
         assert_eq!(parsed, expected);
         // Omitting generation suffix is valid
-        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481").map_err(|s| anyhow::anyhow!(s))?;
+        let parsed = LayerName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481").unwrap();
         assert_eq!(parsed, expected);
-
-        Ok(())
     }
 }
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 4d563a532b..f1f96f6d5f 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -56,20 +56,30 @@ class InMemoryLayerInfo:
 class HistoricLayerInfo:
     kind: str
     layer_file_name: str
-    layer_file_size: Optional[int]
+    layer_file_size: int
     lsn_start: str
     lsn_end: Optional[str]
     remote: bool
+    # None for image layers, true if the pageserver thinks this is an L0 delta layer
+    l0: Optional[bool]
 
     @classmethod
     def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
+        # instead of parsing the key range, let's keep the definition of "L0" in the pageserver
+        l0_ness = d.get("l0")
+        assert l0_ness is None or isinstance(l0_ness, bool)
+
+        size = d["layer_file_size"]
+        assert isinstance(size, int)
+
         return HistoricLayerInfo(
             kind=d["kind"],
             layer_file_name=d["layer_file_name"],
-            layer_file_size=d.get("layer_file_size"),
+            layer_file_size=size,
             lsn_start=d["lsn_start"],
             lsn_end=d.get("lsn_end"),
             remote=d["remote"],
+            l0=l0_ness,
         )
@@ -583,6 +593,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         timeline_id: TimelineId,
         force_repartition=False,
         force_image_layer_creation=False,
+        wait_until_uploaded=False,
     ):
         self.is_testing_enabled_or_skip()
         query = {}
@@ -590,6 +601,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
             query["force_repartition"] = "true"
         if force_image_layer_creation:
             query["force_image_layer_creation"] = "true"
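+        # ask the pageserver to respond only after the uploads started by this
+        # operation have reached remote storage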
+        if wait_until_uploaded:
+            query["wait_until_uploaded"] = "true"
 
         log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
         res = self.put(
@@ -656,6 +669,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         timeline_id: TimelineId,
         force_repartition=False,
         force_image_layer_creation=False,
+        wait_until_uploaded=False,
     ):
         self.is_testing_enabled_or_skip()
         query = {}
@@ -663,6 +677,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
             query["force_repartition"] = "true"
         if force_image_layer_creation:
             query["force_image_layer_creation"] = "true"
+        if wait_until_uploaded:
+            query["wait_until_uploaded"] = "true"
 
         log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
         res = self.put(
diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py
index 93a16620a3..6a3515e1bd 100644
--- a/test_runner/regress/test_compaction.py
+++ b/test_runner/regress/test_compaction.py
@@ -165,7 +165,6 @@ def test_sharding_compaction(
                 image_layer_sizes[layer.layer_file_name] = layer.layer_file_size
 
                 # Pageserver should assert rather than emit an empty layer file, but double check here
-                assert layer.layer_file_size is not None
                 assert layer.layer_file_size > 0
 
         shard_has_image_layers.append(len(image_layer_sizes) > 1)
@@ -178,7 +177,7 @@ def test_sharding_compaction(
             #
             # We only do this check with tiny stripes, because large stripes may not give all shards enough
             # data to have statistically significant image layers
-            avg_size = sum(v for v in image_layer_sizes.values()) / len(image_layer_sizes)  # type: ignore
+            avg_size = sum(v for v in image_layer_sizes.values()) / len(image_layer_sizes)
             log.info(f"Shard {shard_id} average image layer size: {avg_size}")
             assert avg_size > compaction_target_size / 2
diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py
index 0ef4f6d95b..193149ea03 100644
--- a/test_runner/regress/test_layer_eviction.py
+++ b/test_runner/regress/test_layer_eviction.py
@@ -272,14 +272,14 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
         resident_physical_size_metric == 0
     ), "ensure that resident_physical_size metric is zero"
     assert resident_physical_size_metric == sum(
-        layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote
+        layer.layer_file_size for layer in info.historic_layers if not layer.remote
     ), "ensure that resident_physical_size metric corresponds to layer map dump"
 
     remote_physical_size_metric = ps_http.get_timeline_metric(
         tenant_id, timeline_id, "pageserver_remote_physical_size"
     )
     assert remote_physical_size_metric == sum(
-        layer.layer_file_size or 0 for layer in info.historic_layers if layer.remote
+        layer.layer_file_size for layer in info.historic_layers if layer.remote
     ), "ensure that remote_physical_size metric corresponds to layer map dump"
 
     log.info("before runnning GC, ensure that remote_physical size is zero")
diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py
index b51754c9e0..b137fb3a5c 100644
--- a/test_runner/regress/test_ondemand_download.py
+++ b/test_runner/regress/test_ondemand_download.py
@@ -540,7 +540,6 @@ def test_compaction_downloads_on_demand_without_image_creation(neon_env_builder:
 
     for layer in layers.historic_layers:
         log.info(f"pre-compact: {layer}")
-        assert layer.layer_file_size is not None, "we must know layer file sizes"
         layer_sizes += layer.layer_file_size
         pageserver_http.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
diff --git a/test_runner/regress/test_pageserver_layer_rolling.py b/test_runner/regress/test_pageserver_layer_rolling.py
index aab0536f5a..66b6185aaa 100644
--- a/test_runner/regress/test_pageserver_layer_rolling.py
+++ b/test_runner/regress/test_pageserver_layer_rolling.py
@@ -287,7 +287,7 @@ def test_total_size_limit(neon_env_builder: NeonEnvBuilder):
         total_historic_bytes += sum(
             layer.layer_file_size
             for layer in layer_map.historic_layers
-            if layer.layer_file_size is not None and Lsn(layer.lsn_start) > initdb_lsn
+            if Lsn(layer.lsn_start) > initdb_lsn
         )
 
         total_ephemeral_layers += len(layer_map.in_memory_layers)
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index 1bfeec6f4b..bbb1ad0c6d 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -632,7 +632,6 @@ def test_sharding_ingest_layer_sizes(
     historic_layers = sorted(layer_map.historic_layers, key=lambda layer: layer.lsn_start)
 
     for layer in historic_layers:
-        assert layer.layer_file_size is not None
         if layer.layer_file_size < expect_layer_size // 2:
             classification = "Small"
             small_layer_count += 1
diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py
index 8406de8bc1..3e435caeee 100644
--- a/test_runner/regress/test_timeline_detach_ancestor.py
+++ b/test_runner/regress/test_timeline_detach_ancestor.py
@@ -482,6 +482,163 @@ def test_detached_receives_flushes_while_being_detached(neon_env_builder: NeonEn
     env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
 
 
+def test_compaction_induced_by_detaches_in_history(
+    neon_env_builder: NeonEnvBuilder, test_output_dir, pg_distrib_dir, pg_bin: PgBin
+):
+    """
+    Assuming the tree of timelines:
+
+    root
+    |- child1
+       |- ...
+          |- wanted_detached_child
+
+    Each detach can add N more L0s per level; this is effectively unbounded,
+    because compaction can be arbitrarily delayed (or a detach can happen right
+    before one starts). If "wanted_detached_child" has already made progress
+    and compacted L1s, we want to make sure that "compaction in the history"
+    does not leave the timeline broken.
+    """
+
+    psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
+
+    env = neon_env_builder.init_start(
+        initial_tenant_conf={
+            # we want to create layers manually, so we don't branch at an
+            # arbitrary Lsn, but we also do not want to compact L0 -> L1
+            "compaction_threshold": "99999",
+            "compaction_period": "0s",
+            # shouldn't matter, but just in case
+            "gc_period": "0s",
+        }
+    )
+    env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
+    client = env.pageserver.http_client()
+
+    def delta_layers(timeline_id: TimelineId):
+        # shorthand for more readable formatting
+        return client.layer_map_info(env.initial_tenant, timeline_id).delta_layers()
+
+    with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
+        ep.safe_psql("create table integers (i bigint not null);")
+        ep.safe_psql("insert into integers (i) values (42)")
+        branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
+
+    client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
+
+    assert len(delta_layers(env.initial_timeline)) == 2
+
+    more_good_numbers = range(0, 3)
+
+    branches: List[Tuple[str, TimelineId]] = [("main", env.initial_timeline)]
+
+    for num in more_good_numbers:
+        branch_name = f"br-{len(branches)}"
+        branch_timeline_id = env.neon_cli.create_branch(
+            branch_name,
+            ancestor_branch_name=branches[-1][0],
+            tenant_id=env.initial_tenant,
+            ancestor_start_lsn=branch_lsn,
+        )
+        branches.append((branch_name, branch_timeline_id))
+
+        with env.endpoints.create_start(branches[-1][0], tenant_id=env.initial_tenant) as ep:
+            ep.safe_psql(
+                f"insert into integers (i) select i from generate_series({num}, {num + 100}) as s(i)"
+            )
+            branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
+            client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
+
+        assert len(delta_layers(branch_timeline_id)) == 1
+
+    # now fill in the final, fastest-growing timeline
+
+    branch_name, branch_timeline_id = branches[-1]
+    with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
+        ep.safe_psql("insert into integers (i) select i from generate_series(50, 500) s(i)")
+
+        last_suffix = None
+        for suffix in range(0, 4):
+            ep.safe_psql(f"create table other_table_{suffix} as select * from integers")
+            wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
+            client.timeline_checkpoint(env.initial_tenant, branch_timeline_id)
+            last_suffix = suffix
+
+        assert last_suffix is not None
+
+        assert len(delta_layers(branch_timeline_id)) == 5
+
+        client.patch_tenant_config_client_side(
+            env.initial_tenant, {"compaction_threshold": 5}, None
+        )
+
+        client.timeline_compact(env.initial_tenant, branch_timeline_id)
+
+        # one more layer
+        ep.safe_psql(f"create table other_table_{last_suffix + 1} as select * from integers")
+        wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id)
+
+        # we need to wait here, because the detaches will do an implicit tenant
+        # restart, and we could otherwise get unexpected layer counts
+        client.timeline_checkpoint(env.initial_tenant, branch_timeline_id, wait_until_uploaded=True)
+
+    assert len(list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))) == 1
+
+    skip_main = branches[1:]
+    branch_lsn = client.timeline_detail(env.initial_tenant, branch_timeline_id)["ancestor_lsn"]
+
+    # take the fullbackup before and after inheriting the new L0s
+    fullbackup_before = test_output_dir / "fullbackup-before.tar"
+    cmd = [
+        "psql",
+        "--no-psqlrc",
+        env.pageserver.connstr(),
+        "-c",
+        f"fullbackup {env.initial_tenant} {branch_timeline_id} {branch_lsn}",
+        "-o",
+        str(fullbackup_before),
+    ]
+    pg_bin.run_capture(cmd, env=psql_env)
+
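+    # detach each branch from its ancestor, oldest first; the detached
+    # timeline inherits the ancestor's layers up to its branch point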
+ "compaction_threshold": "99999", + "compaction_period": "0s", + # shouldn't matter, but just in case + "gc_period": "0s", + } + ) + env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + client = env.pageserver.http_client() + + def delta_layers(timeline_id: TimelineId): + # shorthand for more readable formatting + return client.layer_map_info(env.initial_tenant, timeline_id).delta_layers() + + with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep: + ep.safe_psql("create table integers (i bigint not null);") + ep.safe_psql("insert into integers (i) values (42)") + branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline) + + client.timeline_checkpoint(env.initial_tenant, env.initial_timeline) + + assert len(delta_layers(env.initial_timeline)) == 2 + + more_good_numbers = range(0, 3) + + branches: List[Tuple[str, TimelineId]] = [("main", env.initial_timeline)] + + for num in more_good_numbers: + branch_name = f"br-{len(branches)}" + branch_timeline_id = env.neon_cli.create_branch( + branch_name, + ancestor_branch_name=branches[-1][0], + tenant_id=env.initial_tenant, + ancestor_start_lsn=branch_lsn, + ) + branches.append((branch_name, branch_timeline_id)) + + with env.endpoints.create_start(branches[-1][0], tenant_id=env.initial_tenant) as ep: + ep.safe_psql( + f"insert into integers (i) select i from generate_series({num}, {num + 100}) as s(i)" + ) + branch_lsn = wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id) + client.timeline_checkpoint(env.initial_tenant, branch_timeline_id) + + assert len(delta_layers(branch_timeline_id)) == 1 + + # now fill in the final, most growing timeline + + branch_name, branch_timeline_id = branches[-1] + with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep: + ep.safe_psql("insert into integers (i) select i from generate_series(50, 500) s(i)") + + last_suffix = None + for suffix in range(0, 4): + ep.safe_psql(f"create table other_table_{suffix} as select * from integers") + wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id) + client.timeline_checkpoint(env.initial_tenant, branch_timeline_id) + last_suffix = suffix + + assert last_suffix is not None + + assert len(delta_layers(branch_timeline_id)) == 5 + + client.patch_tenant_config_client_side( + env.initial_tenant, {"compaction_threshold": 5}, None + ) + + client.timeline_compact(env.initial_tenant, branch_timeline_id) + + # one more layer + ep.safe_psql(f"create table other_table_{last_suffix + 1} as select * from integers") + wait_for_last_flush_lsn(env, ep, env.initial_tenant, branch_timeline_id) + + # we need to wait here, because the detaches will do implicit tenant restart, + # and we could get unexpected layer counts + client.timeline_checkpoint(env.initial_tenant, branch_timeline_id, wait_until_uploaded=True) + + assert len([filter(lambda x: x.l0, delta_layers(branch_timeline_id))]) == 1 + + skip_main = branches[1:] + branch_lsn = client.timeline_detail(env.initial_tenant, branch_timeline_id)["ancestor_lsn"] + + # take the fullbackup before and after inheriting the new L0s + fullbackup_before = test_output_dir / "fullbackup-before.tar" + cmd = [ + "psql", + "--no-psqlrc", + env.pageserver.connstr(), + "-c", + f"fullbackup {env.initial_tenant} {branch_timeline_id} {branch_lsn}", + "-o", + str(fullbackup_before), + ] + pg_bin.run_capture(cmd, env=psql_env) + + for _, timeline_id in skip_main: + reparented = client.detach_ancestor(env.initial_tenant, timeline_id) + assert 
+    fullbackup_after = test_output_dir / "fullbackup_after.tar"
+    cmd = [
+        "psql",
+        "--no-psqlrc",
+        env.pageserver.connstr(),
+        "-c",
+        f"fullbackup {env.initial_tenant} {branch_timeline_id} {branch_lsn}",
+        "-o",
+        str(fullbackup_after),
+    ]
+    pg_bin.run_capture(cmd, env=psql_env)
+
+    # we don't need to skip any files, because zenith.signal will be identical
+    tar_cmp(fullbackup_before, fullbackup_after, set())
+
+
 # TODO:
 # - after starting the operation, tenant is deleted
 # - after starting the operation, pageserver is shutdown, restarted
diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index a6d06df3b6..db5297870e 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -656,7 +656,7 @@ def get_physical_size_values(
     client = env.pageserver.http_client()
 
     res.layer_map_file_size_sum = sum(
-        layer.layer_file_size or 0
+        layer.layer_file_size
         for layer in client.layer_map_info(tenant_id, timeline_id).historic_layers
     )