Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 20:42:54 +00:00
Reuse existing tenants during attach (#2540)
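In short: attach_local_tenants now reuses a tenant that is already present in the pageserver's in-memory state instead of refusing to attach it again, initialize_new_timeline takes a pre-resolved optional ancestor timeline rather than a mutable guard over the whole timeline map, and a new regression test checks that a timeline is reattached after its local layer file is deleted and re-downloaded from remote storage.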
@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
             (storage, remote_index_clone, sync_queue),
             max_sync_errors,
         )
+        .instrument(info_span!("storage_sync_loop"))
         .await;
         Ok(())
     },
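The one added line above wraps the sync loop future in a tracing span. For context, this is the tracing crate's Instrument extension trait; a minimal, self-contained sketch of the same pattern (the function and runtime below are illustrative, not from this commit):

    use tracing::{info_span, Instrument};

    async fn sync_loop() {
        // Any events logged while this future is polled carry the
        // "storage_sync_loop" span attached below.
        tracing::info!("tick");
    }

    #[tokio::main]
    async fn main() {
        tracing_subscriber::fmt::init();
        // .instrument() re-enters the span on every poll, which a plain
        // span.enter() guard cannot do across await points.
        sync_loop()
            .instrument(info_span!("storage_sync_loop"))
            .await;
    }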
@@ -400,16 +400,19 @@ impl Tenant {
                 timeline_id,
                 metadata.pg_version()
             );
-            let timeline = self
-                .initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
-                .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
-
-            match timelines_accessor.entry(timeline.timeline_id) {
-                Entry::Occupied(_) => bail!(
-                    "Found freshly initialized timeline {} in the tenant map",
-                    timeline.timeline_id
+            let ancestor = metadata
+                .ancestor_timeline()
+                .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
+                .cloned();
+            match timelines_accessor.entry(timeline_id) {
+                Entry::Occupied(_) => warn!(
+                    "Timeline {}/{} already exists in the tenant map, skipping its initialization",
+                    self.tenant_id, timeline_id
                 ),
                 Entry::Vacant(v) => {
+                    let timeline = self
+                        .initialize_new_timeline(timeline_id, metadata, ancestor)
+                        .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
                     v.insert(timeline);
                 }
             }
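The new code tolerates an already-registered timeline instead of bailing out, and only builds the timeline when the map slot is vacant. A self-contained sketch of that occupied-vs-vacant decision with std's Entry API (the Timeline type and u64 IDs are stand-ins for the pageserver's types):

    use std::collections::hash_map::{Entry, HashMap};
    use std::sync::Arc;

    struct Timeline {
        id: u64,
    }

    fn register(map: &mut HashMap<u64, Arc<Timeline>>, id: u64) {
        match map.entry(id) {
            // Already present: warn and leave the existing entry untouched.
            Entry::Occupied(_) => {
                eprintln!("timeline {id} already exists, skipping its initialization")
            }
            // Absent: only now pay the cost of building the value, then insert.
            Entry::Vacant(v) => {
                v.insert(Arc::new(Timeline { id }));
            }
        }
    }

    fn main() {
        let mut map = HashMap::new();
        register(&mut map, 7);
        register(&mut map, 7); // second call is a no-op rather than an error
        assert_eq!(map.len(), 1);
        assert_eq!(map[&7].id, 7);
    }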
@@ -609,21 +612,14 @@ impl Tenant {
         &self,
         new_timeline_id: TimelineId,
         new_metadata: TimelineMetadata,
-        timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
+        ancestor: Option<Arc<Timeline>>,
     ) -> anyhow::Result<Arc<Timeline>> {
-        let ancestor = match new_metadata.ancestor_timeline() {
-            Some(ancestor_timeline_id) => Some(
-                timelines
-                    .get(&ancestor_timeline_id)
-                    .cloned()
-                    .with_context(|| {
-                        format!(
-                            "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
-                        )
-                    })?,
-            ),
-            None => None,
-        };
+        if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
+            anyhow::ensure!(
+                ancestor.is_some(),
+                "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
+            )
+        }

         let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
         let pg_version = new_metadata.pg_version();
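With the guard parameter gone, ancestor lookup happens at the call site and the callee only asserts that a named ancestor was actually found. A sketch of that caller/callee split (simplified types; only the ensure! and and_then/cloned shapes mirror the diff):

    use std::collections::HashMap;
    use std::sync::Arc;

    use anyhow::ensure;

    struct Metadata {
        ancestor: Option<u64>,
    }

    struct Timeline;

    // Caller side: map the optional ancestor ID to an optional handle;
    // an unknown ID silently resolves to None here.
    fn resolve_ancestor(
        meta: &Metadata,
        timelines: &HashMap<u64, Arc<Timeline>>,
    ) -> Option<Arc<Timeline>> {
        meta.ancestor.and_then(|id| timelines.get(&id)).cloned()
    }

    // Callee side: fail only if the metadata names an ancestor that the
    // caller could not resolve.
    fn initialize_timeline(
        meta: &Metadata,
        ancestor: Option<Arc<Timeline>>,
    ) -> anyhow::Result<Arc<Timeline>> {
        if let Some(ancestor_id) = meta.ancestor {
            ensure!(ancestor.is_some(), "ancestor {ancestor_id} was not found");
        }
        Ok(Arc::new(Timeline))
    }

    fn main() -> anyhow::Result<()> {
        let timelines = HashMap::new();
        let meta = Metadata { ancestor: None };
        let ancestor = resolve_ancestor(&meta, &timelines);
        initialize_timeline(&meta, ancestor).map(|_| ())
    }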
@@ -1080,8 +1076,12 @@ impl Tenant {
                 )
             })?;

+            let ancestor = new_metadata
+                .ancestor_timeline()
+                .and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
+                .cloned();
             let new_timeline = self
-                .initialize_new_timeline(new_timeline_id, new_metadata, timelines)
+                .initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
                 .with_context(|| {
                     format!(
                         "Failed to initialize timeline {}/{}",
@@ -107,6 +107,9 @@ pub fn init_tenant_mgr(
 /// Ignores other timelines that might be present for the tenant, but were not passed as a parameter.
 /// Attempts to load as many entities as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
 /// and the load continues.
+///
+/// Attach happens on startup and on successful timeline downloads
+/// (some subset of timeline files, always including its metadata, after which the new timeline needs to be registered).
 pub fn attach_local_tenants(
     conf: &'static PageServerConf,
     remote_index: &RemoteIndex,
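The doc comment promises best-effort loading: a failing timeline marks its tenant Broken without aborting the rest of the attach. A rough sketch of that error-containment shape (the states and failure mode are invented for illustration; whether the loop spans one tenant or many is elided here):

    #[derive(Debug, PartialEq)]
    enum TenantState {
        Active,
        Broken,
    }

    // Placeholder loader: pretend odd IDs point at corrupt timelines.
    fn load_timeline(id: u64) -> Result<(), String> {
        if id % 2 == 0 {
            Ok(())
        } else {
            Err(format!("timeline {id} is corrupt"))
        }
    }

    fn attach_tenant(timeline_ids: &[u64]) -> TenantState {
        let mut state = TenantState::Active;
        for &id in timeline_ids {
            if let Err(e) = load_timeline(id) {
                // Record the failure but keep loading the remaining timelines.
                eprintln!("marking tenant Broken: {e}");
                state = TenantState::Broken;
            }
        }
        state
    }

    fn main() {
        assert_eq!(attach_tenant(&[2, 4]), TenantState::Active);
        assert_eq!(attach_tenant(&[2, 3, 4]), TenantState::Broken);
    }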
@@ -122,18 +125,20 @@ pub fn attach_local_tenants(
         );
         debug!("Timelines to attach: {local_timelines:?}");

-        let tenant = load_local_tenant(conf, tenant_id, remote_index);
-        {
-            match tenants_state::write_tenants().entry(tenant_id) {
-                hash_map::Entry::Occupied(_) => {
-                    error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
-                    continue;
-                }
-                hash_map::Entry::Vacant(v) => {
-                    v.insert(Arc::clone(&tenant));
-                }
-            }
-        }
+        let mut tenants_accessor = tenants_state::write_tenants();
+        let tenant = match tenants_accessor.entry(tenant_id) {
+            hash_map::Entry::Occupied(o) => {
+                info!("Tenant {tenant_id} was found in pageserver's memory");
+                Arc::clone(o.get())
+            }
+            hash_map::Entry::Vacant(v) => {
+                info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
+                let tenant = load_local_tenant(conf, tenant_id, remote_index);
+                v.insert(Arc::clone(&tenant));
+                tenant
+            }
+        };
+        drop(tenants_accessor);

         if tenant.current_state() == TenantState::Broken {
             warn!("Skipping timeline load for broken tenant {tenant_id}")
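This hunk is the commit's namesake: attach consults the shared tenant map first and only loads from disk on a miss, and the write guard is dropped explicitly before the slower per-timeline work begins. A condensed sketch with std primitives (Tenant and the loader are placeholders for the pageserver's):

    use std::collections::hash_map::{Entry, HashMap};
    use std::sync::{Arc, Mutex};

    struct Tenant {
        id: u64,
    }

    // Placeholder for reading tenant state from local disk.
    fn load_local_tenant(id: u64) -> Arc<Tenant> {
        Arc::new(Tenant { id })
    }

    fn attach(tenants: &Mutex<HashMap<u64, Arc<Tenant>>>, id: u64) -> Arc<Tenant> {
        let mut guard = tenants.lock().unwrap();
        let tenant = match guard.entry(id) {
            // Reuse the tenant already held in memory instead of erroring out.
            Entry::Occupied(o) => Arc::clone(o.get()),
            // First attach: load it and publish it in the map.
            Entry::Vacant(v) => {
                let tenant = load_local_tenant(id);
                v.insert(Arc::clone(&tenant));
                tenant
            }
        };
        // Release the lock before any long-running timeline work.
        drop(guard);
        tenant
    }

    fn main() {
        let tenants = Mutex::new(HashMap::new());
        let first = attach(&tenants, 1);
        let second = attach(&tenants, 1);
        assert!(Arc::ptr_eq(&first, &second)); // the same tenant was reused
        assert_eq!(first.id, 1);
    }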
@@ -7,19 +7,25 @@
 #

 import asyncio
+import os
+from pathlib import Path
 from typing import List, Tuple

 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
+    NeonPageserverHttpClient,
     Postgres,
     RemoteStorageKind,
     available_remote_storages,
+    wait_for_last_record_lsn,
     wait_for_upload,
+    wait_until,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.utils import query_scalar


 async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
     # run final checkpoint manually to flush all the data to remote storage
     pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
     wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
+def test_tenants_attached_after_download(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_tenants_attached_after_download",
+    )
+
+    data_id = 1
+    data_secret = "very secret secret"
+
+    ##### First start, insert secret data and upload it to the remote storage
+    env = neon_env_builder.init_start()
+    pageserver_http = env.pageserver.http_client()
+    pg = env.postgres.create_start("main")
+
+    client = env.pageserver.http_client()
+
+    tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
+    timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
+
+    for checkpoint_number in range(1, 3):
+        with pg.cursor() as cur:
+            cur.execute(
+                f"""
+                CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
+                INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
+                """
+            )
+            current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+        # wait until pageserver receives that data
+        wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
+
+        # run checkpoint manually to be sure that data landed in remote storage
+        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+        log.info(f"waiting for checkpoint {checkpoint_number} upload")
+        # wait until pageserver successfully uploaded a checkpoint to remote storage
+        wait_for_upload(client, tenant_id, timeline_id, current_lsn)
+        log.info(f"upload of checkpoint {checkpoint_number} is done")
+
+    ##### Stop the pageserver, erase its layer file to force it to be downloaded from S3 later
+    env.postgres.stop_all()
+    env.pageserver.stop()
+
+    timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
+    local_layer_deleted = False
+    for path in timeline_dir.iterdir():
+        if path.name.startswith("00000"):
+            # Looks like a layer file. Remove it
+            os.remove(path)
+            local_layer_deleted = True
+            break
+    assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
+
+    ##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
+    env.pageserver.start()
+    client = env.pageserver.http_client()
+
+    wait_until(
+        number_of_iterations=5,
+        interval=1,
+        func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
+    )
+
+    restored_timelines = client.timeline_list(tenant_id)
+    assert (
+        len(restored_timelines) == 1
+    ), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
+    restored_timeline = restored_timelines[0]
+    assert restored_timeline["timeline_id"] == str(
+        timeline_id
+    ), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
+
+
+def expect_tenant_to_download_timeline(
+    client: NeonPageserverHttpClient,
+    tenant_id: TenantId,
+):
+    for tenant in client.tenant_list():
+        if tenant["id"] == str(tenant_id):
+            assert not tenant.get(
+                "has_in_progress_downloads", True
+            ), f"Tenant {tenant_id} should have no downloads in progress"
+            return
+    assert False, f"Tenant {tenant_id} is missing on pageserver"
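The test leans on the wait_until fixture, which retries a callable a fixed number of times with a pause in between and surfaces the last failure. A minimal sketch of the same poll-until-ok loop, written in Rust to match the examples above (names are illustrative):

    use std::thread::sleep;
    use std::time::Duration;

    // Retry `check` up to `iterations` times, pausing `interval` between
    // attempts; return the first success or the last observed error.
    fn wait_until<T, E>(
        iterations: u32,
        interval: Duration,
        mut check: impl FnMut() -> Result<T, E>,
    ) -> Result<T, E> {
        let mut last_err = None;
        for _ in 0..iterations {
            match check() {
                Ok(value) => return Ok(value),
                Err(e) => last_err = Some(e),
            }
            sleep(interval);
        }
        // Panics only if `iterations` was zero, which we treat as misuse.
        Err(last_err.expect("wait_until needs at least one iteration"))
    }

    fn main() {
        let mut calls = 0;
        let result = wait_until(5, Duration::from_millis(10), || {
            calls += 1;
            if calls >= 3 { Ok(calls) } else { Err("not ready yet") }
        });
        assert_eq!(result, Ok(3));
    }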