Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-05 12:32:54 +00:00)
storage_controller: create imported timelines on safekeepers (#11801)
## Problem

Safekeeper timeline creations were skipped for imported timelines, since the correct start LSN of the timeline is not known at that point.

## Summary of changes

Create imported timelines on the safekeepers as part of the import finalize step. We use the last record LSN of shard 0 as the start LSN for the safekeeper timeline.

Closes https://github.com/neondatabase/neon/issues/11569
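In outline, the finalize step polls the pageservers until the imported timeline reports active on every shard, then creates the timeline on the safekeepers using shard 0's last record LSN as the start LSN. A minimal self-contained sketch of that shape (all types and function names below are illustrative stand-ins, not the storage controller's actual API):

    use std::time::Duration;

    // Illustrative stand-in for the pageserver API's TimelineInfo.
    #[derive(Clone, Debug)]
    struct TimelineInfo {
        last_record_lsn: u64,
    }

    // Stand-in: returns Some(shard 0's info) once the timeline is active
    // on all shards, None while any shard is still catching up.
    async fn timeline_active_on_all_shards() -> Option<TimelineInfo> {
        Some(TimelineInfo { last_record_lsn: 0x1_6B9D50 })
    }

    // Stand-in for the safekeeper creation call; the real change derives
    // the safekeeper start LSN from shard 0's last record LSN.
    async fn create_timeline_on_safekeepers(start_lsn: u64) -> anyhow::Result<()> {
        println!("safekeeper timeline created at {start_lsn:#x}");
        Ok(())
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        loop {
            if let Some(info) = timeline_active_on_all_shards().await {
                create_timeline_on_safekeepers(info.last_record_lsn).await?;
                break;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        Ok(())
    }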
@@ -3886,10 +3886,10 @@ impl Service {
                 None
             } else if safekeepers {
-                // Note that we do not support creating the timeline on the safekeepers
-                // for imported timelines. The `start_lsn` of the timeline is not known
-                // until the import finshes.
-                // https://github.com/neondatabase/neon/issues/11569
+                // Note that for imported timelines, we do not create the timeline on the safekeepers
+                // straight away. Instead, we do it once the import finalized such that we know what
+                // start LSN to provide for the safekeepers. This is done in
+                // [`Self::finalize_timeline_import`].
                 let res = self
                     .tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
                     .instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
@@ -3966,11 +3966,22 @@ impl Service {
             let active = self.timeline_active_on_all_shards(&import).await?;

             match active {
-                true => {
+                Some(timeline_info) => {
                     tracing::info!("Timeline became active on all shards");
+
+                    if self.config.timelines_onto_safekeepers {
+                        // Now that we know the start LSN of this timeline, create it on the
+                        // safekeepers.
+                        self.tenant_timeline_create_safekeepers_until_success(
+                            import.tenant_id,
+                            timeline_info,
+                        )
+                        .await?;
+                    }
+
                     break;
                 }
-                false => {
+                None => {
                     tracing::info!("Timeline not active on all shards yet");

                     tokio::select! {
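Changing the match arms from `true`/`false` to `Some(timeline_info)`/`None` folds the predicate and its payload into one return value, so the caller cannot observe "active on all shards" without also receiving the shard 0 `TimelineInfo` it needs for the start LSN. A toy illustration of the refactor (names invented here):

    struct Info {
        lsn: u64,
    }

    // Before: a bool return forced a second call to fetch the payload.
    // After: the payload travels with the positive answer.
    fn check(active: bool) -> Option<Info> {
        active.then(|| Info { lsn: 42 })
    }

    fn main() {
        match check(true) {
            Some(info) => println!("active, start LSN {}", info.lsn),
            None => println!("not active on all shards yet"),
        }
    }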
@@ -4004,9 +4015,6 @@ impl Service {
             .range_mut(TenantShardId::tenant_range(import.tenant_id))
             .for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);

-        // TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
-        // so we can't create the timeline on the safekeepers. Fix by moving creation here.
-        // https://github.com/neondatabase/neon/issues/11569
         tracing::info!(%import_failed, "Timeline import complete");

         Ok(())
@@ -4021,10 +4029,16 @@ impl Service {
             .await;
     }

+    /// If the timeline is active on all shards, returns the [`TimelineInfo`]
+    /// collected from shard 0.
+    ///
+    /// An error is returned if the shard layout has changed during the import.
+    /// This is guarded against within the storage controller and the pageserver,
+    /// and, therefore, unexpected.
     async fn timeline_active_on_all_shards(
         self: &Arc<Self>,
         import: &TimelineImport,
-    ) -> anyhow::Result<bool> {
+    ) -> anyhow::Result<Option<TimelineInfo>> {
         let targets = {
             let locked = self.inner.read().unwrap();
             let mut targets = Vec::new();
@@ -4048,13 +4062,17 @@ impl Service {
                         .expect("Pageservers may not be deleted while referenced");
                     targets.push((*tenant_shard_id, node.clone()));
                 } else {
-                    return Ok(false);
+                    return Ok(None);
                 }
             }

             targets
         };

+        if targets.is_empty() {
+            anyhow::bail!("No shards found to finalize import for");
+        }
+
         let results = self
             .tenant_for_shards_api(
                 targets,
@@ -4070,10 +4088,17 @@ impl Service {
             )
             .await;

-        Ok(results.into_iter().all(|res| match res {
+        let all_active = results.iter().all(|res| match res {
             Ok(info) => info.state == TimelineState::Active,
             Err(_) => false,
-        }))
+        });
+
+        if all_active {
+            // Both unwraps are validated above
+            Ok(Some(results.into_iter().next().unwrap().unwrap()))
+        } else {
+            Ok(None)
+        }
     }

     pub(crate) async fn tenant_timeline_archival_config(
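The two `unwrap`s in the new tail lean on invariants established earlier in the function: `targets` was just checked non-empty (so a first result exists, and it belongs to shard 0 given the order in which targets are collected), and `all_active` implies every result is `Ok`. A compact restatement of the pattern with plain types (illustrative, not the real signatures):

    fn first_if_all_ok(results: Vec<Result<u64, String>>) -> Option<u64> {
        // `all` borrows via `iter()`, so `results` is still available afterwards.
        let all_ok = results.iter().all(|r| r.is_ok());
        if all_ok {
            // Safe under the caller's guarantee that `results` is non-empty,
            // and `all_ok` guarantees every entry is Ok.
            Some(results.into_iter().next().unwrap().unwrap())
        } else {
            None
        }
    }

    fn main() {
        assert_eq!(first_if_all_ok(vec![Ok(7), Ok(9)]), Some(7));
        assert_eq!(first_if_all_ok(vec![Ok(7), Err("down".into())]), None);
    }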
@@ -323,6 +323,42 @@ impl Service {
         })
     }

+    pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
+        self: &Arc<Self>,
+        tenant_id: TenantId,
+        timeline_info: TimelineInfo,
+    ) -> anyhow::Result<()> {
+        const BACKOFF: Duration = Duration::from_secs(5);
+
+        loop {
+            if self.cancel.is_cancelled() {
+                anyhow::bail!("Shut down requested while finalizing import");
+            }
+
+            let res = self
+                .tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
+                .await;
+
+            match res {
+                Ok(_) => {
+                    tracing::info!("Timeline created on safekeepers");
+                    break;
+                }
+                Err(err) => {
+                    tracing::error!("Failed to create timeline on safekeepers: {err}");
+                    tokio::select! {
+                        _ = self.cancel.cancelled() => {
+                            anyhow::bail!("Shut down requested while finalizing import");
+                        },
+                        _ = tokio::time::sleep(BACKOFF) => {}
+                    };
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     /// Directly insert the timeline into the database without reconciling it with safekeepers.
     ///
     /// Useful if the timeline already exists on the specified safekeepers,
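`tenant_timeline_create_safekeepers_until_success` is an instance of a generic retry-until-success-or-shutdown loop. A standalone sketch of that shape, assuming a `tokio_util` `CancellationToken` in the role of `self.cancel` (consistent with the `is_cancelled`/`cancelled` calls above, though the concrete type is not visible in this diff):

    use std::future::Future;
    use std::time::Duration;

    use tokio_util::sync::CancellationToken;

    const BACKOFF: Duration = Duration::from_secs(5);

    // Retry a fallible async operation until it succeeds, sleeping between
    // attempts and aborting promptly when `cancel` fires.
    async fn retry_until_success<F, Fut>(cancel: &CancellationToken, mut op: F) -> anyhow::Result<()>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = anyhow::Result<()>>,
    {
        loop {
            if cancel.is_cancelled() {
                anyhow::bail!("shut down requested");
            }

            match op().await {
                Ok(()) => return Ok(()),
                Err(err) => {
                    eprintln!("attempt failed: {err}; retrying in {BACKOFF:?}");
                    tokio::select! {
                        _ = cancel.cancelled() => anyhow::bail!("shut down requested"),
                        _ = tokio::time::sleep(BACKOFF) => {}
                    }
                }
            }
        }
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let cancel = CancellationToken::new();
        retry_until_success(&cancel, || async { Ok(()) }).await
    }

Selecting on cancellation during the backoff sleep matters: without it, a shutdown request arriving mid-sleep would be delayed by up to `BACKOFF`.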
@@ -24,6 +24,7 @@ from fixtures.utils import (
     skip_in_debug_build,
     wait_until,
 )
+from fixtures.workload import Workload
 from mypy_boto3_kms import KMSClient
 from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
 from mypy_boto3_s3 import S3Client
@@ -97,6 +98,10 @@ def test_pgdata_import_smoke(
         f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
     )

+    if neon_env_builder.storage_controller_config is None:
+        neon_env_builder.storage_controller_config = {}
+    neon_env_builder.storage_controller_config["timelines_onto_safekeepers"] = True
+
     env = neon_env_builder.init_start()

     # The test needs LocalFs support, which is only built in testing mode.
@@ -286,34 +291,28 @@ def test_pgdata_import_smoke(
     #
     # validate that we can write
     #
-    rw_endpoint = env.endpoints.create_start(
-        branch_name=import_branch_name,
-        endpoint_id="rw",
-        tenant_id=tenant_id,
-        config_lines=ep_config,
-    )
-    rw_endpoint.safe_psql("create table othertable(values text)")
-    rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
+    workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
+    workload.init()
+    workload.write_rows(64)
+    workload.validate()

-    # TODO: consider using `class Workload` here
-    # to do compaction and whatnot?
+    rw_lsn = Lsn(workload.endpoint().safe_psql_scalar("select pg_current_wal_flush_lsn()"))

     #
     # validate that we can branch (important use case)
     #

     # ... at the tip
-    _ = env.create_branch(
+    child_timeline_id = env.create_branch(
         new_branch_name="br-tip",
         ancestor_branch_name=import_branch_name,
         tenant_id=tenant_id,
         ancestor_start_lsn=rw_lsn,
     )
-    br_tip_endpoint = env.endpoints.create_start(
-        branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id, config_lines=ep_config
-    )
-    validate_vanilla_equivalence(br_tip_endpoint)
-    br_tip_endpoint.safe_psql("select * from othertable")
+    child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
+    child_workload.validate()
+
+    validate_vanilla_equivalence(child_workload.endpoint())

     # ... at the initdb lsn
     _ = env.create_branch(
@@ -330,7 +329,7 @@ def test_pgdata_import_smoke(
     )
     validate_vanilla_equivalence(br_initdb_endpoint)
     with pytest.raises(psycopg2.errors.UndefinedTable):
-        br_initdb_endpoint.safe_psql("select * from othertable")
+        br_initdb_endpoint.safe_psql(f"select * from {workload.table}")


 @run_only_on_default_postgres(reason="PG version is irrelevant here")