From 478322ebf90f8580258b02e1fb5c899c7f8ad279 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Fri, 20 Jan 2023 20:21:36 +0200 Subject: [PATCH] Fix tenant size orphans (#3377) Before only the timelines which have passed the `gc_horizon` were processed which failed with orphans at the tree_sort phase. Example input in added `test_branched_empty_timeline_size` test case. The PR changes iteration to happen through all timelines, and in addition to that, any learned branch points will be calculated as they would had been in the original implementation if the ancestor branch had been over the `gc_horizon`. This also changes how tenants where all timelines are below `gc_horizon` are handled. Previously tenant_size 0 was returned, but now they will have approximately `initdb_lsn` worth of tenant_size. The PR also adds several new tenant size tests that describe various corner cases of branching structure and `gc_horizon` setting. They are currently disabled to not consume time during CI. Co-authored-by: Joonas Koivunen Co-authored-by: Anastasia Lubennikova --- pageserver/src/tenant/size.rs | 149 ++++++++++++--- test_runner/fixtures/neon_fixtures.py | 9 +- test_runner/regress/test_tenant_size.py | 244 ++++++++++++++++++++++-- 3 files changed, 360 insertions(+), 42 deletions(-) diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index dd4bf768a7..2181d6d4dc 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -23,7 +23,13 @@ use tracing::*; pub struct ModelInputs { updates: Vec, retention_period: u64, + + /// Relevant lsns per timeline. + /// + /// This field is not required for deserialization purposes, which is mostly used in tests. The + /// LSNs explain the outcome (updates) but are not needed in size calculation. #[serde_as(as = "HashMap")] + #[serde(default)] timeline_inputs: HashMap, } @@ -32,6 +38,8 @@ pub struct ModelInputs { #[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] struct TimelineInputs { + #[serde_as(as = "serde_with::DisplayFromStr")] + ancestor_lsn: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] last_record: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] @@ -178,21 +186,13 @@ pub(super) async fn gather_inputs( // our advantage with `?` error handling. let mut joinset = tokio::task::JoinSet::new(); - let timelines = tenant + // refresh is needed to update gc related pitr_cutoff and horizon_cutoff + tenant .refresh_gc_info() .await .context("Failed to refresh gc_info before gathering inputs")?; - if timelines.is_empty() { - // All timelines are below tenant's gc_horizon; alternative would be to use - // Tenant::list_timelines but then those gc_info's would not be updated yet, possibly - // missing GcInfo::retain_lsns or having obsolete values for cutoff's. - return Ok(ModelInputs { - updates: vec![], - retention_period: 0, - timeline_inputs: HashMap::new(), - }); - } + let timelines = tenant.list_timelines(); // record the used/inserted cache keys here, to remove extras not to start leaking // after initial run the cache should be quite stable, but live timelines will eventually @@ -201,13 +201,25 @@ pub(super) async fn gather_inputs( let mut updates = Vec::new(); - // record the per timline values used to determine `retention_period` + // record the per timeline values useful to debug the model inputs, also used to track + // ancestor_lsn without keeping a hold of Timeline let mut timeline_inputs = HashMap::with_capacity(timelines.len()); // used to determine the `retention_period` for the size model let mut max_cutoff_distance = None; + // mapping from (TimelineId, Lsn) => if this branch point has been handled already via + // GcInfo::retain_lsns or if it needs to have its logical_size calculated. + let mut referenced_branch_froms = HashMap::<(TimelineId, Lsn), bool>::new(); + for timeline in timelines { + if !timeline.is_active() { + anyhow::bail!( + "timeline {} is not active, cannot calculate tenant_size now", + timeline.timeline_id + ); + } + let last_record_lsn = timeline.get_last_record_lsn(); let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = { @@ -273,13 +285,30 @@ pub(super) async fn gather_inputs( // all timelines branch from something, because it might be impossible to pinpoint // which is the tenant_size_model's "default" branch. + + let ancestor_lsn = timeline.get_ancestor_lsn(); + updates.push(Update { - lsn: timeline.get_ancestor_lsn(), + lsn: ancestor_lsn, command: Command::BranchFrom(timeline.get_ancestor_timeline_id()), timeline_id: timeline.timeline_id, }); + if let Some(parent_timeline_id) = timeline.get_ancestor_timeline_id() { + // refresh_gc_info will update branchpoints and pitr_cutoff but only do it for branches + // which are over gc_horizon. for example, a "main" branch which never received any + // updates apart from initdb not have branch points recorded. + referenced_branch_froms + .entry((parent_timeline_id, timeline.get_ancestor_lsn())) + .or_default(); + } + for (lsn, _kind) in &interesting_lsns { + // mark this visited so don't need to re-process this parent + *referenced_branch_froms + .entry((timeline.timeline_id, *lsn)) + .or_default() = true; + if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) { updates.push(Update { lsn: *lsn, @@ -295,22 +324,10 @@ pub(super) async fn gather_inputs( } } - // all timelines also have an end point if they have made any progress - if last_record_lsn > timeline.get_ancestor_lsn() - && !interesting_lsns - .iter() - .any(|(lsn, _)| lsn == &last_record_lsn) - { - updates.push(Update { - lsn: last_record_lsn, - command: Command::EndOfBranch, - timeline_id: timeline.timeline_id, - }); - } - timeline_inputs.insert( timeline.timeline_id, TimelineInputs { + ancestor_lsn, last_record: last_record_lsn, // this is not used above, because it might not have updated recently enough latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), @@ -321,6 +338,80 @@ pub(super) async fn gather_inputs( ); } + // iterate over discovered branch points and make sure we are getting logical sizes at those + // points. + for ((timeline_id, lsn), handled) in referenced_branch_froms.iter() { + if *handled { + continue; + } + + let timeline_id = *timeline_id; + let lsn = *lsn; + + match timeline_inputs.get(&timeline_id) { + Some(inputs) if inputs.ancestor_lsn == lsn => { + // we don't need an update at this branch point which is also point where + // timeline_id branch was branched from. + continue; + } + Some(_) => {} + None => { + // we should have this because we have iterated through all of the timelines + anyhow::bail!("missing timeline_input for {timeline_id}") + } + } + + if let Some(size) = logical_size_cache.get(&(timeline_id, lsn)) { + updates.push(Update { + lsn, + timeline_id, + command: Command::Update(*size), + }); + + needed_cache.insert((timeline_id, lsn)); + } else { + let timeline = tenant + .get_timeline(timeline_id, false) + .context("find referenced ancestor timeline")?; + let parallel_size_calcs = Arc::clone(limit); + joinset.spawn(calculate_logical_size( + parallel_size_calcs, + timeline.clone(), + lsn, + )); + + if let Some(parent_id) = timeline.get_ancestor_timeline_id() { + // we should not find new ones because we iterated tenants all timelines + anyhow::ensure!( + timeline_inputs.contains_key(&parent_id), + "discovered new timeline {parent_id} (parent of {timeline_id})" + ); + } + }; + } + + // finally add in EndOfBranch for all timelines where their last_record_lsn is not a branch + // point. this is needed by the model. + for (timeline_id, inputs) in timeline_inputs.iter() { + let lsn = inputs.last_record; + + if referenced_branch_froms.contains_key(&(*timeline_id, lsn)) { + // this means that the (timeline_id, last_record_lsn) represents a branch point + // we do not want to add EndOfBranch updates for these points because it doesn't fit + // into the current tenant_size_model. + continue; + } + + if lsn > inputs.ancestor_lsn { + // all timelines also have an end point if they have made any progress + updates.push(Update { + lsn, + command: Command::EndOfBranch, + timeline_id: *timeline_id, + }); + } + } + let mut have_any_error = false; while let Some(res) = joinset.join_next().await { @@ -379,6 +470,7 @@ pub(super) async fn gather_inputs( // handled by the variant order in `Command`. // updates.sort_unstable(); + // And another sort to handle Command::BranchFrom ordering // in case when there are multiple branches at the same LSN. let sorted_updates = sort_updates_in_tree_order(updates)?; @@ -574,7 +666,10 @@ fn updates_sort() { fn verify_size_for_multiple_branches() { // this is generated from integration test test_tenant_size_with_multiple_branches, but this way // it has the stable lsn's - let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072,"timeline_inputs":{"cd9d9409c216e64bf580904facedb01b":{"last_record":"0/18D5E40","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/18B5E40","pitr_cutoff":"0/18B5E40","next_gc_cutoff":"0/18B5E40"},"10b532a550540bc15385eac4edde416a":{"last_record":"0/1839818","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/1819818","pitr_cutoff":"0/1819818","next_gc_cutoff":"0/1819818"},"230fc9d756f7363574c0d66533564dcc":{"last_record":"0/222F438","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/220F438","pitr_cutoff":"0/220F438","next_gc_cutoff":"0/220F438"}}}"#; + // + // timelineinputs have been left out, because those explain the inputs, but don't participate + // in further size calculations. + let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072}"#; let inputs: ModelInputs = serde_json::from_str(doc).unwrap(); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d6c4c32b0b..8476066691 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1206,6 +1206,9 @@ class PageserverHttpClient(requests.Session): return res_json def tenant_size(self, tenant_id: TenantId) -> int: + return self.tenant_size_and_modelinputs(tenant_id)[0] + + def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]: """ Returns the tenant size, together with the model inputs as the second tuple item. """ @@ -1216,9 +1219,9 @@ class PageserverHttpClient(requests.Session): assert TenantId(res["id"]) == tenant_id size = res["size"] assert type(size) == int - # there are additional inputs, which are the collected raw information before being fed to the tenant_size_model - # there are no tests for those right now. - return size + inputs = res["inputs"] + assert type(inputs) is dict + return (size, inputs) def timeline_list( self, diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index 5747ae235f..72cfbc9dda 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -1,5 +1,6 @@ -from typing import List, Tuple +from typing import Any, List, Tuple +import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn from fixtures.types import Lsn @@ -9,28 +10,247 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv): env = neon_simple_env (tenant_id, _) = env.neon_cli.create_tenant() http_client = env.pageserver.http_client() - size = http_client.tenant_size(tenant_id) + initial_size = http_client.tenant_size(tenant_id) - # we should never have zero, because there should be the initdb however - # this is questionable if we should have anything in this case, as the - # gc_cutoff is negative - assert ( - size == 0 - ), "initial implementation returns zero tenant_size before last_record_lsn is past gc_horizon" + # we should never have zero, because there should be the initdb "changes" + assert initial_size > 0, "initial implementation returns ~initdb tenant_size" - with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + main_branch_name = "main" + + with env.postgres.create_start( + main_branch_name, + tenant_id=tenant_id, + config_lines=["autovacuum=off", "checkpoint_timeout=10min"], + ) as pg: with pg.cursor() as cur: cur.execute("SELECT 1") row = cur.fetchone() assert row is not None assert row[0] == 1 size = http_client.tenant_size(tenant_id) - assert size == 0, "starting idle compute should not change the tenant size" + # we've disabled the autovacuum and checkpoint + # so background processes should not change the size. + # If this test will flake we should probably loosen the check + assert size == initial_size, "starting idle compute should not change the tenant size" # the size should be the same, until we increase the size over the # gc_horizon - size = http_client.tenant_size(tenant_id) - assert size == 0, "tenant_size should not be affected by shutdown of compute" + size, inputs = http_client.tenant_size_and_modelinputs(tenant_id) + assert size == initial_size, "tenant_size should not be affected by shutdown of compute" + + expected_commands: List[Any] = [{"branch_from": None}, "end_of_branch"] + actual_commands: List[Any] = list(map(lambda x: x["command"], inputs["updates"])) # type: ignore + assert actual_commands == expected_commands + + +def test_branched_empty_timeline_size(neon_simple_env: NeonEnv): + """ + Issue found in production. Because the ancestor branch was under + gc_horizon, the branchpoint was "dangling" and the computation could not be + done. + + Assuming gc_horizon = 50 + root: I 0---10------>20 + branch: |-------------------I---------->150 + gc_horizon + """ + env = neon_simple_env + (tenant_id, _) = env.neon_cli.create_tenant() + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_branch_timeline_id = env.neon_cli.create_branch("first-branch", tenant_id=tenant_id) + + with env.postgres.create_start("first-branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, first_branch_timeline_id) + + size_after_branching = http_client.tenant_size(tenant_id) + log.info(f"size_after_branching: {size_after_branching}") + + assert size_after_branching > initial_size + + +def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv): + """ + More general version of test_branched_empty_timeline_size + + Assuming gc_horizon = 50 + + root: I 0------10 + first: I 10 + nth_0: I 10 + nth_1: I 10 + nth_n: 10------------I--------100 + """ + env = neon_simple_env + (tenant_id, _) = env.neon_cli.create_tenant() + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_branch_name = "first" + env.neon_cli.create_branch(first_branch_name, tenant_id=tenant_id) + + size_after_branching = http_client.tenant_size(tenant_id) + + # this might be flaky like test_get_tenant_size_with_multiple_branches + # https://github.com/neondatabase/neon/issues/2962 + assert size_after_branching == initial_size + + last_branch_name = first_branch_name + last_branch = None + + for i in range(0, 4): + latest_branch_name = f"nth_{i}" + last_branch = env.neon_cli.create_branch( + latest_branch_name, ancestor_branch_name=last_branch_name, tenant_id=tenant_id + ) + last_branch_name = latest_branch_name + + size_after_branching = http_client.tenant_size(tenant_id) + assert size_after_branching == initial_size + + assert last_branch is not None + + with env.postgres.create_start(last_branch_name, tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, last_branch) + + size_after_writes = http_client.tenant_size(tenant_id) + assert size_after_writes > initial_size + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_branch_point_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = 15 + + main: 0--I-10------>20 + branch: |-------------------I---------->150 + gc_horizon + """ + + env = neon_simple_env + gc_horizon = 20_000 + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) + http_client = env.pageserver.http_client() + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + with pg.cursor() as cur: + cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + size_before_branching = http_client.tenant_size(tenant_id) + + assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int + + branch_id = env.neon_cli.create_branch( + "branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn + ) + + with env.postgres.create_start("branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + wait_for_last_flush_lsn(env, pg, tenant_id, branch_id) + + size_after = http_client.tenant_size(tenant_id) + + assert size_before_branching < size_after + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_parent_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = 5 + + main: 0----10----I->20 + branch: |-------------------I---------->150 + gc_horizon + """ + + env = neon_simple_env + gc_horizon = 200_000 + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) + http_client = env.pageserver.http_client() + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + with pg.cursor() as cur: + cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + + flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + with pg.cursor() as cur: + cur.execute("CREATE TABLE t00 AS SELECT i::bigint n FROM generate_series(0, 2000) s(i)") + + wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + size_before_branching = http_client.tenant_size(tenant_id) + + assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int + + branch_id = env.neon_cli.create_branch( + "branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn + ) + + with env.postgres.create_start("branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 10000) s(i)") + wait_for_last_flush_lsn(env, pg, tenant_id, branch_id) + + size_after = http_client.tenant_size(tenant_id) + + assert size_before_branching < size_after + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_only_heads_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = small + + main: 0--------10-----I>20 + first: |-----------------------------I>150 + second: |---------I>30 + """ + + env = neon_simple_env + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": "1024"}) + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_id = env.neon_cli.create_branch("first", tenant_id=tenant_id) + second_id = env.neon_cli.create_branch("second", tenant_id=tenant_id) + + ids = {"main": main_id, "first": first_id, "second": second_id} + + latest_size = None + + # gc is not expected to change the results + + for branch_name, amount in [("main", 2000), ("first", 15000), ("second", 3000)]: + with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {amount}) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, ids[branch_name]) + size_now = http_client.tenant_size(tenant_id) + if latest_size is not None: + assert size_now > latest_size + else: + assert size_now > initial_size + + latest_size = size_now def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder):