Merge remote-tracking branch 'origin/main'

Heikki Linnakangas
2022-05-15 17:16:56 +03:00
2 changed files with 135 additions and 28 deletions


@@ -78,6 +78,9 @@ pub enum TenantState {
// The local disk might have some newer files that don't exist in cloud storage yet.
    // The tenant cannot be accessed anymore for any reason except graceful shutdown.
Stopping,
// Something went wrong loading the tenant state
Broken,
}
impl fmt::Display for TenantState {
@@ -86,6 +89,7 @@ impl fmt::Display for TenantState {
TenantState::Active => f.write_str("Active"),
TenantState::Idle => f.write_str("Idle"),
TenantState::Stopping => f.write_str("Stopping"),
TenantState::Broken => f.write_str("Broken"),
}
}
}
@@ -99,7 +103,22 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
local_timeline_init_statuses,
} = storage_sync::start_local_timeline_sync(conf)
.context("Failed to set up local files sync with external storage")?;
init_local_repositories(conf, local_timeline_init_statuses, &remote_index)?;
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
if let Err(err) =
init_local_repository(conf, tenant_id, local_timeline_init_statuses, &remote_index)
{
// Report the error, but continue with the startup for other tenants. An error
// loading a tenant is serious, but it's better to complete the startup and
// serve other tenants, than fail completely.
error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
let mut m = tenants_state::write_tenants();
if let Some(tenant) = m.get_mut(&tenant_id) {
tenant.state = TenantState::Broken;
}
}
}
Ok(remote_index)
}
@@ -143,8 +162,13 @@ pub fn shutdown_all_tenants() {
let mut m = tenants_state::write_tenants();
let mut tenantids = Vec::new();
for (tenantid, tenant) in m.iter_mut() {
tenant.state = TenantState::Stopping;
tenantids.push(*tenantid)
match tenant.state {
TenantState::Active | TenantState::Idle | TenantState::Stopping => {
tenant.state = TenantState::Stopping;
tenantids.push(*tenantid)
}
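            // A Broken tenant was never fully initialized, so there is nothing to stop.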
TenantState::Broken => {}
}
}
drop(m);
@@ -270,6 +294,10 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
TenantState::Stopping => {
// don't re-activate it if it's being stopped
}
TenantState::Broken => {
// cannot activate
}
}
Ok(())
}
@@ -370,38 +398,37 @@ pub fn list_tenants() -> Vec<TenantInfo> {
.collect()
}
fn init_local_repositories(
fn init_local_repository(
conf: &'static PageServerConf,
local_timeline_init_statuses: HashMap<ZTenantId, HashMap<ZTimelineId, LocalTimelineInitStatus>>,
tenant_id: ZTenantId,
local_timeline_init_statuses: HashMap<ZTimelineId, LocalTimelineInitStatus>,
remote_index: &RemoteIndex,
) -> anyhow::Result<(), anyhow::Error> {
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
// initialize local tenant
let repo = load_local_repo(conf, tenant_id, remote_index)
.with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?;
// initialize local tenant
let repo = load_local_repo(conf, tenant_id, remote_index)
.with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?;
let mut status_updates = HashMap::with_capacity(local_timeline_init_statuses.len());
for (timeline_id, init_status) in local_timeline_init_statuses {
match init_status {
LocalTimelineInitStatus::LocallyComplete => {
debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository");
status_updates.insert(timeline_id, TimelineSyncStatusUpdate::Downloaded);
}
LocalTimelineInitStatus::NeedsSync => {
debug!(
"timeline {tenant_id} for tenant {timeline_id} needs sync, \
so skipped for adding into repository until sync is finished"
);
}
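    // Register every locally complete timeline with the repository; timelines that
    // still need syncing are skipped until the sync finishes.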
let mut status_updates = HashMap::with_capacity(local_timeline_init_statuses.len());
for (timeline_id, init_status) in local_timeline_init_statuses {
match init_status {
LocalTimelineInitStatus::LocallyComplete => {
debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository");
status_updates.insert(timeline_id, TimelineSyncStatusUpdate::Downloaded);
}
LocalTimelineInitStatus::NeedsSync => {
debug!(
"timeline {tenant_id} for tenant {timeline_id} needs sync, \
so skipped for adding into repository until sync is finished"
);
}
}
// Lets fail here loudly to be on the safe side.
// XXX: It may be a better api to actually distinguish between repository startup
// and processing of newly downloaded timelines.
apply_timeline_remote_sync_status_updates(&repo, status_updates)
.with_context(|| format!("Failed to bootstrap timelines for tenant {tenant_id}"))?
}
    // Let's fail loudly here, to be on the safe side.
    // XXX: A better API might distinguish between repository startup
// and processing of newly downloaded timelines.
apply_timeline_remote_sync_status_updates(&repo, status_updates)
.with_context(|| format!("Failed to bootstrap timelines for tenant {tenant_id}"))?;
Ok(())
}


@@ -0,0 +1,80 @@
import pytest
from contextlib import closing
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
import os
# Test that corrupting or deleting the local timeline files of some tenants does not
# prevent the pageserver from starting up and serving the remaining healthy tenants.
def test_broken_timeline(zenith_env_builder: ZenithEnvBuilder):
    # The number of safekeepers is not important for this test.
    zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
tenant_timelines = []
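    # Create four tenants, each with a timeline that contains a small table.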
for n in range(4):
tenant_id_uuid, timeline_id_uuid = env.zenith_cli.create_tenant()
tenant_id = tenant_id_uuid.hex
timeline_id = timeline_id_uuid.hex
        pg = env.postgres.create_start('main', tenant_id=tenant_id_uuid)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
cur.execute("SHOW zenith.zenith_timeline")
timeline_id = cur.fetchone()[0]
pg.stop()
tenant_timelines.append((tenant_id, timeline_id, pg))
# Stop the pageserver
env.pageserver.stop()
# Leave the first timeline alone, but corrupt the others in different ways
(tenant0, timeline0, pg0) = tenant_timelines[0]
# Corrupt metadata file on timeline 1
(tenant1, timeline1, pg1) = tenant_timelines[1]
metadata_path = "{}/tenants/{}/timelines/{}/metadata".format(env.repo_dir, tenant1, timeline1)
print(f'overwriting metadata file at {metadata_path}')
    with open(metadata_path, "w") as f:
        f.write("overwritten with garbage!")
    # Remove the layer files on timeline 2. (This would actually work
# if we had Cloud Storage enabled in this test.)
(tenant2, timeline2, pg2) = tenant_timelines[2]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant2, timeline2)
for filename in os.listdir(timeline_path):
if filename.startswith('00000'):
# Looks like a layer file. Remove it
os.remove(f'{timeline_path}/{filename}')
    # Corrupt the layer files on timeline 3
(tenant3, timeline3, pg3) = tenant_timelines[3]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant3, timeline3)
for filename in os.listdir(timeline_path):
if filename.startswith('00000'):
# Looks like a layer file. Corrupt it
            with open(f'{timeline_path}/{filename}', "w") as f:
                f.write("overwritten with garbage!")
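    # Restart the pageserver. It should start up even though some tenants now have
    # corrupt or missing local data.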
env.pageserver.start()
# Tenant 0 should still work
pg0.start()
with closing(pg0.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM t")
assert cur.fetchone()[0] == 100
# But all others are broken
for n in range(1, 4):
(tenant, timeline, pg) = tenant_timelines[n]
with pytest.raises(Exception, match="Cannot load local timeline") as err:
pg.start()
log.info(f'compute startup failed as expected: {err}')