fix: deny branching, starting compute from not yet uploaded timelines (#5484)

Part of #5172. First commits show that we used to allow starting up a
compute or creating a branch off a not yet uploaded timeline. This PR
moves activation of a timeline to happen **after** initial layer file(s)
(if any) and `index_part.json` have been uploaded. Simply moving
activation to be *after* downloads have finished works because we now
spawn a task per http request handler.

Current behaviour of uploading on the timelines on next startup is kept,
to be removed later as part of #5172.

Adds:
- `NeonCli.map_branch` and corresponding `neon_local` implementation:
allow creating computes for timelines managed via pageserver http
client/api
- possibly duplicate tests (I did not want to search for, will cleanup
in a follow-up if these duplicated)

Changes:
- make `wait_until_tenant_state` return immediatedly on `Broken` and not
wait more
This commit is contained in:
Joonas Koivunen
2023-10-09 17:03:38 +03:00
committed by GitHub
parent 010b4d0d5c
commit 4772cd6c93
7 changed files with 345 additions and 10 deletions

View File

@@ -116,6 +116,7 @@ fn main() -> Result<()> {
"attachment_service" => handle_attachment_service(sub_args, &env), "attachment_service" => handle_attachment_service(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env), "safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env), "endpoint" => handle_endpoint(sub_args, &env),
"mappings" => handle_mappings(sub_args, &mut env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
_ => bail!("unexpected subcommand {sub_name}"), _ => bail!("unexpected subcommand {sub_name}"),
}; };
@@ -816,6 +817,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
Ok(()) Ok(())
} }
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match sub_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no mappings subcommand provided"),
};
match sub_name {
"map" => {
let branch_name = sub_args
.get_one::<String>("branch-name")
.expect("branch-name argument missing");
let tenant_id = sub_args
.get_one::<String>("tenant-id")
.map(|x| TenantId::from_str(x))
.expect("tenant-id argument missing")
.expect("malformed tenant-id arg");
let timeline_id = sub_args
.get_one::<String>("timeline-id")
.map(|x| TimelineId::from_str(x))
.expect("timeline-id argument missing")
.expect("malformed timeline-id arg");
env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
Ok(())
}
other => unimplemented!("mappings subcommand {other}"),
}
}
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> { fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") { let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
@@ -1325,8 +1358,8 @@ fn cli() -> Command {
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.") .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone()) .arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone()) .arg(tenant_id_arg.clone())
.arg(branch_name_arg) .arg(branch_name_arg.clone())
.arg(timeline_id_arg) .arg(timeline_id_arg.clone())
.arg(lsn_arg) .arg(lsn_arg)
.arg(pg_port_arg) .arg(pg_port_arg)
.arg(http_port_arg) .arg(http_port_arg)
@@ -1339,7 +1372,7 @@ fn cli() -> Command {
.subcommand( .subcommand(
Command::new("stop") Command::new("stop")
.arg(endpoint_id_arg) .arg(endpoint_id_arg)
.arg(tenant_id_arg) .arg(tenant_id_arg.clone())
.arg( .arg(
Arg::new("destroy") Arg::new("destroy")
.help("Also delete data directory (now optional, should be default in future)") .help("Also delete data directory (now optional, should be default in future)")
@@ -1350,6 +1383,18 @@ fn cli() -> Command {
) )
) )
.subcommand(
Command::new("mappings")
.arg_required_else_help(true)
.about("Manage neon_local branch name mappings")
.subcommand(
Command::new("map")
.about("Create new mapping which cannot exist already")
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
)
)
// Obsolete old name for 'endpoint'. We now just print an error if it's used. // Obsolete old name for 'endpoint'. We now just print an error if it's used.
.subcommand( .subcommand(
Command::new("pg") Command::new("pg")

View File

@@ -396,6 +396,9 @@ async fn timeline_create_handler(
format!("{err:#}") format!("{err:#}")
)) ))
} }
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
}
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)), Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
} }
} }

View File

@@ -406,6 +406,8 @@ pub enum CreateTimelineError {
AlreadyExists, AlreadyExists,
#[error(transparent)] #[error(transparent)]
AncestorLsn(anyhow::Error), AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
AncestorNotActive,
#[error(transparent)] #[error(transparent)]
Other(#[from] anyhow::Error), Other(#[from] anyhow::Error),
} }
@@ -1587,6 +1589,12 @@ impl Tenant {
.get_timeline(ancestor_timeline_id, false) .get_timeline(ancestor_timeline_id, false)
.context("Cannot branch off the timeline that's not present in pageserver")?; .context("Cannot branch off the timeline that's not present in pageserver")?;
// instead of waiting around, just deny the request because ancestor is not yet
// ready for other purposes either.
if !ancestor_timeline.is_active() {
return Err(CreateTimelineError::AncestorNotActive);
}
if let Some(lsn) = ancestor_start_lsn.as_mut() { if let Some(lsn) = ancestor_start_lsn.as_mut() {
*lsn = lsn.align(); *lsn = lsn.align();
@@ -1619,8 +1627,6 @@ impl Tenant {
} }
}; };
loaded_timeline.activate(broker_client, None, ctx);
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return // Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage. // Ok, the timeline is durable in remote storage.
@@ -1632,6 +1638,8 @@ impl Tenant {
})?; })?;
} }
loaded_timeline.activate(broker_client, None, ctx);
Ok(loaded_timeline) Ok(loaded_timeline)
} }

View File

@@ -31,6 +31,7 @@ pub(super) async fn upload_index_part<'a>(
fail_point!("before-upload-index", |_| { fail_point!("before-upload-index", |_| {
bail!("failpoint before-upload-index") bail!("failpoint before-upload-index")
}); });
pausable_failpoint!("before-upload-index-pausable");
let index_part_bytes = let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?; serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;

View File

@@ -1464,6 +1464,29 @@ class NeonCli(AbstractNeonCli):
return self.raw_cli(args, check_return_code=check_return_code) return self.raw_cli(args, check_return_code=check_return_code)
def map_branch(
self, name: str, tenant_id: TenantId, timeline_id: TimelineId
) -> "subprocess.CompletedProcess[str]":
"""
Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
Usually needed when creating branches via PageserverHttpClient and not neon_local.
After creating a name mapping, you can use EndpointFactory.create_start
with this registered branch name.
"""
args = [
"mappings",
"map",
"--branch-name",
name,
"--tenant-id",
str(tenant_id),
"--timeline-id",
str(timeline_id),
]
return self.raw_cli(args, check_return_code=True)
def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]": def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
return self.raw_cli(["start"], check_return_code=check_return_code) return self.raw_cli(["start"], check_return_code=check_return_code)

View File

@@ -74,11 +74,14 @@ def wait_until_tenant_state(
for _ in range(iterations): for _ in range(iterations):
try: try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id) tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
else:
log.debug(f"Tenant {tenant_id} data: {tenant}") log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"]["slug"] == expected_state: if tenant["state"]["slug"] == expected_state:
return tenant return tenant
except Exception as e: if tenant["state"]["slug"] == "Broken":
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") raise RuntimeError(f"tenant became Broken, not {expected_state}")
time.sleep(period) time.sleep(period)

View File

@@ -1,14 +1,24 @@
import random import random
import threading import threading
import time import time
from typing import List from queue import SimpleQueue
from typing import Any, Dict, List, Union
import pytest import pytest
from fixtures.log_helper import log from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin from fixtures.neon_fixtures import (
from fixtures.types import Lsn Endpoint,
NeonEnv,
NeonEnvBuilder,
PgBin,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError
# Test branch creation # Test branch creation
@@ -128,3 +138,245 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
endpoint1 = env.endpoints.create_start("b1") endpoint1 = env.endpoints.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()]) pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder):
"""
Endpoint should not be possible to create because branch has not been uploaded.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
env.pageserver.allowed_errors.append(
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
initial_branch = "initial_branch"
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder):
"""
Branch should not be possible to create because ancestor has not been uploaded.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
branch_id = TimelineId.generate()
with pytest.raises(RetryError, match="too many 503 error responses"):
ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
branch_id,
ancestor_timeline_id=env.initial_timeline,
)
with pytest.raises(
PageserverApiException,
match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found",
):
ps_http.timeline_detail(env.initial_tenant, branch_id)
# important to note that a task might still be in progress to complete
# the work, but will never get to that because we have the pause
# failpoint
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
"""
If the activate only after upload is used, then retries could become competing.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
env.pageserver.allowed_errors.append(
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
create_root = threading.Thread(target=start_creating_timeline)
branch_id = TimelineId.generate()
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
barrier = threading.Barrier(3)
def try_branch():
barrier.wait()
barrier.wait()
try:
ret = ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
branch_id,
ancestor_timeline_id=env.initial_timeline,
timeout=5,
)
queue.put(ret)
except Exception as e:
queue.put(e)
threads = [threading.Thread(target=try_branch) for _ in range(2)]
try:
create_root.start()
for t in threads:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
barrier.wait()
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
barrier.wait()
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
first = queue.get()
second = queue.get()
log.info(first)
log.info(second)
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
assert isinstance(failed, Exception)
assert isinstance(succeeded, Dict)
# FIXME: there's probably multiple valid status codes:
# - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already exists
# - whatever 409 response says, but that is a subclass of PageserverApiException
assert isinstance(failed, PageserverApiException)
assert succeeded["state"] == "Active"
finally:
# we might still have the failpoint active
env.pageserver.stop(immediate=True)
# pytest should nag if we leave threads unjoined
for t in threads:
t.join()
create_root.join()
def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder):
"""
Currently before RFC#27 we keep and continue uploading branches which were not successfully uploaded before shutdown.
This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test when the rfc is implemented.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
# now without a failpoint
env.pageserver.start()
wait_until_tenant_active(ps_http, env.initial_tenant)
# currently it lives on and will get eventually uploaded, but this will change
detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert detail["state"] == "Active"
def wait_until_paused(env: NeonEnv, failpoint: str):
found = False
msg = f"at failpoint {failpoint}"
for _ in range(20):
time.sleep(1)
found = env.pageserver.log_contains(msg) is not None
if found:
break
assert found