diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 73ec54132b..d43566d2df 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -116,6 +116,7 @@ fn main() -> Result<()> { "attachment_service" => handle_attachment_service(sub_args, &env), "safekeeper" => handle_safekeeper(sub_args, &env), "endpoint" => handle_endpoint(sub_args, &env), + "mappings" => handle_mappings(sub_args, &mut env), "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), _ => bail!("unexpected subcommand {sub_name}"), }; @@ -816,6 +817,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( Ok(()) } +fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> { + let (sub_name, sub_args) = match sub_match.subcommand() { + Some(ep_subcommand_data) => ep_subcommand_data, + None => bail!("no mappings subcommand provided"), + }; + + match sub_name { + "map" => { + let branch_name = sub_args + .get_one::<String>("branch-name") + .expect("branch-name argument missing"); + + let tenant_id = sub_args + .get_one::<String>("tenant-id") + .map(|x| TenantId::from_str(x)) + .expect("tenant-id argument missing") + .expect("malformed tenant-id arg"); + + let timeline_id = sub_args + .get_one::<String>("timeline-id") + .map(|x| TimelineId::from_str(x)) + .expect("timeline-id argument missing") + .expect("malformed timeline-id arg"); + + env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?; + + Ok(()) + } + other => unimplemented!("mappings subcommand {other}"), + } +} + fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> { let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") { @@ -1325,8 +1358,8 @@ fn cli() -> Command { .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.") .arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone()) - .arg(branch_name_arg) - .arg(timeline_id_arg) + .arg(branch_name_arg.clone()) + .arg(timeline_id_arg.clone()) .arg(lsn_arg) .arg(pg_port_arg) .arg(http_port_arg) @@ -1339,7 +1372,7 @@ fn cli() -> Command { .subcommand( Command::new("stop") .arg(endpoint_id_arg) - .arg(tenant_id_arg) + .arg(tenant_id_arg.clone()) .arg( Arg::new("destroy") .help("Also delete data directory (now optional, should be default in future)") @@ -1350,6 +1383,18 @@ fn cli() -> Command { ) ) + .subcommand( + Command::new("mappings") + .arg_required_else_help(true) + .about("Manage neon_local branch name mappings") + .subcommand( + Command::new("map") + .about("Create new mapping which cannot exist already") + .arg(branch_name_arg.clone()) + .arg(tenant_id_arg.clone()) + .arg(timeline_id_arg.clone()) + ) + ) // Obsolete old name for 'endpoint'. We now just print an error if it's used. .subcommand( Command::new("pg") diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index f5a9f2b16a..e0529aeafa 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -396,6 +396,9 @@ async fn timeline_create_handler( format!("{err:#}") )) } + Err(e @ tenant::CreateTimelineError::AncestorNotActive) => { + json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string())) + } Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)), } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 17e17d8d20..57c1b5f070 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -406,6 +406,8 @@ pub enum CreateTimelineError { AlreadyExists, #[error(transparent)] AncestorLsn(anyhow::Error), + #[error("ancestor timeline is not active")] + AncestorNotActive, #[error(transparent)] Other(#[from] anyhow::Error), } @@ -1587,6 +1589,12 @@ impl Tenant { .get_timeline(ancestor_timeline_id, false) .context("Cannot branch off the timeline that's not present in pageserver")?; 
+ // instead of waiting around, just deny the request because ancestor is not yet + // ready for other purposes either. + if !ancestor_timeline.is_active() { + return Err(CreateTimelineError::AncestorNotActive); + } + if let Some(lsn) = ancestor_start_lsn.as_mut() { *lsn = lsn.align(); @@ -1619,8 +1627,6 @@ impl Tenant { } }; - loaded_timeline.activate(broker_client, None, ctx); - if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { // Wait for the upload of the 'index_part.json` file to finish, so that when we return // Ok, the timeline is durable in remote storage. @@ -1632,6 +1638,8 @@ impl Tenant { })?; } + loaded_timeline.activate(broker_client, None, ctx); + Ok(loaded_timeline) } diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index dcb49794d4..90e603deb0 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -31,6 +31,7 @@ pub(super) async fn upload_index_part<'a>( fail_point!("before-upload-index", |_| { bail!("failpoint before-upload-index") }); + pausable_failpoint!("before-upload-index-pausable"); let index_part_bytes = serde_json::to_vec(&index_part).context("serialize index part file into bytes")?; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index feac846a4c..d9a75637b8 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1464,6 +1464,29 @@ class NeonCli(AbstractNeonCli): return self.raw_cli(args, check_return_code=check_return_code) + def map_branch( + self, name: str, tenant_id: TenantId, timeline_id: TimelineId + ) -> "subprocess.CompletedProcess[str]": + """ + Map tenant id and timeline id to a neon_local branch name. They do not have to exist. + Usually needed when creating branches via PageserverHttpClient and not neon_local. 
+ + After creating a name mapping, you can use EndpointFactory.create_start + with this registered branch name. + """ + args = [ + "mappings", + "map", + "--branch-name", + name, + "--tenant-id", + str(tenant_id), + "--timeline-id", + str(timeline_id), + ] + + return self.raw_cli(args, check_return_code=True) + def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]": return self.raw_cli(["start"], check_return_code=check_return_code) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 70c2a06a07..e54b5408b4 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -74,11 +74,14 @@ def wait_until_tenant_state( for _ in range(iterations): try: tenant = pageserver_http.tenant_status(tenant_id=tenant_id) + except Exception as e: + log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") + else: log.debug(f"Tenant {tenant_id} data: {tenant}") if tenant["state"]["slug"] == expected_state: return tenant - except Exception as e: - log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") + if tenant["state"]["slug"] == "Broken": + raise RuntimeError(f"tenant became Broken, not {expected_state}") time.sleep(period) diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 31f9df6ebe..2541d5d475 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -1,14 +1,24 @@ import random import threading import time -from typing import List +from queue import SimpleQueue +from typing import Any, Dict, List, Union import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin -from fixtures.types import Lsn +from fixtures.neon_fixtures import ( + Endpoint, + NeonEnv, + NeonEnvBuilder, + PgBin, +) +from fixtures.pageserver.http import PageserverApiException +from fixtures.pageserver.utils import wait_until_tenant_active +from 
fixtures.types import Lsn, TimelineId from fixtures.utils import query_scalar from performance.test_perf_pgbench import get_scales_matrix +from requests import RequestException +from requests.exceptions import RetryError # Test branch creation @@ -128,3 +138,245 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi endpoint1 = env.endpoints.create_start("b1") pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()]) + + +def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder): + """ + Endpoint should not be possible to create because branch has not been uploaded. + """ + + env = neon_env_builder.init_configs() + env.start() + + env.pageserver.allowed_errors.append( + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*" + ) + env.pageserver.allowed_errors.append( + ".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading" + ) + ps_http = env.pageserver.http_client() + + # pause all uploads + ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) + ps_http.tenant_create(env.initial_tenant) + + initial_branch = "initial_branch" + + def start_creating_timeline(): + with pytest.raises(RequestException): + ps_http.timeline_create( + env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60 + ) + + t = threading.Thread(target=start_creating_timeline) + try: + t.start() + + wait_until_paused(env, "before-upload-index-pausable") + + env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline) + + with pytest.raises(RuntimeError, match="is not active, state: Loading"): + env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant) + finally: + # FIXME: paused uploads bother shutdown + env.pageserver.stop(immediate=True) + + t.join() + + +def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder): + """ + Branch should not be possible to 
create because ancestor has not been uploaded. + """ + + env = neon_env_builder.init_configs() + env.start() + + env.pageserver.allowed_errors.append( + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*" + ) + ps_http = env.pageserver.http_client() + + # pause all uploads + ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) + ps_http.tenant_create(env.initial_tenant) + + def start_creating_timeline(): + with pytest.raises(RequestException): + ps_http.timeline_create( + env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60 + ) + + t = threading.Thread(target=start_creating_timeline) + try: + t.start() + + wait_until_paused(env, "before-upload-index-pausable") + + branch_id = TimelineId.generate() + + with pytest.raises(RetryError, match="too many 503 error responses"): + ps_http.timeline_create( + env.pg_version, + env.initial_tenant, + branch_id, + ancestor_timeline_id=env.initial_timeline, + ) + + with pytest.raises( + PageserverApiException, + match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found", + ): + ps_http.timeline_detail(env.initial_tenant, branch_id) + # important to note that a task might still be in progress to complete + # the work, but will never get to that because we have the pause + # failpoint + finally: + # FIXME: paused uploads bother shutdown + env.pageserver.stop(immediate=True) + + t.join() + + +def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder): + """ + If the activate only after upload is used, then retries could become competing. 
+ """ + + env = neon_env_builder.init_configs() + env.start() + + env.pageserver.allowed_errors.append( + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*" + ) + env.pageserver.allowed_errors.append( + ".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory" + ) + ps_http = env.pageserver.http_client() + + # pause all uploads + ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) + ps_http.tenant_create(env.initial_tenant) + + def start_creating_timeline(): + ps_http.timeline_create( + env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60 + ) + + create_root = threading.Thread(target=start_creating_timeline) + + branch_id = TimelineId.generate() + + queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue() + barrier = threading.Barrier(3) + + def try_branch(): + barrier.wait() + barrier.wait() + try: + ret = ps_http.timeline_create( + env.pg_version, + env.initial_tenant, + branch_id, + ancestor_timeline_id=env.initial_timeline, + timeout=5, + ) + queue.put(ret) + except Exception as e: + queue.put(e) + + threads = [threading.Thread(target=try_branch) for _ in range(2)] + + try: + create_root.start() + + for t in threads: + t.start() + + wait_until_paused(env, "before-upload-index-pausable") + + barrier.wait() + ps_http.configure_failpoints(("before-upload-index-pausable", "off")) + barrier.wait() + + # now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files + first = queue.get() + second = queue.get() + + log.info(first) + log.info(second) + + (succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first) + assert isinstance(failed, Exception) + assert isinstance(succeeded, Dict) + + # FIXME: there's probably multiple valid status codes: + # - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already 
exists + # - whatever 409 response says, but that is a subclass of PageserverApiException + assert isinstance(failed, PageserverApiException) + assert succeeded["state"] == "Active" + finally: + # we might still have the failpoint active + env.pageserver.stop(immediate=True) + + # pytest should nag if we leave threads unjoined + for t in threads: + t.join() + create_root.join() + + +def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder): + """ + Currently before RFC#27 we keep and continue uploading branches which were not successfully uploaded before shutdown. + + This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test when the rfc is implemented. + """ + + env = neon_env_builder.init_configs() + env.start() + + env.pageserver.allowed_errors.append( + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*" + ) + ps_http = env.pageserver.http_client() + + # pause all uploads + ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) + ps_http.tenant_create(env.initial_tenant) + + def start_creating_timeline(): + with pytest.raises(RequestException): + ps_http.timeline_create( + env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60 + ) + + t = threading.Thread(target=start_creating_timeline) + try: + t.start() + + wait_until_paused(env, "before-upload-index-pausable") + finally: + # FIXME: paused uploads bother shutdown + env.pageserver.stop(immediate=True) + t.join() + + # now without a failpoint + env.pageserver.start() + + wait_until_tenant_active(ps_http, env.initial_tenant) + + # currently it lives on and will get eventually uploaded, but this will change + detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline) + assert detail["state"] == "Active" + + +def wait_until_paused(env: NeonEnv, failpoint: str): + found = False + msg = f"at failpoint {failpoint}" + for _ 
in range(20): + time.sleep(1) + found = env.pageserver.log_contains(msg) is not None + if found: + break + assert found