diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs
index be7cff352c..e50c8fbba0 100644
--- a/control_plane/src/bin/attachment_service.rs
+++ b/control_plane/src/bin/attachment_service.rs
@@ -201,6 +201,12 @@ async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     // TODO(sharding): make this shard-aware
     if let Some(tenant_state) = locked.tenants.get(&req_tenant.id.tenant_id) {
         let valid = tenant_state.generation == req_tenant.gen;
+        tracing::info!(
+            "handle_validate: {}(gen {}): valid={valid} (latest {})",
+            req_tenant.id,
+            req_tenant.gen,
+            tenant_state.generation
+        );
         response.tenants.push(ValidateResponseTenant {
             id: req_tenant.id,
             valid,
@@ -250,6 +256,13 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     tenant_state.pageserver = attach_req.node_id;
     let generation = tenant_state.generation;
 
+    tracing::info!(
+        "handle_attach_hook: tenant {} set generation {}, pageserver {}",
+        attach_req.tenant_id,
+        tenant_state.generation,
+        attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
+    );
+
     locked.save().await.map_err(ApiError::InternalServerError)?;
 
     json_response(
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 2234a06501..495a58e865 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -323,6 +323,7 @@ impl TenantConfigRequest {
 
 #[derive(Debug, Deserialize)]
 pub struct TenantAttachRequest {
+    #[serde(default)]
     pub config: TenantAttachConfig,
     #[serde(default)]
     pub generation: Option<u32>,
@@ -330,7 +331,7 @@ pub struct TenantAttachRequest {
 
 /// Newtype to enforce deny_unknown_fields on TenantConfig for
 /// its usage inside `TenantAttachRequest`.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Default)]
 #[serde(deny_unknown_fields)]
 pub struct TenantAttachConfig {
     #[serde(flatten)]
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index 1be50ce565..fccc78de20 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -99,27 +99,35 @@ impl LocalFs {
         };
 
         // If we were given a directory, we may use it as our starting point.
-        // Otherwise, we must go up to the parent directory. This is because
+        // Otherwise, we must go up to the first ancestor dir that exists. This is because
         // S3 object list prefixes can be arbitrary strings, but when reading
         // the local filesystem we need a directory to start calling read_dir on.
         let mut initial_dir = full_path.clone();
-        match fs::metadata(full_path.clone()).await {
-            Ok(meta) => {
-                if !meta.is_dir() {
+        loop {
+            // Did we make it to the root?
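+            // (i.e. we have walked all the way up without finding an existing directory)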
+ if initial_dir.parent().is_none() { + anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}"); + } + + match fs::metadata(initial_dir.clone()).await { + Ok(meta) if meta.is_dir() => { + // We found a directory, break + break; + } + Ok(_meta) => { // It's not a directory: strip back to the parent initial_dir.pop(); } - } - Err(e) if e.kind() == ErrorKind::NotFound => { - // It's not a file that exists: strip the prefix back to the parent directory - initial_dir.pop(); - } - Err(e) => { - // Unexpected I/O error - anyhow::bail!(e) + Err(e) if e.kind() == ErrorKind::NotFound => { + // It's not a file that exists: strip the prefix back to the parent directory + initial_dir.pop(); + } + Err(e) => { + // Unexpected I/O error + anyhow::bail!(e) + } } } - // Note that Utf8PathBuf starts_with only considers full path segments, but // object prefixes are arbitrary strings, so we need the strings for doing // starts_with later. diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs index 7ff27ceb44..3a3d600ac2 100644 --- a/pageserver/src/deletion_queue/list_writer.rs +++ b/pageserver/src/deletion_queue/list_writer.rs @@ -312,7 +312,18 @@ impl ListWriter { for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants { if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) { if attached_gen.previous() == tenant_list.generation { + info!( + seq=%s, tenant_id=%tenant_shard_id.tenant_id, + shard_id=%tenant_shard_id.shard_slug(), + old_gen=?tenant_list.generation, new_gen=?attached_gen, + "Updating gen on recovered list"); tenant_list.generation = *attached_gen; + } else { + info!( + seq=%s, tenant_id=%tenant_shard_id.tenant_id, + shard_id=%tenant_shard_id.shard_slug(), + old_gen=?tenant_list.generation, new_gen=?attached_gen, + "Encountered stale generation on recovered list"); } } } diff --git a/scripts/export_import_between_pageservers.py b/scripts/export_import_between_pageservers.py index 77e4310eac..ff584bd4b0 100755 --- a/scripts/export_import_between_pageservers.py +++ b/scripts/export_import_between_pageservers.py @@ -266,9 +266,7 @@ class NeonPageserverHttpClient(requests.Session): def tenant_create(self, new_tenant_id: uuid.UUID, ok_if_exists): res = self.post( f"http://{self.host}:{self.port}/v1/tenant", - json={ - "new_tenant_id": new_tenant_id.hex, - }, + json={"new_tenant_id": new_tenant_id.hex, "generation": 1}, ) if res.status_code == 409: diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9545dc2dd5..617d4806b2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -455,7 +455,7 @@ class NeonEnvBuilder: self.preserve_database_files = preserve_database_files self.initial_tenant = initial_tenant or TenantId.generate() self.initial_timeline = initial_timeline or TimelineId.generate() - self.enable_generations = False + self.enable_generations = True self.scrub_on_exit = False self.test_output_dir = test_output_dir @@ -1571,6 +1571,20 @@ class NeonAttachmentService: ) response.raise_for_status() + def inspect(self, tenant_id: TenantId) -> Optional[tuple[int, int]]: + response = requests.post( + f"{self.env.control_plane_api}/inspect", + json={"tenant_id": str(tenant_id)}, + ) + response.raise_for_status() + json = response.json() + log.info(f"Response: {json}") + if json["attachment"]: + # Explicit int() to make python type linter happy + return (int(json["attachment"][0]), int(json["attachment"][1])) + else: + 
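+            # No attachment recorded: the tenant is not currently attached to any pageserver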
return None + def __enter__(self) -> "NeonAttachmentService": return self @@ -1769,13 +1783,10 @@ class NeonPageserver(PgProtocol): Tenant attachment passes through here to acquire a generation number before proceeding to call into the pageserver HTTP client. """ - if self.env.attachment_service is not None: - generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id) - else: - generation = None - client = self.http_client() - return client.tenant_attach(tenant_id, config, config_null, generation=generation) + return client.tenant_attach( + tenant_id, config, config_null, generation=self.maybe_get_generation(tenant_id) + ) def tenant_detach(self, tenant_id: TenantId): if self.env.attachment_service is not None: @@ -1784,6 +1795,34 @@ class NeonPageserver(PgProtocol): client = self.http_client() return client.tenant_detach(tenant_id) + def tenant_create( + self, + tenant_id: TenantId, + conf: Optional[Dict[str, Any]] = None, + auth_token: Optional[str] = None, + ) -> TenantId: + client = self.http_client(auth_token=auth_token) + return client.tenant_create( + tenant_id, conf, generation=self.maybe_get_generation(tenant_id) + ) + + def tenant_load(self, tenant_id: TenantId): + client = self.http_client() + return client.tenant_load(tenant_id, generation=self.maybe_get_generation(tenant_id)) + + def maybe_get_generation(self, tenant_id: TenantId): + """ + For tests that would like to use an HTTP client directly instead of using + the `tenant_attach` and `tenant_create` helpers here: issue a generation + number for a tenant. + + Returns None if the attachment service is not enabled (legacy mode) + """ + if self.env.attachment_service is not None: + return self.env.attachment_service.attach_hook_issue(tenant_id, self.id) + else: + return None + def append_pageserver_param_overrides( params_to_update: List[str], diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index eccab5fb6a..3e75bac424 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -210,16 +210,25 @@ class PageserverHttpClient(requests.Session): return res_json def tenant_create( - self, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None + self, + new_tenant_id: TenantId, + conf: Optional[Dict[str, Any]] = None, + generation: Optional[int] = None, ) -> TenantId: if conf is not None: assert "new_tenant_id" not in conf.keys() + + body: Dict[str, Any] = { + "new_tenant_id": str(new_tenant_id), + **(conf or {}), + } + + if generation is not None: + body.update({"generation": generation}) + res = self.post( f"http://localhost:{self.port}/v1/tenant", - json={ - "new_tenant_id": str(new_tenant_id), - **(conf or {}), - }, + json=body, ) self.verbose_error(res) if res.status_code == 409: @@ -273,8 +282,11 @@ class PageserverHttpClient(requests.Session): self.verbose_error(res) return res - def tenant_load(self, tenant_id: TenantId): - res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load") + def tenant_load(self, tenant_id: TenantId, generation=None): + body = None + if generation is not None: + body = {"generation": generation} + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load", json=body) self.verbose_error(res) def tenant_ignore(self, tenant_id: TenantId): diff --git a/test_runner/fixtures/pageserver/types.py b/test_runner/fixtures/pageserver/types.py index 30e3f527bf..b3c1174b35 100644 --- a/test_runner/fixtures/pageserver/types.py +++ 
b/test_runner/fixtures/pageserver/types.py
@@ -6,9 +6,8 @@ from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn
 
 @dataclass
 class IndexLayerMetadata:
-    @classmethod
-    def from_json(cls, d: Dict[str, Any]):
-        return {}
+    file_size: int
+    generation: int
 
 
 @dataclass(frozen=True)
@@ -139,7 +138,7 @@ class IndexPartDump:
     def from_json(cls, d: Dict[str, Any]) -> "IndexPartDump":
         return IndexPartDump(
             layer_metadata={
-                parse_layer_file_name(n): IndexLayerMetadata.from_json(v)
+                parse_layer_file_name(n): IndexLayerMetadata(v["file_size"], v["generation"])
                 for n, v in d["layer_metadata"].items()
             },
             disk_consistent_lsn=Lsn(d["disk_consistent_lsn"]),
diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py
index 954c3142a3..d8361133d7 100644
--- a/test_runner/fixtures/remote_storage.py
+++ b/test_runner/fixtures/remote_storage.py
@@ -12,7 +12,6 @@ import boto3
 from mypy_boto3_s3 import S3Client
 
 from fixtures.log_helper import log
-from fixtures.pageserver.types import LayerFileName
 from fixtures.types import TenantId, TimelineId
 
 TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
@@ -88,13 +87,46 @@ class LocalFsStorage:
     def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
         return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
 
-    def layer_path(
-        self, tenant_id: TenantId, timeline_id: TimelineId, layer_file_name: LayerFileName
-    ):
-        return self.timeline_path(tenant_id, timeline_id) / layer_file_name.to_str()
+    def timeline_latest_generation(
+        self, tenant_id: TenantId, timeline_id: TimelineId
+    ) -> Optional[int]:
+        timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
+        index_parts = [f for f in timeline_files if f.startswith("index_part")]
+
+        def parse_gen(filename: str) -> Optional[int]:
+            log.info(f"parsing index_part '{filename}'")
+            # Generation-suffixed index parts look like "index_part.json-0000002a";
+            # a bare "index_part.json" is a legacy file with no generation.
+            parts = filename.split("-")
+            if len(parts) == 2:
+                return int(parts[1], 16)
+            else:
+                return None
+
+        # Sort None (legacy, un-suffixed index_part) before any numeric generation, so a
+        # mix of legacy and suffixed files doesn't raise a TypeError inside sorted().
+        generations = sorted(
+            (parse_gen(f) for f in index_parts),
+            key=lambda g: -1 if g is None else g,
+        )
+        if len(generations) == 0:
+            raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
+        return generations[-1]
 
     def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
-        return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME
+        latest_gen = self.timeline_latest_generation(tenant_id, timeline_id)
+        if latest_gen is None:
+            filename = TIMELINE_INDEX_PART_FILE_NAME
+        else:
+            filename = f"{TIMELINE_INDEX_PART_FILE_NAME}-{latest_gen:08x}"
+
+        return self.timeline_path(tenant_id, timeline_id) / filename
+
+    def remote_layer_path(
+        self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        local_name: str,
+        generation: Optional[int] = None,
+    ):
+        if generation is None:
+            generation = self.timeline_latest_generation(tenant_id, timeline_id)
+
+        assert generation is not None, "Cannot calculate remote layer path without generation"
+
+        filename = f"{local_name}-{generation:08x}"
+        return self.timeline_path(tenant_id, timeline_id) / filename
 
     def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
         with self.index_path(tenant_id, timeline_id).open("r") as f:
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py
index 10cffb1d6c..70d386a566 100644
--- a/test_runner/regress/test_attach_tenant_config.py
+++ b/test_runner/regress/test_attach_tenant_config.py
@@ -100,7 +100,6 @@ def test_config_with_unknown_keys_is_bad_request(negative_env: NegativeTests):
 
     env = negative_env.neon_env
     tenant_id = negative_env.tenant_id
-    ps_http = env.pageserver.http_client()
config_with_unknown_keys = { "compaction_period": "1h", @@ -108,16 +107,16 @@ def test_config_with_unknown_keys_is_bad_request(negative_env: NegativeTests): } with pytest.raises(PageserverApiException) as e: - ps_http.tenant_attach(tenant_id, config=config_with_unknown_keys) + env.pageserver.tenant_attach(tenant_id, config=config_with_unknown_keys) assert e.type == PageserverApiException assert e.value.status_code == 400 @pytest.mark.parametrize("content_type", [None, "application/json"]) -def test_empty_body(positive_env: NeonEnv, content_type: Optional[str]): +def test_no_config(positive_env: NeonEnv, content_type: Optional[str]): """ - For backwards-compatibility: if we send an empty body, - the request should be accepted and the config should be the default config. + When the 'config' body attribute is omitted, the request should be accepted + and the tenant should use the default configuration """ env = positive_env ps_http = env.pageserver.http_client() @@ -128,9 +127,14 @@ def test_empty_body(positive_env: NeonEnv, content_type: Optional[str]): ps_http.tenant_detach(tenant_id) assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()] + body = {} + gen = env.pageserver.maybe_get_generation(tenant_id) + if gen is not None: + body["generation"] = gen + ps_http.post( f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach", - data=b"", + json=body, headers=None if content_type else {"Content-Type": "application/json"}, ).raise_for_status() @@ -191,7 +195,7 @@ def test_fully_custom_config(positive_env: NeonEnv): }, "ensure our custom config has different values than the default config for all config options, so we know we overrode everything" ps_http.tenant_detach(tenant_id) - ps_http.tenant_attach(tenant_id, config=fully_custom_config) + env.pageserver.tenant_attach(tenant_id, config=fully_custom_config) assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == fully_custom_config assert set(ps_http.tenant_config(tenant_id).effective_config.keys()) == set( diff --git a/test_runner/regress/test_auth.py b/test_runner/regress/test_auth.py index f729bdee98..7487106c44 100644 --- a/test_runner/regress/test_auth.py +++ b/test_runner/regress/test_auth.py @@ -60,14 +60,14 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): assert_client_authorized(env, invalid_tenant_http_client) # create tenant using management token - pageserver_http_client.tenant_create(TenantId.generate()) + env.pageserver.tenant_create(TenantId.generate(), auth_token=pageserver_token) # fail to create tenant using tenant token with pytest.raises( PageserverApiException, match="Forbidden: JWT authentication error", ): - tenant_http_client.tenant_create(TenantId.generate()) + env.pageserver.tenant_create(TenantId.generate(), auth_token=tenant_token) def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder): diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index c4f743204e..a908dd713a 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -158,7 +158,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE # pause all uploads ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) - ps_http.tenant_create(env.initial_tenant) + env.pageserver.tenant_create(env.initial_tenant) initial_branch = "initial_branch" @@ -200,7 +200,7 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder # pause all uploads 
ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) - ps_http.tenant_create(env.initial_tenant) + env.pageserver.tenant_create(env.initial_tenant) def start_creating_timeline(): with pytest.raises(RequestException): @@ -257,7 +257,7 @@ def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: N # pause all uploads ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) - ps_http.tenant_create(env.initial_tenant) + env.pageserver.tenant_create(env.initial_tenant) def start_creating_timeline(): ps_http.timeline_create( @@ -343,8 +343,7 @@ def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: N ) ps_http = env.pageserver.http_client() - # pause all uploads - ps_http.tenant_create(env.initial_tenant) + env.pageserver.tenant_create(env.initial_tenant) # Create a timeline whose creation will succeed. The tenant will need at least one # timeline to be loadable. @@ -397,7 +396,7 @@ def test_non_uploaded_branch_is_deleted_after_restart(neon_env_builder: NeonEnvB ) ps_http = env.pageserver.http_client() - ps_http.tenant_create(env.initial_tenant) + env.pageserver.tenant_create(env.initial_tenant) ps_http.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline) # pause all uploads diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 84a322039a..53eeb8bbe9 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -160,7 +160,7 @@ def test_timeline_init_break_before_checkpoint_recreate( ] ) - pageserver_http.tenant_create(env.initial_tenant) + env.pageserver.tenant_create(env.initial_tenant) tenant_id = env.initial_tenant timelines_dir = env.pageserver.timeline_dir(tenant_id) diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py index 410bf03c2b..1b6c982850 100644 --- a/test_runner/regress/test_change_pageserver.py +++ b/test_runner/regress/test_change_pageserver.py @@ -14,6 +14,11 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder): ) env = neon_env_builder.init_start() + for pageserver in env.pageservers: + # This test dual-attaches a tenant, one of the pageservers will therefore + # be running with a stale generation. + pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*") + env.neon_cli.create_branch("test_change_pageserver") endpoint = env.endpoints.create_start("test_change_pageserver") @@ -79,6 +84,11 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder): # Try failing back, and this time we will stop the current pageserver before reconfiguring # the endpoint. Whereas the previous reconfiguration was like a healthy migration, this # is more like what happens in an unexpected pageserver failure. 
+    #
+    # Since we're dual-attached, we need to tip off the attachment service to treat the
+    # one we're about to start as the attached pageserver
+    assert env.attachment_service is not None
+    env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
     env.pageservers[0].start()
     env.pageservers[1].stop()
 
@@ -88,6 +98,9 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
     assert fetchone() == (100000,)
 
     env.pageservers[0].stop()
+    # Since we're dual-attached, we need to tip off the attachment service to treat the
+    # one we're about to start as the attached pageserver
+    env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[1].id)
     env.pageservers[1].start()
 
     # Test a (former) bug where a child process spins without updating its connection string
diff --git a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py
index bcf99cae7c..224e6f50c7 100644
--- a/test_runner/regress/test_duplicate_layers.py
+++ b/test_runner/regress/test_duplicate_layers.py
@@ -112,7 +112,9 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
 
     assert l1_found is not None, "failed to find L1 locally"
 
-    uploaded = env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / l1_found.name
+    uploaded = env.pageserver_remote_storage.remote_layer_path(
+        tenant_id, timeline_id, l1_found.name
+    )
     assert not uploaded.exists(), "to-be-overwritten should not yet be uploaded"
 
     env.pageserver.start()
@@ -139,4 +141,7 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
 
     wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
 
+    uploaded = env.pageserver_remote_storage.remote_layer_path(
+        tenant_id, timeline_id, l1_found.name
+    )
     assert uploaded.exists(), "the L1 is uploaded"
diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py
index 8da5f1eec2..920e8d0b72 100644
--- a/test_runner/regress/test_import.py
+++ b/test_runner/regress/test_import.py
@@ -84,8 +84,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
     env = neon_env_builder.init_start()
 
-    client = env.pageserver.http_client()
-    client.tenant_create(tenant)
+    env.pageserver.tenant_create(tenant)
 
     env.pageserver.allowed_errors.extend(
         [
@@ -149,6 +148,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
         ".*WARN.*ignored .* unexpected bytes after the tar archive.*"
     )
 
+    client = env.pageserver.http_client()
     timeline_delete_wait_completed(client, tenant, timeline)
 
     # Importing correct backup works
@@ -292,7 +292,7 @@ def _import(
     # Import to pageserver
     endpoint_id = "ep-import_from_pageserver"
     client = env.pageserver.http_client()
-    client.tenant_create(tenant)
+    env.pageserver.tenant_create(tenant)
 
     env.neon_cli.raw_cli(
         [
             "timeline",
diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py
index 7ce0bdaeba..ef2b2185c3 100644
--- a/test_runner/regress/test_layers_from_future.py
+++ b/test_runner/regress/test_layers_from_future.py
@@ -149,19 +149,28 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
         f"got layer from the future: lsn={future_layer.lsn} disk_consistent_lsn={ip.disk_consistent_lsn} last_record_lsn={last_record_lsn}"
     )
     assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
-    future_layer_path = env.pageserver_remote_storage.layer_path(
-        tenant_id, timeline_id, 
future_layer + future_layer_path = env.pageserver_remote_storage.remote_layer_path( + tenant_id, timeline_id, future_layer.to_str() ) log.info(f"future layer path: {future_layer_path}") pre_stat = future_layer_path.stat() time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites + def get_generation_number(): + assert env.attachment_service is not None + attachment = env.attachment_service.inspect(tenant_id) + assert attachment is not None + return attachment[0] + # force removal of layers from the future tenant_conf = ps_http.tenant_config(tenant_id) - ps_http.tenant_detach(tenant_id) + generation_before_detach = get_generation_number() + env.pageserver.tenant_detach(tenant_id) failpoint_name = "before-delete-layer-pausable" + ps_http.configure_failpoints((failpoint_name, "pause")) - ps_http.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) + env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) + generation_after_reattach = get_generation_number() wait_until_tenant_active(ps_http, tenant_id) # Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue. @@ -177,6 +186,10 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder): assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") wait_until(10, 0.5, delete_at_pause_point) + future_layer_path = env.pageserver_remote_storage.remote_layer_path( + tenant_id, timeline_id, future_layer.to_str(), generation=generation_before_detach + ) + log.info(f"future layer path: {future_layer_path}") assert future_layer_path.exists() # wait for re-ingestion of the WAL from safekeepers into the in-memory layer @@ -215,12 +228,17 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder): # Examine the resulting S3 state. 
log.info("integrity-check the remote storage") ip = get_index_part() - for layer_file_name in ip.layer_metadata.keys(): - layer_path = env.pageserver_remote_storage.layer_path( - tenant_id, timeline_id, layer_file_name + for layer_file_name, layer_metadata in ip.layer_metadata.items(): + log.info(f"Layer metadata {layer_file_name.to_str()}: {layer_metadata}") + layer_path = env.pageserver_remote_storage.remote_layer_path( + tenant_id, timeline_id, layer_file_name.to_str(), layer_metadata.generation ) assert layer_path.exists(), f"{layer_file_name.to_str()}" log.info("assert that the overwritten layer won") + future_layer_path = env.pageserver_remote_storage.remote_layer_path( + tenant_id, timeline_id, future_layer.to_str(), generation=generation_after_reattach + ) final_stat = future_layer_path.stat() + log.info(f"future layer path: {future_layer_path}") assert final_stat.st_mtime != pre_stat.st_mtime diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index de18ea0e6b..16d120e24a 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -133,6 +133,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): # Stop default ps/sk env.neon_cli.pageserver_stop(env.pageserver.id) env.neon_cli.safekeeper_stop() + env.neon_cli.attachment_service_stop(False) # Keep NeonEnv state up to date, it usually owns starting/stopping services env.pageserver.running = False @@ -173,6 +174,9 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder): env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1) env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2) + # Stop this to get out of the way of the following `start` + env.neon_cli.attachment_service_stop(False) + # Default start res = env.neon_cli.raw_cli(["start"]) res.check_returncode() diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 2d83788193..64e41a2dd5 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -8,7 +8,6 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, ) from fixtures.pageserver.http import PageserverHttpClient -from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import wait_until @@ -62,7 +61,10 @@ def test_pageserver_init_node_id( assert "has node id already, it cannot be overridden" in bad_update.stderr -def check_client(pg_version: PgVersion, client: PageserverHttpClient, initial_tenant: TenantId): +def check_client(env: NeonEnv, client: PageserverHttpClient): + pg_version = env.pg_version + initial_tenant = env.initial_tenant + client.check_status() # check initial tenant is there @@ -70,7 +72,7 @@ def check_client(pg_version: PgVersion, client: PageserverHttpClient, initial_te # create new tenant and check it is also there tenant_id = TenantId.generate() - client.tenant_create(tenant_id) + client.tenant_create(tenant_id, generation=env.pageserver.maybe_get_generation(tenant_id)) assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()} timelines = client.timeline_list(tenant_id) @@ -181,7 +183,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): def test_pageserver_http_api_client(neon_simple_env: NeonEnv): env = neon_simple_env with env.pageserver.http_client() as client: - check_client(env.pg_version, client, env.initial_tenant) + check_client(env, client) def 
test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilder):
@@ -191,4 +193,4 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilder):
     pageserver_token = env.auth_keys.generate_pageserver_token()
 
     with env.pageserver.http_client(auth_token=pageserver_token) as client:
-        check_client(env.pg_version, client, env.initial_tenant)
+        check_client(env, client)
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 67daf5f901..0a5046e219 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -23,7 +23,6 @@ from fixtures.pageserver.utils import (
     wait_until_tenant_state,
 )
 from fixtures.remote_storage import (
-    TIMELINE_INDEX_PART_FILE_NAME,
     LocalFsStorage,
     RemoteStorageKind,
     available_remote_storages,
@@ -350,6 +349,13 @@ def test_remote_storage_upload_queue_retries(
     env.pageserver.stop(immediate=True)
     env.endpoints.stop_all()
 
+    # We are about to forcibly drop local dirs. The attachment service will increment the
+    # generation on re-attach, before we increment it again when we explicitly attach; that
+    # skips a generation, which can produce these warnings if a durable but un-executed
+    # deletion list existed at the time of restart.
+    env.pageserver.allowed_errors.extend(
+        [".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
+    )
+
     dir_to_clear = env.pageserver.tenant_dir()
     shutil.rmtree(dir_to_clear)
     os.mkdir(dir_to_clear)
@@ -648,7 +654,7 @@ def test_empty_branch_remote_storage_upload(neon_env_builder: NeonEnvBuilder):
     ), f"Expected to have an initial timeline and the branch timeline only, but got {timelines_before_detach}"
 
     client.tenant_detach(env.initial_tenant)
-    client.tenant_attach(env.initial_tenant)
+    env.pageserver.tenant_attach(env.initial_tenant)
     wait_until_tenant_state(client, env.initial_tenant, "Active", 5)
 
     timelines_after_detach = set(
@@ -758,10 +764,11 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnvBuilder):
         # this is because creating a timeline always awaits for the uploads to complete
         assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
 
-        assert (
-            new_branch_on_remote_storage / TIMELINE_INDEX_PART_FILE_NAME
+        assert env.pageserver_remote_storage.index_path(
+            env.initial_tenant, new_branch_timeline_id
         ).is_file(), "uploads scheduled during initial load should have been awaited for"
     finally:
+        barrier.abort()
         create_thread.join()
diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py
index fcc3243e81..f4565c2ee2 100644
--- a/test_runner/regress/test_tenant_conf.py
+++ b/test_runner/regress/test_tenant_conf.py
@@ -314,7 +314,7 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
 
     assert not config_path.exists(), "detach did not remove config file"
 
-    http_client.tenant_attach(tenant_id)
+    env.pageserver.tenant_attach(tenant_id)
     wait_until(
         number_of_iterations=5,
         interval=1,
diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py
index 89c474286a..3929f59e46 100644
--- a/test_runner/regress/test_tenant_delete.py
+++ b/test_runner/regress/test_tenant_delete.py
@@ -380,7 +380,7 @@ def test_tenant_delete_is_resumed_on_attach(
     env.pageserver.start()
 
     # now we call attach
-    ps_http.tenant_attach(tenant_id=tenant_id)
+    env.pageserver.tenant_attach(tenant_id=tenant_id)
 
     # delete should be resumed
     wait_tenant_status_404(ps_http, tenant_id, iterations)
@@ -419,7 +419,7 @@ def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):
         f".*Error processing HTTP request: InternalServerError\\(new timeline {env.initial_tenant}/{env.initial_timeline} has invalid disk_consistent_lsn"
     )
 
-    pageserver_http.tenant_create(env.initial_tenant)
+    env.pageserver.tenant_create(env.initial_tenant)
 
     failpoint = "flush-layer-cancel-after-writing-layer-out-pausable"
     pageserver_http.configure_failpoints((failpoint, "pause"))
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index df497c0f7b..5c57fb233b 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -82,6 +82,10 @@ def test_tenant_reattach(
 
     env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
 
+    # Our re-attach may race with the deletion queue processing LSN updates
+    # from the original attachment.
+    env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
+
     with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
         with endpoint.cursor() as cur:
             cur.execute("CREATE TABLE t(key int primary key, value text)")
@@ -112,8 +116,8 @@ def test_tenant_reattach(
 
     if mode == ReattachMode.REATTACH_EXPLICIT:
         # Explicitly detach then attach the tenant as two separate API calls
-        pageserver_http.tenant_detach(tenant_id)
-        pageserver_http.tenant_attach(tenant_id)
+        env.pageserver.tenant_detach(tenant_id)
+        env.pageserver.tenant_attach(tenant_id)
     elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
         # Use the reset API to detach/attach in one shot
         pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
@@ -192,6 +196,9 @@ def test_tenant_reattach_while_busy(
     updates_finished = 0
     updates_to_perform = 0
 
+    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
+    env = neon_env_builder.init_start()
+
     # Run random UPDATEs on test table. On failure, try again.
     async def update_table(pg_conn: asyncpg.Connection):
         nonlocal updates_started, updates_finished, updates_to_perform
@@ -223,7 +230,7 @@ def test_tenant_reattach_while_busy(
             pageserver_http.tenant_detach(tenant_id)
             await asyncio.sleep(1)
             log.info("Re-attaching tenant")
-            pageserver_http.tenant_attach(tenant_id)
+            env.pageserver.tenant_attach(tenant_id)
             log.info("Re-attach finished")
 
             # Continue with 5000 more updates
@@ -244,9 +251,6 @@ def test_tenant_reattach_while_busy(
 
     assert updates_finished == updates_to_perform
 
-    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
-    env = neon_env_builder.init_start()
-
     pageserver_http = env.pageserver.http_client()
 
     # create new tenant
@@ -454,6 +458,10 @@ def test_detach_while_attaching(
 
     env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
 
+    # Our re-attach may race with the deletion queue processing LSN updates
+    # from the original attachment.
+    env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
+
     # Create table, and insert some rows. 
Make it big enough that it doesn't fit in # shared_buffers, otherwise the SELECT after restart will just return answer # from shared_buffers without hitting the page server, which defeats the point @@ -487,7 +495,7 @@ def test_detach_while_attaching( # And re-attach pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")]) - pageserver_http.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) # Before it has chance to finish, detach it again pageserver_http.tenant_detach(tenant_id) @@ -497,7 +505,7 @@ def test_detach_while_attaching( # Attach it again. If the GC and compaction loops from the previous attach/detach # cycle are still running, things could get really confusing.. - pageserver_http.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) with endpoint.cursor() as cur: cur.execute("SELECT COUNT(*) FROM foo") @@ -556,7 +564,7 @@ def test_ignored_tenant_reattach(neon_env_builder: NeonEnvBuilder): ), "Ignored tenant should not be reloaded after pageserver restart" # now, load it from the local files and expect it works - pageserver_http.tenant_load(tenant_id=ignored_tenant_id) + env.pageserver.tenant_load(tenant_id=ignored_tenant_id) wait_until_tenant_state(pageserver_http, ignored_tenant_id, "Active", 5) tenants_after_attach = [tenant["id"] for tenant in pageserver_http.tenant_list()] @@ -611,7 +619,7 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder assert layers_removed, f"Found no layers for tenant {timeline_dir}" # now, load it from the local files and expect it to work due to remote storage restoration - pageserver_http.tenant_load(tenant_id=tenant_id) + env.pageserver.tenant_load(tenant_id=tenant_id) wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5) tenants_after_attach = [tenant["id"] for tenant in pageserver_http.tenant_list()] @@ -645,13 +653,13 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder): expected_exception=PageserverApiException, match=f"tenant {tenant_id} already exists, state: Active", ): - pageserver_http.tenant_load(tenant_id) + env.pageserver.tenant_load(tenant_id) with pytest.raises( expected_exception=PageserverApiException, match=f"tenant {tenant_id} already exists, state: Active", ): - pageserver_http.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) pageserver_http.tenant_ignore(tenant_id) @@ -660,7 +668,7 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder): expected_exception=PageserverApiException, match="tenant directory already exists", ): - pageserver_http.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) def test_ignore_while_attaching( @@ -679,6 +687,10 @@ def test_ignore_while_attaching( env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) + # Our re-attach may race with the deletion queue processing LSN updates + # from the original attachment. + env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*") + data_id = 1 data_secret = "very secret secret" insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint) @@ -689,7 +701,7 @@ def test_ignore_while_attaching( pageserver_http.tenant_detach(tenant_id) # And re-attach, but stop attach task_mgr task from completing pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")]) - pageserver_http.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) # Run ignore on the task, thereby cancelling the attach. 
# XXX This should take priority over attach, i.e., it should cancel the attach task. # But neither the failpoint, nor the proper remote_timeline_client download functions, @@ -704,7 +716,7 @@ def test_ignore_while_attaching( expected_exception=PageserverApiException, match="tenant directory already exists", ): - pageserver_http.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) tenants_after_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()] assert tenant_id not in tenants_after_ignore, "Ignored tenant should be missing" @@ -714,7 +726,7 @@ def test_ignore_while_attaching( # Calling load will bring the tenant back online pageserver_http.configure_failpoints([("attach-before-activate", "off")]) - pageserver_http.tenant_load(tenant_id) + env.pageserver.tenant_load(tenant_id) wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5) @@ -818,7 +830,7 @@ def test_metrics_while_ignoring_broken_tenant_and_reloading( found_broken ), f"broken should still be in set, but it is not in the tenant state count: broken={broken}, broken_set={broken_set}" - client.tenant_load(env.initial_tenant) + env.pageserver.tenant_load(env.initial_tenant) found_active = False active, broken_set = ([], []) diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index 8be0f0449b..c7ff80d675 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -7,13 +7,8 @@ from pathlib import Path from typing import Any, Dict, Optional, Tuple import pytest -from fixtures.broker import NeonBroker from fixtures.log_helper import log -from fixtures.neon_fixtures import ( - Endpoint, - NeonEnv, - NeonEnvBuilder, -) +from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder, NeonPageserver from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import ( assert_tenant_state, @@ -30,7 +25,6 @@ from fixtures.remote_storage import ( from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import ( query_scalar, - start_in_background, subprocess_capture, wait_until, ) @@ -40,58 +34,6 @@ def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float): assert abs(a - b) / a < margin_ratio, abs(a - b) / a -@contextmanager -def new_pageserver_service( - new_pageserver_dir: Path, - pageserver_bin: Path, - remote_storage_mock_path: Path, - pg_port: int, - http_port: int, - broker: Optional[NeonBroker], - pg_distrib_dir: Path, -): - """ - cannot use NeonPageserver yet because it depends on neon cli - which currently lacks support for multiple pageservers - """ - # actually run new pageserver - cmd = [ - str(pageserver_bin), - "--workdir", - str(new_pageserver_dir), - "--update-config", - f"-c listen_pg_addr='localhost:{pg_port}'", - f"-c listen_http_addr='localhost:{http_port}'", - f"-c pg_distrib_dir='{pg_distrib_dir}'", - "-c id=2", - f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}", - ] - if broker is not None: - cmd.append( - f"-c broker_endpoint='{broker.client_url()}'", - ) - pageserver_client = PageserverHttpClient( - port=http_port, - auth_token=None, - is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled - ) - try: - pageserver_process = start_in_background( - cmd, new_pageserver_dir, "pageserver.log", pageserver_client.check_status - ) - except Exception as e: - log.error(e) - pageserver_process.kill() - raise Exception(f"Failed to start pageserver as {cmd}, reason: {e}") from e - - log.info("new 
pageserver started") - try: - yield pageserver_process - finally: - log.info("stopping new pageserver") - pageserver_process.kill() - - @contextmanager def pg_cur(endpoint): with closing(endpoint.connect()) as conn: @@ -201,7 +143,7 @@ def check_timeline_attached( def switch_pg_to_new_pageserver( - env: NeonEnv, + origin_ps: NeonPageserver, endpoint: Endpoint, new_pageserver_port: int, tenant_id: TenantId, @@ -216,7 +158,7 @@ def switch_pg_to_new_pageserver( endpoint.start() - timeline_to_detach_local_path = env.pageserver.timeline_dir(tenant_id, timeline_id) + timeline_to_detach_local_path = origin_ps.timeline_dir(tenant_id, timeline_id) files_before_detach = os.listdir(timeline_to_detach_local_path) assert ( "metadata" in files_before_detach @@ -269,27 +211,32 @@ def test_tenant_relocation( with_load: str, ): neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + neon_env_builder.num_pageservers = 2 env = neon_env_builder.init_start() tenant_id = TenantId("74ee8b079a0e437eb0afea7d26a07209") # FIXME: Is this expected? - env.pageserver.allowed_errors.append( + env.pageservers[0].allowed_errors.append( ".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*" ) - # Needed for detach polling. - env.pageserver.allowed_errors.append(f".*NotFound: tenant {tenant_id}.*") + # Needed for detach polling on the original pageserver + env.pageservers[0].allowed_errors.append(f".*NotFound: tenant {tenant_id}.*") + # We will dual-attach in this test, so stale generations are expected + env.pageservers[0].allowed_errors.append(".*Dropped remote consistent LSN updates.*") assert isinstance(env.pageserver_remote_storage, LocalFsStorage) - remote_storage_mock_path = env.pageserver_remote_storage.root # we use two branches to check that they are both relocated # first branch is used for load, compute for second one is used to # check that data is not lost - pageserver_http = env.pageserver.http_client() + origin_ps = env.pageservers[0] + destination_ps = env.pageservers[1] + origin_http = origin_ps.http_client() + destination_http = destination_ps.http_client() _, initial_timeline_id = env.neon_cli.create_tenant(tenant_id) log.info("tenant to relocate %s initial_timeline_id %s", tenant_id, initial_timeline_id) @@ -302,7 +249,7 @@ def test_tenant_relocation( timeline_id_main, current_lsn_main = populate_branch( ep_main, tenant_id=tenant_id, - ps_http=pageserver_http, + ps_http=origin_http, create_table=True, expected_sum=500500, ) @@ -320,17 +267,17 @@ def test_tenant_relocation( timeline_id_second, current_lsn_second = populate_branch( ep_second, tenant_id=tenant_id, - ps_http=pageserver_http, + ps_http=origin_http, create_table=False, expected_sum=1001000, ) # wait until pageserver receives that data - wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_main, current_lsn_main) - timeline_detail_main = pageserver_http.timeline_detail(tenant_id, timeline_id_main) + wait_for_last_record_lsn(origin_http, tenant_id, timeline_id_main, current_lsn_main) + timeline_detail_main = origin_http.timeline_detail(tenant_id, timeline_id_main) - wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_second, current_lsn_second) - timeline_detail_second = pageserver_http.timeline_detail(tenant_id, timeline_id_second) + wait_for_last_record_lsn(origin_http, tenant_id, timeline_id_second, current_lsn_second) + timeline_detail_second = origin_http.timeline_detail(tenant_id, timeline_id_second) if with_load == "with_load": # create load table @@ 
-350,170 +297,149 @@ def test_tenant_relocation( # if user creates a branch during migration # it wont appear on the new pageserver ensure_checkpoint( - pageserver_http=pageserver_http, + pageserver_http=origin_http, tenant_id=tenant_id, timeline_id=timeline_id_main, current_lsn=current_lsn_main, ) ensure_checkpoint( - pageserver_http=pageserver_http, + pageserver_http=origin_http, tenant_id=tenant_id, timeline_id=timeline_id_second, current_lsn=current_lsn_second, ) - log.info("inititalizing new pageserver") - # bootstrap second pageserver - new_pageserver_dir = env.repo_dir / "new_pageserver" - new_pageserver_dir.mkdir() + # Migrate either by attaching from s3 or import/export basebackup + if method == "major": + cmd = [ + "poetry", + "run", + "python", + str(base_dir / "scripts/export_import_between_pageservers.py"), + "--tenant-id", + str(tenant_id), + "--from-host", + "localhost", + "--from-http-port", + str(origin_http.port), + "--from-pg-port", + str(origin_ps.service_port.pg), + "--to-host", + "localhost", + "--to-http-port", + str(destination_http.port), + "--to-pg-port", + str(destination_ps.service_port.pg), + "--pg-distrib-dir", + str(neon_env_builder.pg_distrib_dir), + "--work-dir", + str(test_output_dir), + "--tmp-pg-port", + str(port_distributor.get_port()), + ] + subprocess_capture(test_output_dir, cmd, check=True) - new_pageserver_pg_port = port_distributor.get_port() - new_pageserver_http_port = port_distributor.get_port() - log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port) - pageserver_bin = neon_binpath / "pageserver" + destination_ps.allowed_errors.append( + ".*ignored .* unexpected bytes after the tar archive.*" + ) + elif method == "minor": + # call to attach timeline to new pageserver + destination_ps.tenant_attach(tenant_id) - new_pageserver_http = PageserverHttpClient( - port=new_pageserver_http_port, - auth_token=None, - is_testing_enabled_or_skip=env.pageserver.is_testing_enabled_or_skip, - ) + # wait for tenant to finish attaching + wait_until( + number_of_iterations=10, + interval=1, + func=lambda: assert_tenant_state(destination_http, tenant_id, "Active"), + ) - with new_pageserver_service( - new_pageserver_dir, - pageserver_bin, - remote_storage_mock_path, - new_pageserver_pg_port, - new_pageserver_http_port, - neon_env_builder.broker, - neon_env_builder.pg_distrib_dir, - ): - # Migrate either by attaching from s3 or import/export basebackup - if method == "major": - cmd = [ - "poetry", - "run", - "python", - str(base_dir / "scripts/export_import_between_pageservers.py"), - "--tenant-id", - str(tenant_id), - "--from-host", - "localhost", - "--from-http-port", - str(pageserver_http.port), - "--from-pg-port", - str(env.pageserver.service_port.pg), - "--to-host", - "localhost", - "--to-http-port", - str(new_pageserver_http_port), - "--to-pg-port", - str(new_pageserver_pg_port), - "--pg-distrib-dir", - str(neon_env_builder.pg_distrib_dir), - "--work-dir", - str(test_output_dir), - "--tmp-pg-port", - str(port_distributor.get_port()), - ] - subprocess_capture(test_output_dir, cmd, check=True) - elif method == "minor": - # call to attach timeline to new pageserver - new_pageserver_http.tenant_attach(tenant_id) - - # wait for tenant to finish attaching - wait_until( - number_of_iterations=10, - interval=1, - func=lambda: assert_tenant_state(new_pageserver_http, tenant_id, "Active"), - ) - - check_timeline_attached( - new_pageserver_http, - tenant_id, - timeline_id_main, - timeline_detail_main, - current_lsn_main, - ) - 
- check_timeline_attached( - new_pageserver_http, - tenant_id, - timeline_id_second, - timeline_detail_second, - current_lsn_second, - ) - - # rewrite neon cli config to use new pageserver for basebackup to start new compute - lines = (env.repo_dir / "config").read_text().splitlines() - for i, line in enumerate(lines): - if line.startswith("listen_http_addr"): - lines[i] = f"listen_http_addr = 'localhost:{new_pageserver_http_port}'" - if line.startswith("listen_pg_addr"): - lines[i] = f"listen_pg_addr = 'localhost:{new_pageserver_pg_port}'" - (env.repo_dir / "config").write_text("\n".join(lines)) - - old_local_path_main = switch_pg_to_new_pageserver( - env, - ep_main, - new_pageserver_pg_port, + check_timeline_attached( + destination_http, tenant_id, timeline_id_main, + timeline_detail_main, + current_lsn_main, ) - old_local_path_second = switch_pg_to_new_pageserver( - env, - ep_second, - new_pageserver_pg_port, + check_timeline_attached( + destination_http, tenant_id, timeline_id_second, + timeline_detail_second, + current_lsn_second, ) - # detach tenant from old pageserver before we check - # that all the data is there to be sure that old pageserver - # is no longer involved, and if it is, we will see the error - pageserver_http.tenant_detach(tenant_id) + # rewrite neon cli config to use new pageserver for basebackup to start new compute + lines = (env.repo_dir / "config").read_text().splitlines() + for i, line in enumerate(lines): + if line.startswith("listen_http_addr"): + lines[i] = f"listen_http_addr = 'localhost:{destination_http.port}'" + if line.startswith("listen_pg_addr"): + lines[i] = f"listen_pg_addr = 'localhost:{destination_ps.service_port.pg}'" + (env.repo_dir / "config").write_text("\n".join(lines)) - # Wait a little, so that the detach operation has time to finish. - wait_tenant_status_404(pageserver_http, tenant_id, iterations=100, interval=1) + old_local_path_main = switch_pg_to_new_pageserver( + origin_ps, + ep_main, + destination_ps.service_port.pg, + tenant_id, + timeline_id_main, + ) - post_migration_check(ep_main, 500500, old_local_path_main) - post_migration_check(ep_second, 1001000, old_local_path_second) + old_local_path_second = switch_pg_to_new_pageserver( + origin_ps, + ep_second, + destination_ps.service_port.pg, + tenant_id, + timeline_id_second, + ) - # ensure that we can successfully read all relations on the new pageserver - with pg_cur(ep_second) as cur: - cur.execute( - """ - DO $$ - DECLARE - r RECORD; - BEGIN - FOR r IN - SELECT relname FROM pg_class WHERE relkind='r' - LOOP - RAISE NOTICE '%', r.relname; - EXECUTE 'SELECT count(*) FROM quote_ident($1)' USING r.relname; - END LOOP; - END$$; - """ - ) + # detach tenant from old pageserver before we check + # that all the data is there to be sure that old pageserver + # is no longer involved, and if it is, we will see the error + origin_http.tenant_detach(tenant_id) - if with_load == "with_load": - assert load_ok_event.wait(3) - log.info("stopping load thread") - load_stop_event.set() - load_thread.join(timeout=10) - log.info("load thread stopped") + # Wait a little, so that the detach operation has time to finish. 
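+    # Wait a little, so that the detach operation has time to finish.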
+ wait_tenant_status_404(origin_http, tenant_id, iterations=100, interval=1) - # bring old pageserver back for clean shutdown via neon cli - # new pageserver will be shut down by the context manager - lines = (env.repo_dir / "config").read_text().splitlines() - for i, line in enumerate(lines): - if line.startswith("listen_http_addr"): - lines[i] = f"listen_http_addr = 'localhost:{env.pageserver.service_port.http}'" - if line.startswith("listen_pg_addr"): - lines[i] = f"listen_pg_addr = 'localhost:{env.pageserver.service_port.pg}'" - (env.repo_dir / "config").write_text("\n".join(lines)) + post_migration_check(ep_main, 500500, old_local_path_main) + post_migration_check(ep_second, 1001000, old_local_path_second) + + # ensure that we can successfully read all relations on the new pageserver + with pg_cur(ep_second) as cur: + cur.execute( + """ + DO $$ + DECLARE + r RECORD; + BEGIN + FOR r IN + SELECT relname FROM pg_class WHERE relkind='r' + LOOP + RAISE NOTICE '%', r.relname; + EXECUTE 'SELECT count(*) FROM quote_ident($1)' USING r.relname; + END LOOP; + END$$; + """ + ) + + if with_load == "with_load": + assert load_ok_event.wait(3) + log.info("stopping load thread") + load_stop_event.set() + load_thread.join(timeout=10) + log.info("load thread stopped") + + # bring old pageserver back for clean shutdown via neon cli + # new pageserver will be shut down by the context manager + lines = (env.repo_dir / "config").read_text().splitlines() + for i, line in enumerate(lines): + if line.startswith("listen_http_addr"): + lines[i] = f"listen_http_addr = 'localhost:{origin_ps.service_port.http}'" + if line.startswith("listen_pg_addr"): + lines[i] = f"listen_pg_addr = 'localhost:{origin_ps.service_port.pg}'" + (env.repo_dir / "config").write_text("\n".join(lines)) # Simulate hard crash of pageserver and re-attach a tenant with a branch @@ -571,7 +497,7 @@ def test_emergency_relocate_with_branches_slow_replay( # Attach and wait a few seconds to give it time to load the tenants, attach to the # safekeepers, and to stream and ingest the WAL up to the pause-point. before_attach_time = time.time() - pageserver_http.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) time.sleep(3) # The wal ingestion on the main timeline should now be paused at the fail point. @@ -718,7 +644,7 @@ def test_emergency_relocate_with_branches_createdb( # ingest the WAL, but let's make this less dependent on accidental timing. 
    pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
     before_attach_time = time.time()
-    pageserver_http.tenant_attach(tenant_id)
+    env.pageserver.tenant_attach(tenant_id)
 
     child_endpoint.start()
     with child_endpoint.cursor(dbname="neondb") as cur:
diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py
index 0169335a70..f181e70696 100644
--- a/test_runner/regress/test_tenants_with_remote_storage.py
+++ b/test_runner/regress/test_tenants_with_remote_storage.py
@@ -297,8 +297,8 @@ def test_tenant_redownloads_truncated_file_on_startup(
 
     assert os.stat(path).st_size == expected_size, "truncated layer should have been re-downloaded"
 
     # the remote side of local_layer_truncated
-    remote_layer_path = (
-        env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / path.name
+    remote_layer_path = env.pageserver_remote_storage.remote_layer_path(
+        tenant_id, timeline_id, path.name
     )
 
     # if the upload ever was ongoing, this check would be racy, but at least one
diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py
index b1a2755394..dab7b3879e 100644
--- a/test_runner/regress/test_timeline_delete.py
+++ b/test_runner/regress/test_timeline_delete.py
@@ -396,7 +396,7 @@ def test_timeline_resurrection_on_attach(
     ##### Second start, restore the data and ensure that we see only the timeline that wasn't deleted
     env.pageserver.start()
 
-    ps_http.tenant_attach(tenant_id=tenant_id)
+    env.pageserver.tenant_attach(tenant_id=tenant_id)
 
     wait_until_tenant_active(ps_http, tenant_id=tenant_id, iterations=10, period=0.5)
 
@@ -897,7 +897,7 @@ def test_timeline_delete_resumed_on_attach(
     env.pageserver.start()
 
     # now we call attach
-    ps_http.tenant_attach(tenant_id=tenant_id)
+    env.pageserver.tenant_attach(tenant_id=tenant_id)
 
     # delete should be resumed
     wait_timeline_detail_404(ps_http, env.initial_tenant, timeline_id, iterations=iterations)
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index ad12b56874..b34d2de0ba 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -298,17 +298,21 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
 
     # and wait till remote_consistent_lsn propagates to all safekeepers
     #
-    # TODO: this executes long as timeline on safekeeper is immediately
-    # deactivated once rcl reaches pageserver one, and thus we generally wait
-    # till pageserver reconnects to all safekeepers one by one here. Timeline
-    # status on safekeeper should take into account peers state as well.
+    # This timeout is long: safekeepers learn about remote_consistent_lsn updates when a pageserver
+    # connects, receives a PrimaryKeepAlive, and sends a PageserverFeedback. So the timeout has to encompass:
+    # - pageserver deletion_queue to validate + publish the remote_consistent_lsn
+    # - pageserver to reconnect to all safekeepers one by one, with multi-second delays in between
+    #
+    # TODO: timeline status on safekeeper should take into account peers state as well.
+ rcl_propagate_secs = 60 + started_at = time.time() while True: stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] if all([s_after.remote_consistent_lsn >= new_rcl for s_after in stat_after]): break elapsed = time.time() - started_at - if elapsed > 30: + if elapsed > rcl_propagate_secs: raise RuntimeError( f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}" )