storage_controller: coordinate imports across shards in the storage controller (#11345)

## Problem

Pageservers currently notify the control plane directly when a shard import has completed.
The control plane then has to download the status of each shard from S3 and figure out
whether the import is truly done before proceeding with branch activation.

Issues with this approach:
* We can't control shard split behaviour on the storage controller side, and splitting
during an import is unsafe.
* The control plane needs to know about shards and implement its own logic to check that
the timeline is indeed ready on every shard.

## Summary of changes

In short, the storage controller coordinates imports and notifies the control plane only
once everything is done.

Big rocks:
1. Store timeline imports in the storage controller database; each import record tracks
the status of all of its shards.
We hook into the timeline creation call as the entry point for this.
2. Pageservers get a new upcall endpoint to notify the storage
controller of shard import updates.
3. The storage controller handles these updates by updating the persisted state. If an
update finalizes the import, the controller polls the pageservers until the timeline
activates and then notifies the control plane that the import is complete (see the
sketch below).
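
A minimal sketch of the controller-side aggregation, using simplified stand-in types (the
real ones are `ShardImportStatus` in `pageserver_api::models` and `TimelineImport` in the
new `storage_controller/src/timeline_import.rs`, both shown in the diff below):

```rust
// Simplified stand-ins for the types added in this PR; illustration only.
use std::collections::HashMap;

enum ShardStatus {
    InProgress,
    Done,
    Error(String),
}

struct ImportState {
    // One entry per shard of the tenant, all initialised to InProgress when the
    // import-mode timeline creation is handled by the storage controller.
    shard_statuses: HashMap<u8, ShardStatus>,
}

impl ImportState {
    /// Record one shard's upcall. Returns true when every shard has reached a
    /// terminal status, i.e. the controller may start finalizing the import.
    fn record(&mut self, shard: u8, status: ShardStatus) -> bool {
        self.shard_statuses.insert(shard, status);
        self.shard_statuses
            .values()
            .all(|s| !matches!(s, ShardStatus::InProgress))
    }
}
```

Finalization then polls the pageservers until the timeline is active on every shard and
makes a single idempotent call to the control plane's new `import_complete` endpoint.
Unlike this sketch, the real `TimelineImport::update` also rejects regressions from a
terminal status back to in-progress.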

The control plane side change adding the new endpoint is in
https://github.com/neondatabase/cloud/pull/26166

Closes https://github.com/neondatabase/neon/issues/11566
Author: Vlad Lazar
Committed by GitHub: 2025-04-24 12:26:06 +01:00
Parent: d43b8e73ae
Commit: 3a50d95b6d
20 changed files with 833 additions and 274 deletions

View File

@@ -320,6 +320,35 @@ pub struct TimelineCreateRequest {
pub mode: TimelineCreateRequestMode, pub mode: TimelineCreateRequestMode,
} }
impl TimelineCreateRequest {
pub fn mode_tag(&self) -> &'static str {
match &self.mode {
TimelineCreateRequestMode::Branch { .. } => "branch",
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
}
}
pub fn is_import(&self) -> bool {
matches!(self.mode, TimelineCreateRequestMode::ImportPgdata { .. })
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum ShardImportStatus {
InProgress,
Done,
Error(String),
}
impl ShardImportStatus {
pub fn is_terminal(&self) -> bool {
match self {
ShardImportStatus::InProgress => false,
ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
}
}
}
/// Storage controller specific extensions to [`TimelineInfo`]. /// Storage controller specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon { pub struct TimelineCreateResponseStorcon {

View File

@@ -4,10 +4,10 @@
//! See docs/rfcs/025-generation-numbers.md //! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use utils::id::NodeId; use utils::id::{NodeId, TimelineId};
use crate::controller_api::NodeRegisterRequest; use crate::controller_api::NodeRegisterRequest;
use crate::models::LocationConfigMode; use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::shard::TenantShardId; use crate::shard::TenantShardId;
/// Upcall message sent by the pageserver to the configured `control_plane_api` on /// Upcall message sent by the pageserver to the configured `control_plane_api` on
@@ -62,3 +62,10 @@ pub struct ValidateResponseTenant {
pub id: TenantShardId, pub id: TenantShardId,
pub valid: bool, pub valid: bool,
} }
#[derive(Serialize, Deserialize)]
pub struct PutTimelineImportStatusRequest {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub status: ShardImportStatus,
}
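
For illustration, a sketch of the request body the pageserver sends to the new
`POST /upcall/v1/timeline_import_status` endpoint; the IDs are freshly generated here and
the constructors are assumed from `pageserver_api`/`utils`:

```rust
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::PutTimelineImportStatusRequest;
use utils::id::{TenantId, TimelineId};

fn main() {
    // Illustration only: real callers use the shard's own tenant/timeline IDs.
    let req = PutTimelineImportStatusRequest {
        tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
        timeline_id: TimelineId::generate(),
        status: ShardImportStatus::Done,
    };
    // With the derives above, `status` encodes as "InProgress", "Done",
    // or {"Error": "<message>"} for a failed shard.
    println!("{}", serde_json::to_string_pretty(&req).unwrap());
}
```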

View File

@@ -419,6 +419,23 @@ impl Client {
} }
} }
pub async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
);
self.request(Method::GET, &uri, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn timeline_archival_config( pub async fn timeline_archival_config(
&self, &self,
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,
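
The storage controller uses this new `timeline_detail` call to check whether the imported
timeline has become active on each shard; a minimal usage sketch, assuming an already
constructed `mgmt_api::Client`:

```rust
use pageserver_api::models::TimelineState;
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use utils::id::TimelineId;

// Illustration only: returns true once the shard reports the timeline as Active.
async fn shard_is_active(
    client: &mgmt_api::Client,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
) -> mgmt_api::Result<bool> {
    let info = client.timeline_detail(tenant_shard_id, timeline_id).await?;
    Ok(info.state == TimelineState::Active)
}
```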

View File

@@ -3,10 +3,11 @@ use std::collections::HashMap;
use futures::Future; use futures::Future;
use pageserver_api::config::NodeMetadata; use pageserver_api::config::NodeMetadata;
use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest}; use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateRequestTenant, ValidateResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
}; };
use reqwest::Certificate; use reqwest::Certificate;
use serde::Serialize; use serde::Serialize;
@@ -14,7 +15,7 @@ use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use url::Url; use url::Url;
use utils::generation::Generation; use utils::generation::Generation;
use utils::id::NodeId; use utils::id::{NodeId, TimelineId};
use utils::{backoff, failpoint_support}; use utils::{backoff, failpoint_support};
use crate::config::PageServerConf; use crate::config::PageServerConf;
@@ -46,6 +47,12 @@ pub trait StorageControllerUpcallApi {
&self, &self,
tenants: Vec<(TenantShardId, Generation)>, tenants: Vec<(TenantShardId, Generation)>,
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send; ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
} }
impl StorageControllerUpcallClient { impl StorageControllerUpcallClient {
@@ -273,4 +280,30 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(result.into_iter().collect()) Ok(result.into_iter().collect())
} }
/// Send a shard import status to the storage controller
///
/// The implementation must have at-least-once delivery semantics.
/// To this end, we retry the request until it succeeds. If the pageserver
/// restarts or crashes, the shard import will start again from the beginning.
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
async fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> Result<(), RetryForeverError> {
let url = self
.base_url
.join("timeline_import_status")
.expect("Failed to build path");
let request = PutTimelineImportStatusRequest {
tenant_shard_id,
timeline_id,
status,
};
self.retry_http_forever(&url, request).await
}
} }
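
The at-least-once guarantee mentioned above boils down to retrying until either success or
shutdown. A rough, self-contained sketch of that pattern (the real client uses its existing
`retry_http_forever` helper; the names and the one-second delay here are illustrative):

```rust
use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Marker error for "we gave up because the pageserver is shutting down".
#[derive(Debug)]
struct ShuttingDown;

/// Retry an operation until it succeeds or shutdown is requested.
async fn retry_until_success<F, Fut, T, E>(
    mut attempt: F,
    cancel: &CancellationToken,
) -> Result<T, ShuttingDown>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    loop {
        match attempt().await {
            Ok(v) => return Ok(v),
            Err(e) => tracing::warn!("request failed, retrying: {}", e),
        }
        tokio::select! {
            _ = cancel.cancelled() => return Err(ShuttingDown),
            _ = tokio::time::sleep(Duration::from_secs(1)) => {}
        }
    }
}
```

If the pageserver dies before the upcall succeeds, the import restarts from scratch and the
status is eventually re-sent, which is why the storage controller treats duplicate updates
as no-ops.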

View File

@@ -787,6 +787,15 @@ mod test {
Ok(result) Ok(result)
} }
async fn put_timeline_import_status(
&self,
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
_status: pageserver_api::models::ShardImportStatus,
) -> Result<(), RetryForeverError> {
unimplemented!()
}
} }
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> { async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {

View File

@@ -1,20 +1,21 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::{Context, bail}; use anyhow::{Context, bail};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath; use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span}; use tracing::info;
use utils::lsn::Lsn; use utils::lsn::Lsn;
use super::Timeline; use super::Timeline;
use crate::context::RequestContext; use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata; use crate::tenant::metadata::TimelineMetadata;
mod flow; mod flow;
mod importbucket_client; mod importbucket_client;
mod importbucket_format; mod importbucket_format;
pub(crate) mod index_part_format; pub(crate) mod index_part_format;
pub(crate) mod upcall_api;
pub async fn doit( pub async fn doit(
timeline: &Arc<Timeline>, timeline: &Arc<Timeline>,
@@ -34,23 +35,6 @@ pub async fn doit(
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
info!("get spec early so we know we'll be able to upcall when done");
let Some(spec) = storage.get_spec().await? else {
bail!("spec not found")
};
let upcall_client =
upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
//
// send an early progress update to clean up k8s job early and generate potentially useful logs
//
info!("send early progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("early_progress_update"))
.await?;
let status_prefix = RemotePath::from_string("status").unwrap(); let status_prefix = RemotePath::from_string("status").unwrap();
// //
@@ -176,7 +160,21 @@ pub async fn doit(
// //
// Communicate that shard is done. // Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
// //
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
.expect("storcon configured");
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
storage storage
.put_json( .put_json(
&shard_status_key, &shard_status_key,
@@ -186,16 +184,6 @@ pub async fn doit(
.context("put shard status")?; .context("put shard status")?;
} }
//
// Ensure at-least-once deliver of the upcall to cplane
// before we mark the task as done and never come here again.
//
info!("send final progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("final_progress_update"))
.await?;
// //
// Mark as done in index_part. // Mark as done in index_part.
// This makes subsequent timeline loads enter the normal load code path // This makes subsequent timeline loads enter the normal load code path

View File

@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument}; use tracing::{debug, info, instrument};
use utils::lsn::Lsn; use utils::lsn::Lsn;
use super::{importbucket_format, index_part_format}; use super::index_part_format;
use crate::assert_u64_eq_usize::U64IsUsize; use crate::assert_u64_eq_usize::U64IsUsize;
use crate::config::PageServerConf; use crate::config::PageServerConf;
@@ -173,12 +173,6 @@ impl RemoteStorageWrapper {
res res
} }
pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
self.get_json(&RemotePath::from_string("spec.json").unwrap())
.await
.context("get spec")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_json<T: DeserializeOwned>( pub async fn get_json<T: DeserializeOwned>(
&self, &self,

View File

@@ -11,10 +11,3 @@ pub struct ShardStatus {
pub done: bool, pub done: bool,
// TODO: remaining fields // TODO: remaining fields
} }
// TODO: dedupe with fast_import code
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Spec {
pub project_id: String,
pub branch_id: String,
}

View File

@@ -1,124 +0,0 @@
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
use reqwest::{Certificate, Method};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::error;
use super::importbucket_format::Spec;
use crate::config::PageServerConf;
pub struct Client {
base_url: String,
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressRequest {
// no fields yet, not sure if there every will be any
}
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressResponse {
// we don't care
}
impl Client {
pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
anyhow::bail!("import_pgdata_upcall_api is not configured")
};
let mut http_client = reqwest::Client::builder();
for cert in &conf.ssl_ca_certs {
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
}
let http_client = http_client.build()?;
Ok(Self {
base_url: base_url.to_string(),
client: http_client,
cancel,
authorization_header: conf
.import_pgdata_upcall_api_token
.as_ref()
.map(|secret_string| secret_string.get_contents())
.map(|jwt| format!("Bearer {jwt}")),
})
}
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
}
}
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
let url = format!(
"{}/projects/{}/branches/{}/import_progress",
self.base_url, spec.project_id, spec.branch_id
);
let ImportProgressResponse {} = self
.request(Method::POST, url, &ImportProgressRequest {})
.await?
.json()
.await
.map_err(Error::ReceiveBody)?;
Ok(())
}
pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
loop {
match self.send_progress_once(spec).await {
Ok(()) => return Ok(()),
Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
Err(err) => {
error!(?err, "error sending progress, retrying");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
self.cancel.cancelled(),
)
.await
.is_ok()
{
anyhow::bail!("cancelled while sending early progress update");
}
}
}
}
}
}

View File

@@ -0,0 +1 @@
DROP TABLE timeline_imports;

View File

@@ -0,0 +1,6 @@
CREATE TABLE timeline_imports (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
shard_statuses JSONB NOT NULL,
PRIMARY KEY(tenant_id, timeline_id)
);
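
The `shard_statuses` JSONB column holds the serialized per-shard state map. As an
illustration of the kind of document stored there (plain strings stand in for the real
`ShardIndex` keys, whose exact encoding comes from its serde implementation), consider a
two-shard tenant where shard 0 finished and shard 1 failed:

```rust
fn main() {
    // Illustration only: the storage controller writes a value like this into
    // timeline_imports.shard_statuses and updates it as shard upcalls arrive.
    let statuses = serde_json::json!({
        "0002": "Done",
        "0102": { "Error": "import failed on this shard" },
    });
    println!("{statuses}");
}
```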

View File

@@ -30,7 +30,9 @@ use pageserver_api::models::{
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
}; };
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest}; use pageserver_api::upcall_api::{
PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest,
};
use pageserver_client::{BlockUnblock, mgmt_api}; use pageserver_client::{BlockUnblock, mgmt_api};
use routerify::Middleware; use routerify::Middleware;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -154,6 +156,28 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
json_response(StatusCode::OK, state.service.validate(validate_req).await?) json_response(StatusCode::OK, state.service.validate(validate_req).await?)
} }
async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
.handle_timeline_shard_import_progress_upcall(put_req)
.await?,
)
}
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing /// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments). /// generation numbers and doing attachments).
@@ -1961,6 +1985,13 @@ pub fn make_router(
.post("/upcall/v1/validate", |r| { .post("/upcall/v1/validate", |r| {
named_request_span(r, handle_validate, RequestName("upcall_v1_validate")) named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
}) })
.post("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,
handle_put_timeline_import_status,
RequestName("upcall_v1_timeline_import_status"),
)
})
// Test/dev/debug endpoints // Test/dev/debug endpoints
.post("/debug/v1/attach-hook", |r| { .post("/debug/v1/attach-hook", |r| {
named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook")) named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))

View File

@@ -23,6 +23,7 @@ mod scheduler;
mod schema; mod schema;
pub mod service; pub mod service;
mod tenant_shard; mod tenant_shard;
mod timeline_import;
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)] #[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
struct Sequence(u64); struct Sequence(u64);

View File

@@ -212,6 +212,21 @@ impl PageserverClient {
) )
} }
pub(crate) async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
measured_request!(
"timeline_detail",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner
.timeline_detail(tenant_shard_id, timeline_id)
.await
)
}
pub(crate) async fn tenant_shard_split( pub(crate) async fn tenant_shard_split(
&self, &self,
tenant_shard_id: TenantShardId, tenant_shard_id: TenantShardId,

View File

@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy, AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy, SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
}; };
use pageserver_api::models::TenantConfig; use pageserver_api::models::{ShardImportStatus, TenantConfig};
use pageserver_api::shard::{ use pageserver_api::shard::{
ShardConfigError, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId, ShardConfigError, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
}; };
@@ -40,6 +40,9 @@ use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY, DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
}; };
use crate::node::Node; use crate::node::Node;
use crate::timeline_import::{
TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
};
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// ## What do we store? /// ## What do we store?
@@ -127,6 +130,9 @@ pub(crate) enum DatabaseOperation {
RemoveTimelineReconcile, RemoveTimelineReconcile,
ListTimelineReconcile, ListTimelineReconcile,
ListTimelineReconcileStartup, ListTimelineReconcileStartup,
InsertTimelineImport,
UpdateTimelineImport,
DeleteTimelineImport,
} }
#[must_use] #[must_use]
@@ -1614,6 +1620,129 @@ impl Persistence {
Ok(()) Ok(())
} }
pub(crate) async fn insert_timeline_import(
&self,
import: TimelineImportPersistence,
) -> DatabaseResult<bool> {
self.with_measured_conn(DatabaseOperation::InsertTimelineImport, move |conn| {
Box::pin({
let import = import.clone();
async move {
let inserted = diesel::insert_into(crate::schema::timeline_imports::table)
.values(import)
.execute(conn)
.await?;
Ok(inserted == 1)
}
})
})
.await
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::DeleteTimelineImport, move |conn| {
Box::pin(async move {
diesel::delete(crate::schema::timeline_imports::table)
.filter(
dsl::tenant_id
.eq(tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.execute(conn)
.await?;
Ok(())
})
})
.await
}
/// Idempotently update the status of one shard for an ongoing timeline import
///
/// If the update was persisted to the database, then the current state of the
/// import is returned to the caller. In case of logical errors a bespoke
/// [`TimelineImportUpdateError`] instance is returned. Other database errors
/// are covered by the outer [`DatabaseError`].
pub(crate) async fn update_timeline_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
shard_status: ShardImportStatus,
) -> DatabaseResult<Result<Option<TimelineImport>, TimelineImportUpdateError>> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::UpdateTimelineImport, move |conn| {
Box::pin({
let shard_status = shard_status.clone();
async move {
// Load the current state from the database
let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.load(conn)
.await?;
assert!(from_db.len() <= 1);
let mut status = match from_db.pop() {
Some(some) => TimelineImport::from_persistent(some).unwrap(),
None => {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
};
// Perform the update in-memory
let follow_up = match status.update(tenant_shard_id.to_index(), shard_status) {
Ok(ok) => ok,
Err(err) => {
return Ok(Err(err));
}
};
let new_persistent = status.to_persistent();
// Write back if required (in the same transaction)
match follow_up {
TimelineImportUpdateFollowUp::Persist => {
let updated = diesel::update(dsl::timeline_imports)
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.set(dsl::shard_statuses.eq(new_persistent.shard_statuses))
.execute(conn)
.await?;
if updated != 1 {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
Ok(Ok(Some(status)))
}
TimelineImportUpdateFollowUp::None => Ok(Ok(None)),
}
}
})
})
.await
}
} }
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> { pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -2171,3 +2300,11 @@ impl ToSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
.map_err(Into::into) .map_err(Into::into)
} }
} }
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Clone)]
#[diesel(table_name = crate::schema::timeline_imports)]
pub(crate) struct TimelineImportPersistence {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) shard_statuses: serde_json::Value,
}
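
The nested return type of `update_timeline_import` encodes three layers: a database error,
a logical error (unknown import or an unexpected transition), and success, where `None`
means the update was a duplicate replay and `Some` carries the import's new state. A
sketch of how a caller interprets it, assuming the crate-local types from this file and the
`ApiError` conversions used by the upcall handler in `service.rs`:

```rust
async fn apply_shard_update(
    persistence: &Persistence,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    status: ShardImportStatus,
) -> Result<Option<TimelineImport>, ApiError> {
    match persistence
        .update_timeline_import(tenant_shard_id, timeline_id, status)
        .await
    {
        Err(db_err) => Err(db_err.into()),               // database/transaction failure
        Ok(Err(logical_err)) => Err(logical_err.into()), // unknown import or bad transition
        Ok(Ok(None)) => Ok(None),                        // duplicate delivery: nothing changed
        Ok(Ok(Some(import))) => Ok(Some(import)),        // new state: caller checks is_complete()
    }
}
```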

View File

@@ -76,6 +76,14 @@ diesel::table! {
} }
} }
diesel::table! {
timeline_imports (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
shard_statuses -> Jsonb,
}
}
diesel::table! { diesel::table! {
use diesel::sql_types::*; use diesel::sql_types::*;
use super::sql_types::PgLsn; use super::sql_types::PgLsn;
@@ -99,5 +107,6 @@ diesel::allow_tables_to_appear_in_same_query!(
safekeeper_timeline_pending_ops, safekeeper_timeline_pending_ops,
safekeepers, safekeepers,
tenant_shards, tenant_shards,
timeline_imports,
timelines, timelines,
); );

View File

@@ -40,14 +40,14 @@ use pageserver_api::models::{
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest, TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest, TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon, TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest, TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
}; };
use pageserver_api::shard::{ use pageserver_api::shard::{
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId, DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
}; };
use pageserver_api::upcall_api::{ use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse, PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateResponseTenant, ValidateRequest, ValidateResponse, ValidateResponseTenant,
}; };
use pageserver_client::{BlockUnblock, mgmt_api}; use pageserver_client::{BlockUnblock, mgmt_api};
use reqwest::{Certificate, StatusCode}; use reqwest::{Certificate, StatusCode};
@@ -97,6 +97,7 @@ use crate::tenant_shard::{
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter, ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
ScheduleOptimization, ScheduleOptimizationAction, TenantShard, ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
}; };
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500); const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -3732,11 +3733,14 @@ impl Service {
create_req: TimelineCreateRequest, create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> { ) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = self.config.timelines_onto_safekeepers; let safekeepers = self.config.timelines_onto_safekeepers;
let timeline_id = create_req.new_timeline_id;
tracing::info!( tracing::info!(
mode=%create_req.mode_tag(),
%safekeepers, %safekeepers,
"Creating timeline {}/{}", "Creating timeline {}/{}",
tenant_id, tenant_id,
create_req.new_timeline_id, timeline_id,
); );
let _tenant_lock = trace_shared_lock( let _tenant_lock = trace_shared_lock(
@@ -3746,15 +3750,62 @@ impl Service {
) )
.await; .await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock"); failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let create_mode = create_req.mode.clone(); let is_import = create_req.is_import();
let timeline_info = self let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req) .tenant_timeline_create_pageservers(tenant_id, create_req)
.await?; .await?;
let safekeepers = if safekeepers { let selected_safekeepers = if is_import {
let shards = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.map(|(ts_id, _)| ts_id.to_index())
.collect::<Vec<_>>()
};
if !shards
.iter()
.map(|shard_index| shard_index.shard_count)
.all_equal()
{
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Inconsistent shard count"
)));
}
let import = TimelineImport {
tenant_id,
timeline_id,
shard_statuses: ShardImportStatuses::new(shards),
};
let inserted = self
.persistence
.insert_timeline_import(import.to_persistent())
.await
.context("timeline import insert")
.map_err(ApiError::InternalServerError)?;
match inserted {
true => {
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
}
false => {
tracing::info!(%tenant_id, %timeline_id, "Timeline import entry already present");
}
}
None
} else if safekeepers {
// Note that we do not support creating the timeline on the safekeepers
// for imported timelines. The `start_lsn` of the timeline is not known
// until the import finishes.
// https://github.com/neondatabase/neon/issues/11569
let res = self let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode) .tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id)) .instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
.await?; .await?;
Some(res) Some(res)
@@ -3764,10 +3815,168 @@ impl Service {
Ok(TimelineCreateResponseStorcon { Ok(TimelineCreateResponseStorcon {
timeline_info, timeline_info,
safekeepers, safekeepers: selected_safekeepers,
}) })
} }
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,
) -> Result<(), ApiError> {
let res = self
.persistence
.update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status)
.await;
let timeline_import = match res {
Ok(Ok(Some(timeline_import))) => timeline_import,
Ok(Ok(None)) => {
// Idempotency: we've already seen and handled this update.
return Ok(());
}
Ok(Err(logical_err)) => {
return Err(logical_err.into());
}
Err(db_err) => {
return Err(db_err.into());
}
};
tracing::info!(
tenant_id=%req.tenant_shard_id.tenant_id,
timeline_id=%req.timeline_id,
shard_id=%req.tenant_shard_id.shard_slug(),
"Updated timeline import status to: {timeline_import:?}");
if timeline_import.is_complete() {
tokio::task::spawn({
let this = self.clone();
async move { this.finalize_timeline_import(timeline_import).await }
});
}
Ok(())
}
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
timeline_id=%import.timeline_id,
))]
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
// TODO(vlad): On start-up, load up the imports and notify cplane of the
// ones that have been completed. This assumes the new cplane API will
// be idempotent. If that's not possible, bang a flag in the database.
// https://github.com/neondatabase/neon/issues/11570
tracing::info!("Finalizing timeline import");
let import_failed = import.completion_error().is_some();
if !import_failed {
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
}
let active = self.timeline_active_on_all_shards(&import).await?;
match active {
true => {
tracing::info!("Timeline became active on all shards");
break;
}
false => {
tracing::info!("Timeline not active on all shards yet");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
},
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
};
}
}
}
}
tracing::info!(%import_failed, "Notifying cplane of import completion");
let client = UpcallClient::new(self.get_config(), self.cancel.child_token());
client.notify_import_complete(&import).await?;
if let Err(err) = self
.persistence
.delete_timeline_import(import.tenant_id, import.timeline_id)
.await
{
tracing::warn!("Failed to delete timeline import entry from database: {err}");
}
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
tracing::info!(%import_failed, "Timeline import complete");
Ok(())
}
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
) -> anyhow::Result<bool> {
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
anyhow::bail!("Shard layout change detected on completion");
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
} else {
return Ok(false);
}
}
targets
};
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_detail(tenant_shard_id, import.timeline_id)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
Ok(results.into_iter().all(|res| match res {
Ok(info) => info.state == TimelineState::Active,
Err(_) => false,
}))
}
pub(crate) async fn tenant_timeline_archival_config( pub(crate) async fn tenant_timeline_archival_config(
&self, &self,
tenant_id: TenantId, tenant_id: TenantId,

View File

@@ -15,7 +15,7 @@ use http_utils::error::ApiError;
use pageserver_api::controller_api::{ use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest, SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
}; };
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo}; use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId}; use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -207,7 +207,6 @@ impl Service {
self: &Arc<Self>, self: &Arc<Self>,
tenant_id: TenantId, tenant_id: TenantId,
timeline_info: &TimelineInfo, timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<SafekeepersInfo, ApiError> { ) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id; let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000; let pg_version = timeline_info.pg_version * 10000;
@@ -217,15 +216,8 @@ impl Service {
// previously existed as on retries in theory endpoint might have // previously existed as on retries in theory endpoint might have
// already written some data and advanced last_record_lsn, while we want // already written some data and advanced last_record_lsn, while we want
// safekeepers to have consistent start_lsn. // safekeepers to have consistent start_lsn.
let start_lsn = match create_mode { let start_lsn = timeline_info.last_record_lsn;
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
// Choose initial set of safekeepers respecting affinity // Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?; let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>(); let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();

View File

@@ -0,0 +1,260 @@
use std::time::Duration;
use std::{collections::HashMap, str::FromStr};
use http_utils::error::ApiError;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use pageserver_api::models::ShardImportStatus;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
shard::ShardIndex,
};
use crate::{persistence::TimelineImportPersistence, service::Config};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
impl ShardImportStatuses {
pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
ShardImportStatuses(
shards
.into_iter()
.map(|ts_id| (ts_id, ShardImportStatus::InProgress))
.collect(),
)
}
}
#[derive(Debug)]
pub(crate) struct TimelineImport {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
pub(crate) shard_statuses: ShardImportStatuses,
}
pub(crate) enum TimelineImportUpdateFollowUp {
Persist,
None,
}
pub(crate) enum TimelineImportUpdateError {
ImportNotFound {
tenant_id: TenantId,
timeline_id: TimelineId,
},
MismatchedShards,
UnexpectedUpdate,
}
impl From<TimelineImportUpdateError> for ApiError {
fn from(err: TimelineImportUpdateError) -> ApiError {
match err {
TimelineImportUpdateError::ImportNotFound {
tenant_id,
timeline_id,
} => ApiError::NotFound(
anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
),
TimelineImportUpdateError::MismatchedShards => {
ApiError::InternalServerError(anyhow::anyhow!(
"Import shards do not match update request, likely a shard split happened during import, this is a bug"
))
}
TimelineImportUpdateError::UnexpectedUpdate => {
ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
}
}
}
}
impl TimelineImport {
pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
Ok(TimelineImport {
tenant_id,
timeline_id,
shard_statuses,
})
}
pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
TimelineImportPersistence {
tenant_id: self.tenant_id.to_string(),
timeline_id: self.timeline_id.to_string(),
shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
}
}
pub(crate) fn update(
&mut self,
shard: ShardIndex,
status: ShardImportStatus,
) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
use std::collections::hash_map::Entry::*;
match self.shard_statuses.0.entry(shard) {
Occupied(mut occ) => {
let crnt = occ.get_mut();
if *crnt == status {
Ok(TimelineImportUpdateFollowUp::None)
} else if crnt.is_terminal() && !status.is_terminal() {
Err(TimelineImportUpdateError::UnexpectedUpdate)
} else {
*crnt = status;
Ok(TimelineImportUpdateFollowUp::Persist)
}
}
Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
}
}
pub(crate) fn is_complete(&self) -> bool {
self.shard_statuses
.0
.values()
.all(|status| status.is_terminal())
}
pub(crate) fn completion_error(&self) -> Option<String> {
assert!(self.is_complete());
let shard_errors: HashMap<_, _> = self
.shard_statuses
.0
.iter()
.filter_map(|(shard, status)| {
if let ShardImportStatus::Error(err) = status {
Some((*shard, err.clone()))
} else {
None
}
})
.collect();
if shard_errors.is_empty() {
None
} else {
Some(serde_json::to_string(&shard_errors).unwrap())
}
}
}
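
Taken together, `update` implements a small per-shard state machine: identical updates are
no-ops, a terminal status never regresses to `InProgress`, and updates for shards outside
the recorded layout are rejected. A test-style sketch of those rules, assuming it lives in
this module (an unsharded tenant keeps the single `ShardIndex` easy to derive):

```rust
#[test]
fn import_update_state_machine_sketch() {
    use pageserver_api::models::ShardImportStatus;
    use pageserver_api::shard::TenantShardId;
    use utils::id::{TenantId, TimelineId};

    let shard = TenantShardId::unsharded(TenantId::generate()).to_index();
    let mut import = TimelineImport {
        tenant_id: TenantId::generate(),
        timeline_id: TimelineId::generate(),
        shard_statuses: ShardImportStatuses::new(vec![shard]),
    };

    // First terminal status: the caller must persist the new state.
    assert!(matches!(
        import.update(shard, ShardImportStatus::Done),
        Ok(TimelineImportUpdateFollowUp::Persist)
    ));
    // The same upcall delivered again (at-least-once semantics): a no-op.
    assert!(matches!(
        import.update(shard, ShardImportStatus::Done),
        Ok(TimelineImportUpdateFollowUp::None)
    ));
    // Terminal shards may not regress to InProgress.
    assert!(import.update(shard, ShardImportStatus::InProgress).is_err());
    // Every shard is terminal and none errored: complete, with no error to report.
    assert!(import.is_complete());
    assert!(import.completion_error().is_none());
}
```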
pub(crate) struct UpcallClient {
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
base_url: String,
}
const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Serialize, Deserialize, Debug)]
struct ImportCompleteRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
error: Option<String>,
}
impl UpcallClient {
pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {}", jwt));
let client = reqwest::ClientBuilder::new()
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
.build()
.expect("Failed to construct HTTP client");
let base_url = config
.control_plane_url
.clone()
.expect("must be configured");
Self {
authorization_header,
client,
cancel,
base_url,
}
}
/// Notify control plane of a completed import
///
/// This method guarantees at least once delivery semantics assuming
/// eventual cplane availability. The cplane API is idempotent.
pub(crate) async fn notify_import_complete(
&self,
import: &TimelineImport,
) -> anyhow::Result<()> {
let endpoint = if self.base_url.ends_with('/') {
format!("{}import_complete", self.base_url)
} else {
format!("{}/import_complete", self.base_url)
};
tracing::info!("Endpoint is {endpoint}");
let request = self
.client
.request(Method::PUT, endpoint)
.json(&ImportCompleteRequest {
tenant_id: import.tenant_id,
timeline_id: import.timeline_id,
error: import.completion_error(),
})
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
let request = if let Some(auth) = &self.authorization_header {
request.header(reqwest::header::AUTHORIZATION, auth)
} else {
request
};
const RETRY_DELAY: Duration = Duration::from_secs(1);
let mut attempt = 1;
loop {
if self.cancel.is_cancelled() {
return Err(anyhow::anyhow!(
"Shutting down while notifying cplane of import completion"
));
}
match request.try_clone().unwrap().send().await {
Ok(response) if response.status().is_success() => {
return Ok(());
}
Ok(response) => {
tracing::warn!(
"Import complete notification failed with status {}, attempt {}",
response.status(),
attempt
);
}
Err(e) => {
tracing::warn!(
"Import complete notification failed with error: {}, attempt {}",
e,
attempt
);
}
}
tokio::select! {
_ = tokio::time::sleep(RETRY_DELAY) => {}
_ = self.cancel.cancelled() => {
return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
}
}
attempt += 1;
}
}
}

View File

@@ -1,9 +1,9 @@
import base64 import base64
import json import json
import re
import time import time
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from threading import Event
import psycopg2 import psycopg2
import psycopg2.errors import psycopg2.errors
@@ -14,12 +14,11 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
from fixtures.pageserver.http import ( from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey, ImportPgdataIdemptencyKey,
PageserverApiException,
) )
from fixtures.pg_version import PgVersion from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server, RemoteStorageKind from fixtures.remote_storage import MockS3Server, RemoteStorageKind
from fixtures.utils import shared_buffers_for_max_cu from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
from mypy_boto3_kms import KMSClient from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client from mypy_boto3_s3 import S3Client
@@ -44,6 +43,7 @@ smoke_params = [
] ]
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params) @pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke( def test_pgdata_import_smoke(
vanilla_pg: VanillaPostgres, vanilla_pg: VanillaPostgres,
@@ -56,24 +56,29 @@ def test_pgdata_import_smoke(
# #
# Setup fake control plane for import progress # Setup fake control plane for import progress
# #
import_completion_signaled = Event()
def handler(request: Request) -> Response: def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}") log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200) return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler) cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
env = neon_env_builder.init_start() env = neon_env_builder.init_start()
# The test needs LocalFs support, which is only built in testing mode. # The test needs LocalFs support, which is only built in testing mode.
env.pageserver.is_testing_enabled_or_skip() env.pageserver.is_testing_enabled_or_skip()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api"
}
)
env.pageserver.stop() env.pageserver.stop()
env.pageserver.start() env.pageserver.start()
@@ -193,40 +198,11 @@ def test_pgdata_import_smoke(
) )
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id) env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
while True: def cplane_notified():
locations = env.storage_controller.locate(tenant_id) assert import_completion_signaled.is_set()
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
shard_status_file = statusdir / f"shard-{shard_id.shard_index}" # Generous timeout for the MULTIPLE_RELATION_SEGMENTS test variants
if state == "Active": wait_until(cplane_notified, timeout=90)
shard_status_file_contents = (
shard_status_file.read_text()
) # Active state implies import is done
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(1)
import_duration = time.monotonic() - start import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s") log.info(f"import complete; duration={import_duration:.2f}s")
@@ -372,19 +348,27 @@ def test_fast_import_with_pageserver_ingest(
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);") vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
# Setup pageserver and fake cplane for import progress # Setup pageserver and fake cplane for import progress
import_completion_signaled = Event()
def handler(request: Request) -> Response: def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}") log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200) return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler) cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3) neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start() env = neon_env_builder.init_start()
env.pageserver.patch_config_toml_nonrecursive( env.pageserver.patch_config_toml_nonrecursive(
{ {
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
# because import_pgdata code uses this endpoint, not the one in common remote storage config # because import_pgdata code uses this endpoint, not the one in common remote storage config
# TODO: maybe use common remote_storage config in pageserver? # TODO: maybe use common remote_storage config in pageserver?
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(), "import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(),
@@ -476,42 +460,10 @@ def test_fast_import_with_pageserver_ingest(
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb") conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
validate_vanilla_equivalence(conn) validate_vanilla_equivalence(conn)
# Poll pageserver statuses in s3 def cplane_notified():
while True: assert import_completion_signaled.is_set()
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}")
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
if state == "Active": wait_until(cplane_notified, timeout=60)
key = f"{key_prefix}/status/shard-{shard_id.shard_index}"
shard_status_file_contents = (
mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
)
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(0.5)
import_duration = time.monotonic() - start import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s") log.info(f"import complete; duration={import_duration:.2f}s")