diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index e367db614f..120c508f7a 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -320,6 +320,35 @@ pub struct TimelineCreateRequest {
     pub mode: TimelineCreateRequestMode,
 }
 
+impl TimelineCreateRequest {
+    pub fn mode_tag(&self) -> &'static str {
+        match &self.mode {
+            TimelineCreateRequestMode::Branch { .. } => "branch",
+            TimelineCreateRequestMode::ImportPgdata { .. } => "import",
+            TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
+        }
+    }
+
+    pub fn is_import(&self) -> bool {
+        matches!(self.mode, TimelineCreateRequestMode::ImportPgdata { .. })
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+pub enum ShardImportStatus {
+    InProgress,
+    Done,
+    Error(String),
+}
+impl ShardImportStatus {
+    pub fn is_terminal(&self) -> bool {
+        match self {
+            ShardImportStatus::InProgress => false,
+            ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
+        }
+    }
+}
+
 /// Storage controller specific extensions to [`TimelineInfo`].
 #[derive(Serialize, Deserialize, Clone)]
 pub struct TimelineCreateResponseStorcon {
diff --git a/libs/pageserver_api/src/upcall_api.rs b/libs/pageserver_api/src/upcall_api.rs
index 285ba06056..7ee63f9036 100644
--- a/libs/pageserver_api/src/upcall_api.rs
+++ b/libs/pageserver_api/src/upcall_api.rs
@@ -4,10 +4,10 @@
 //! See docs/rfcs/025-generation-numbers.md
 
 use serde::{Deserialize, Serialize};
-use utils::id::NodeId;
+use utils::id::{NodeId, TimelineId};
 
 use crate::controller_api::NodeRegisterRequest;
-use crate::models::LocationConfigMode;
+use crate::models::{LocationConfigMode, ShardImportStatus};
 use crate::shard::TenantShardId;
 
 /// Upcall message sent by the pageserver to the configured `control_plane_api` on
@@ -62,3 +62,10 @@ pub struct ValidateResponseTenant {
     pub id: TenantShardId,
     pub valid: bool,
 }
+
+#[derive(Serialize, Deserialize)]
+pub struct PutTimelineImportStatusRequest {
+    pub tenant_shard_id: TenantShardId,
+    pub timeline_id: TimelineId,
+    pub status: ShardImportStatus,
+}
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index e0cd19817d..4a87a91910 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -419,6 +419,23 @@ impl Client {
         }
     }
 
+    pub async fn timeline_detail(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+    ) -> Result<TimelineInfo> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
+            self.mgmt_api_endpoint
+        );
+
+        self.request(Method::GET, &uri, ())
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
+
     pub async fn timeline_archival_config(
         &self,
         tenant_shard_id: TenantShardId,
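The new `timeline_detail` client method is the read side of the import flow: the storage controller later polls it (through `PageserverClient::timeline_detail` further down) to decide when every shard of an imported timeline has activated. A minimal sketch of a caller, assuming an already-constructed `mgmt_api::Client` and the crate's `Result` alias; this helper is hypothetical, not part of the patch:

```rust
use pageserver_api::models::TimelineState;
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use utils::id::TimelineId;

/// Returns true once the shard's timeline has reached `Active`.
async fn shard_is_active(
    client: &mgmt_api::Client,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
) -> mgmt_api::Result<bool> {
    let info = client.timeline_detail(tenant_shard_id, timeline_id).await?;
    Ok(info.state == TimelineState::Active)
}
```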
diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs
index ed52823c20..59c94f1549 100644
--- a/pageserver/src/controller_upcall_client.rs
+++ b/pageserver/src/controller_upcall_client.rs
@@ -3,10 +3,11 @@ use std::collections::HashMap;
 use futures::Future;
 use pageserver_api::config::NodeMetadata;
 use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
+use pageserver_api::models::ShardImportStatus;
 use pageserver_api::shard::TenantShardId;
 use pageserver_api::upcall_api::{
-    ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
-    ValidateRequestTenant, ValidateResponse,
+    PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
+    ValidateRequest, ValidateRequestTenant, ValidateResponse,
 };
 use reqwest::Certificate;
 use serde::Serialize;
@@ -14,7 +15,7 @@ use serde::de::DeserializeOwned;
 use tokio_util::sync::CancellationToken;
 use url::Url;
 use utils::generation::Generation;
-use utils::id::NodeId;
+use utils::id::{NodeId, TimelineId};
 use utils::{backoff, failpoint_support};
 
 use crate::config::PageServerConf;
@@ -46,6 +47,12 @@
         &self,
         tenants: Vec<(TenantShardId, Generation)>,
     ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
+    fn put_timeline_import_status(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        status: ShardImportStatus,
+    ) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
 }
 
 impl StorageControllerUpcallClient {
@@ -273,4 +280,30 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
 
         Ok(result.into_iter().collect())
     }
+
+    /// Send a shard import status to the storage controller
+    ///
+    /// The implementation must have at-least-once delivery semantics.
+    /// To this end, we retry the request until it succeeds. If the pageserver
+    /// restarts or crashes, the shard import will start again from the beginning.
+    #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
+    async fn put_timeline_import_status(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        status: ShardImportStatus,
+    ) -> Result<(), RetryForeverError> {
+        let url = self
+            .base_url
+            .join("timeline_import_status")
+            .expect("Failed to build path");
+
+        let request = PutTimelineImportStatusRequest {
+            tenant_shard_id,
+            timeline_id,
+            status,
+        };
+
+        self.retry_http_forever(&url, request).await
+    }
 }
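For context, the at-least-once contract the trait doc describes boils down to a retry loop that only gives up on shutdown. `retry_http_forever` itself is not shown in this diff; the sketch below illustrates its assumed semantics (including a shutdown error variant, defined locally here), not the actual implementation:

```rust
use tokio_util::sync::CancellationToken;

// Local stand-in for the real error type in controller_upcall_client.rs.
enum RetryForeverError {
    ShuttingDown,
}

// Illustrative retry-forever loop: every failure (connect error or non-2xx)
// is retried after a pause, so the server may see the same message several
// times and must handle it idempotently.
async fn retry_http_forever_sketch<R: serde::Serialize>(
    client: &reqwest::Client,
    url: &url::Url,
    request: &R,
    cancel: &CancellationToken,
) -> Result<(), RetryForeverError> {
    loop {
        if cancel.is_cancelled() {
            return Err(RetryForeverError::ShuttingDown);
        }
        match client.post(url.clone()).json(request).send().await {
            Ok(resp) if resp.status().is_success() => return Ok(()),
            _ => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
        }
    }
}
```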
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index d9c1c07b10..6dd7d741c1 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -787,6 +787,15 @@ mod test {
 
             Ok(result)
         }
+
+        async fn put_timeline_import_status(
+            &self,
+            _tenant_shard_id: TenantShardId,
+            _timeline_id: TimelineId,
+            _status: pageserver_api::models::ShardImportStatus,
+        ) -> Result<(), RetryForeverError> {
+            unimplemented!()
+        }
     }
 
     async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs
index 8b94a114d6..b917fdbfd8 100644
--- a/pageserver/src/tenant/timeline/import_pgdata.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata.rs
@@ -1,20 +1,21 @@
 use std::sync::Arc;
 
 use anyhow::{Context, bail};
+use pageserver_api::models::ShardImportStatus;
 use remote_storage::RemotePath;
 use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, info, info_span};
+use tracing::info;
 use utils::lsn::Lsn;
 
 use super::Timeline;
 use crate::context::RequestContext;
+use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
 use crate::tenant::metadata::TimelineMetadata;
 
 mod flow;
 mod importbucket_client;
 mod importbucket_format;
 pub(crate) mod index_part_format;
-pub(crate) mod upcall_api;
 
 pub async fn doit(
     timeline: &Arc<Timeline>,
@@ -34,23 +35,6 @@ pub async fn doit(
 
     let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
 
-    info!("get spec early so we know we'll be able to upcall when done");
-    let Some(spec) = storage.get_spec().await? else {
-        bail!("spec not found")
-    };
-
-    let upcall_client =
-        upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
-
-    //
-    // send an early progress update to clean up k8s job early and generate potentially useful logs
-    //
-    info!("send early progress update");
-    upcall_client
-        .send_progress_until_success(&spec)
-        .instrument(info_span!("early_progress_update"))
-        .await?;
-
     let status_prefix = RemotePath::from_string("status").unwrap();
 
     //
@@ -176,7 +160,21 @@ pub async fn doit(
 
         //
         // Communicate that shard is done.
+        // Ensure at-least-once delivery of the upcall to storage controller
+        // before we mark the task as done and never come here again.
         //
+        let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
+            .expect("storcon configured");
+        storcon_client
+            .put_timeline_import_status(
+                timeline.tenant_shard_id,
+                timeline.timeline_id,
+                // TODO(vlad): What about import errors?
+                ShardImportStatus::Done,
+            )
+            .await
+            .map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
+
         storage
             .put_json(
                 &shard_status_key,
@@ -186,16 +184,6 @@ pub async fn doit(
             .context("put shard status")?;
     }
 
-    //
-    // Ensure at-least-once deliver of the upcall to cplane
-    // before we mark the task as done and never come here again.
-    //
-    info!("send final progress update");
-    upcall_client
-        .send_progress_until_success(&spec)
-        .instrument(info_span!("final_progress_update"))
-        .await?;
-
     //
     // Mark as done in index_part.
    // This makes subsequent timeline loads enter the normal load code path
diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs
index a17a10d56b..2929f30dce 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs
@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, instrument};
 use utils::lsn::Lsn;
 
-use super::{importbucket_format, index_part_format};
+use super::index_part_format;
 use crate::assert_u64_eq_usize::U64IsUsize;
 use crate::config::PageServerConf;
 
@@ -173,12 +173,6 @@ impl RemoteStorageWrapper {
         res
     }
 
-    pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
-        self.get_json(&RemotePath::from_string("spec.json").unwrap())
-            .await
-            .context("get spec")
-    }
-
     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
     pub async fn get_json(
         &self,
diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs
index 04ba3c6f1f..57c647cc7f 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs
@@ -11,10 +11,3 @@ pub struct ShardStatus {
     pub done: bool,
     // TODO: remaining fields
 }
-
-// TODO: dedupe with fast_import code
-#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
-pub struct Spec {
-    pub project_id: String,
-    pub branch_id: String,
-}
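Note that only the `Spec`/`spec.json` plumbing is removed here; the per-shard status objects under `status/` remain, and the pageserver still writes them in `import_pgdata.rs` above. A self-contained sketch of that document's shape, mirroring the `ShardStatus` struct that stays behind (redefined locally for illustration):

```rust
use serde::{Deserialize, Serialize};

// Mirror of importbucket_format::ShardStatus; the real struct lives in the
// file above and may grow more fields ("TODO: remaining fields").
#[derive(Serialize, Deserialize, Debug)]
struct ShardStatus {
    done: bool,
}

fn main() {
    let status = ShardStatus { done: true };
    // Serializes to {"done":true}, the document put_json writes under
    // `status/shard-<index>` in the import bucket.
    println!("{}", serde_json::to_string(&status).unwrap());
}
```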
diff --git a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs
deleted file mode 100644
index 99081a65e0..0000000000
--- a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
-use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
-use reqwest::{Certificate, Method};
-use serde::{Deserialize, Serialize};
-use tokio_util::sync::CancellationToken;
-use tracing::error;
-
-use super::importbucket_format::Spec;
-use crate::config::PageServerConf;
-
-pub struct Client {
-    base_url: String,
-    authorization_header: Option<String>,
-    client: reqwest::Client,
-    cancel: CancellationToken,
-}
-
-pub type Result<T> = std::result::Result<T, Error>;
-
-#[derive(Serialize, Deserialize, Debug)]
-struct ImportProgressRequest {
-    // no fields yet, not sure if there every will be any
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-struct ImportProgressResponse {
-    // we don't care
-}
-
-impl Client {
-    pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
-        let Some(ref base_url) = conf.import_pgdata_upcall_api else {
-            anyhow::bail!("import_pgdata_upcall_api is not configured")
-        };
-        let mut http_client = reqwest::Client::builder();
-        for cert in &conf.ssl_ca_certs {
-            http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
-        }
-        let http_client = http_client.build()?;
-
-        Ok(Self {
-            base_url: base_url.to_string(),
-            client: http_client,
-            cancel,
-            authorization_header: conf
-                .import_pgdata_upcall_api_token
-                .as_ref()
-                .map(|secret_string| secret_string.get_contents())
-                .map(|jwt| format!("Bearer {jwt}")),
-        })
-    }
-
-    fn start_request<U: reqwest::IntoUrl>(
-        &self,
-        method: Method,
-        uri: U,
-    ) -> reqwest::RequestBuilder {
-        let req = self.client.request(method, uri);
-        if let Some(value) = &self.authorization_header {
-            req.header(reqwest::header::AUTHORIZATION, value)
-        } else {
-            req
-        }
-    }
-
-    async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
-        &self,
-        method: Method,
-        uri: U,
-        body: B,
-    ) -> Result<reqwest::Response> {
-        self.start_request(method, uri)
-            .json(&body)
-            .send()
-            .await
-            .map_err(Error::ReceiveBody)
-    }
-
-    async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
-        &self,
-        method: Method,
-        uri: U,
-        body: B,
-    ) -> Result<reqwest::Response> {
-        let res = self.request_noerror(method, uri, body).await?;
-        let response = res.error_from_body().await?;
-        Ok(response)
-    }
-
-    pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
-        let url = format!(
-            "{}/projects/{}/branches/{}/import_progress",
-            self.base_url, spec.project_id, spec.branch_id
-        );
-        let ImportProgressResponse {} = self
-            .request(Method::POST, url, &ImportProgressRequest {})
-            .await?
-            .json()
-            .await
-            .map_err(Error::ReceiveBody)?;
-        Ok(())
-    }
-
-    pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
-        loop {
-            match self.send_progress_once(spec).await {
-                Ok(()) => return Ok(()),
-                Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
-                Err(err) => {
-                    error!(?err, "error sending progress, retrying");
-                    if tokio::time::timeout(
-                        std::time::Duration::from_secs(10),
-                        self.cancel.cancelled(),
-                    )
-                    .await
-                    .is_ok()
-                    {
-                        anyhow::bail!("cancelled while sending early progress update");
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/storage_controller/migrations/2025-03-18-103700_timeline_imports/down.sql b/storage_controller/migrations/2025-03-18-103700_timeline_imports/down.sql
new file mode 100644
index 0000000000..4e7ae74ce2
--- /dev/null
+++ b/storage_controller/migrations/2025-03-18-103700_timeline_imports/down.sql
@@ -0,0 +1 @@
+DROP TABLE timeline_imports;
diff --git a/storage_controller/migrations/2025-03-18-103700_timeline_imports/up.sql b/storage_controller/migrations/2025-03-18-103700_timeline_imports/up.sql
new file mode 100644
index 0000000000..27741c439d
--- /dev/null
+++ b/storage_controller/migrations/2025-03-18-103700_timeline_imports/up.sql
@@ -0,0 +1,6 @@
+CREATE TABLE timeline_imports (
+    tenant_id VARCHAR NOT NULL,
+    timeline_id VARCHAR NOT NULL,
+    shard_statuses JSONB NOT NULL,
+    PRIMARY KEY(tenant_id, timeline_id)
+);
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index fb4530d0d2..0a36ce8b6f 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -30,7 +30,9 @@ use pageserver_api::models::{
     TimelineArchivalConfigRequest, TimelineCreateRequest,
 };
 use pageserver_api::shard::TenantShardId;
-use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
+use pageserver_api::upcall_api::{
+    PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest,
+};
 use pageserver_client::{BlockUnblock, mgmt_api};
 use routerify::Middleware;
 use tokio_util::sync::CancellationToken;
@@ -154,6 +156,28 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
     json_response(StatusCode::OK, state.service.validate(validate_req).await?)
 }
 
+async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::GenerationsApi)?;
+
+    let mut req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
+
+    let state = get_state(&req);
+    json_response(
+        StatusCode::OK,
+        state
+            .service
+            .handle_timeline_shard_import_progress_upcall(put_req)
+            .await?,
+    )
+}
+
 /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
 /// (in the real control plane this is unnecessary, because the same program is managing
 /// generation numbers and doing attachments).
@@ -1961,6 +1985,13 @@ pub fn make_router(
         .post("/upcall/v1/validate", |r| {
             named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
         })
+        .post("/upcall/v1/timeline_import_status", |r| {
+            named_request_span(
+                r,
+                handle_put_timeline_import_status,
+                RequestName("upcall_v1_timeline_import_status"),
+            )
+        })
        // Test/dev/debug endpoints
         .post("/debug/v1/attach-hook", |r| {
             named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
         })
diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs
index 5f2c081927..a9ec511431 100644
--- a/storage_controller/src/lib.rs
+++ b/storage_controller/src/lib.rs
@@ -23,6 +23,7 @@ mod scheduler;
 mod schema;
 pub mod service;
 mod tenant_shard;
+mod timeline_import;
 
 #[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
 struct Sequence(u64);
diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs
index d14fc35b39..554ca375f5 100644
--- a/storage_controller/src/pageserver_client.rs
+++ b/storage_controller/src/pageserver_client.rs
@@ -212,6 +212,21 @@ impl PageserverClient {
         )
     }
 
+    pub(crate) async fn timeline_detail(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+    ) -> Result<TimelineInfo> {
+        measured_request!(
+            "timeline_detail",
+            crate::metrics::Method::Get,
+            &self.node_id_label,
+            self.inner
+                .timeline_detail(tenant_shard_id, timeline_id)
+                .await
+        )
+    }
+
     pub(crate) async fn tenant_shard_split(
         &self,
         tenant_shard_id: TenantShardId,
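For reference, a sketch of what a body posted to the new `/upcall/v1/timeline_import_status` route can look like. It leans on the derived `Serialize` impls; the `unsharded`/`generate` helpers and the exact string renderings of the id types are assumptions, not something this patch pins down:

```rust
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::PutTimelineImportStatusRequest;
use utils::id::{TenantId, TimelineId};

fn main() -> anyhow::Result<()> {
    let req = PutTimelineImportStatusRequest {
        tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
        timeline_id: TimelineId::generate(),
        status: ShardImportStatus::Error("import failed".to_string()),
    };
    // With serde's default externally-tagged enum encoding, `status` comes
    // out as "InProgress", "Done", or {"Error": "..."}.
    println!("{}", serde_json::to_string_pretty(&req)?);
    Ok(())
}
```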
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index a413bba3c9..30c0e03d03 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
     AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
     SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
 };
-use pageserver_api::models::TenantConfig;
+use pageserver_api::models::{ShardImportStatus, TenantConfig};
 use pageserver_api::shard::{
     ShardConfigError, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
 };
@@ -40,6 +40,9 @@ use crate::metrics::{
     DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
 };
 use crate::node::Node;
+use crate::timeline_import::{
+    TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
+};
 
 const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
 
 /// ## What do we store?
@@ -127,6 +130,9 @@ pub(crate) enum DatabaseOperation {
     RemoveTimelineReconcile,
     ListTimelineReconcile,
     ListTimelineReconcileStartup,
+    InsertTimelineImport,
+    UpdateTimelineImport,
+    DeleteTimelineImport,
 }
 
 #[must_use]
@@ -1614,6 +1620,129 @@ impl Persistence {
 
         Ok(())
     }
+
+    pub(crate) async fn insert_timeline_import(
+        &self,
+        import: TimelineImportPersistence,
+    ) -> DatabaseResult<bool> {
+        self.with_measured_conn(DatabaseOperation::InsertTimelineImport, move |conn| {
+            Box::pin({
+                let import = import.clone();
+                async move {
+                    let inserted = diesel::insert_into(crate::schema::timeline_imports::table)
+                        .values(import)
+                        .execute(conn)
+                        .await?;
+                    Ok(inserted == 1)
+                }
+            })
+        })
+        .await
+    }
+
+    pub(crate) async fn delete_timeline_import(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> DatabaseResult<()> {
+        use crate::schema::timeline_imports::dsl;
+
+        self.with_measured_conn(DatabaseOperation::DeleteTimelineImport, move |conn| {
+            Box::pin(async move {
+                diesel::delete(crate::schema::timeline_imports::table)
+                    .filter(
+                        dsl::tenant_id
+                            .eq(tenant_id.to_string())
+                            .and(dsl::timeline_id.eq(timeline_id.to_string())),
+                    )
+                    .execute(conn)
+                    .await?;
+
+                Ok(())
+            })
+        })
+        .await
+    }
+
+    /// Idempotently update the status of one shard for an ongoing timeline import
+    ///
+    /// If the update was persisted to the database, then the current state of the
+    /// import is returned to the caller. In case of logical errors, a bespoke
+    /// [`TimelineImportUpdateError`] instance is returned. Other database errors
+    /// are covered by the outer [`DatabaseError`].
+    pub(crate) async fn update_timeline_import(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        shard_status: ShardImportStatus,
+    ) -> DatabaseResult<Result<Option<TimelineImport>, TimelineImportUpdateError>> {
+        use crate::schema::timeline_imports::dsl;
+
+        self.with_measured_conn(DatabaseOperation::UpdateTimelineImport, move |conn| {
+            Box::pin({
+                let shard_status = shard_status.clone();
+                async move {
+                    // Load the current state from the database
+                    let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
+                        .filter(
+                            dsl::tenant_id
+                                .eq(tenant_shard_id.tenant_id.to_string())
+                                .and(dsl::timeline_id.eq(timeline_id.to_string())),
+                        )
+                        .load(conn)
+                        .await?;
+
+                    assert!(from_db.len() <= 1);
+
+                    let mut status = match from_db.pop() {
+                        Some(some) => TimelineImport::from_persistent(some).unwrap(),
+                        None => {
+                            return Ok(Err(TimelineImportUpdateError::ImportNotFound {
+                                tenant_id: tenant_shard_id.tenant_id,
+                                timeline_id,
+                            }));
+                        }
+                    };
+
+                    // Perform the update in-memory
+                    let follow_up = match status.update(tenant_shard_id.to_index(), shard_status) {
+                        Ok(ok) => ok,
+                        Err(err) => {
+                            return Ok(Err(err));
+                        }
+                    };
+
+                    let new_persistent = status.to_persistent();
+
+                    // Write back if required (in the same transaction)
+                    match follow_up {
+                        TimelineImportUpdateFollowUp::Persist => {
+                            let updated = diesel::update(dsl::timeline_imports)
+                                .filter(
+                                    dsl::tenant_id
+                                        .eq(tenant_shard_id.tenant_id.to_string())
+                                        .and(dsl::timeline_id.eq(timeline_id.to_string())),
+                                )
+                                .set(dsl::shard_statuses.eq(new_persistent.shard_statuses))
+                                .execute(conn)
+                                .await?;
+
+                            if updated != 1 {
+                                return Ok(Err(TimelineImportUpdateError::ImportNotFound {
+                                    tenant_id: tenant_shard_id.tenant_id,
+                                    timeline_id,
+                                }));
+                            }
+
+                            Ok(Ok(Some(status)))
+                        }
+                        TimelineImportUpdateFollowUp::None => Ok(Ok(None)),
+                    }
+                }
+            })
+        })
+        .await
+    }
 }
 
 pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -2171,3 +2300,11 @@ impl ToSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
             .map_err(Into::into)
     }
 }
+
+#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Clone)]
+#[diesel(table_name = crate::schema::timeline_imports)]
+pub(crate) struct TimelineImportPersistence {
+    pub(crate) tenant_id: String,
+    pub(crate) timeline_id: String,
+    pub(crate) shard_statuses: serde_json::Value,
+}
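The JSONB `shard_statuses` column holds the whole per-shard map, so `update_timeline_import` is a read-modify-write of one document rather than a row per shard. A sketch of the stored shape, assuming `ShardIndex::new` and `ShardIndex`'s compact string serialization for the map keys:

```rust
use std::collections::HashMap;

use pageserver_api::models::ShardImportStatus;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};

fn main() {
    // Mirrors ShardImportStatuses::new for a two-shard tenant: every shard
    // starts out InProgress and is flipped to Done/Error by the upcalls.
    let statuses: HashMap<ShardIndex, ShardImportStatus> = (0u8..2)
        .map(|i| {
            (
                ShardIndex::new(ShardNumber(i), ShardCount(2)),
                ShardImportStatus::InProgress,
            )
        })
        .collect();
    // Roughly {"0002":"InProgress","0102":"InProgress"}; the key format is an
    // assumption based on ShardIndex's string serialization.
    println!("{}", serde_json::to_string(&statuses).unwrap());
}
```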
diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs
index 9b36376fcb..20be9bb5ca 100644
--- a/storage_controller/src/schema.rs
+++ b/storage_controller/src/schema.rs
@@ -76,6 +76,14 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    timeline_imports (tenant_id, timeline_id) {
+        tenant_id -> Varchar,
+        timeline_id -> Varchar,
+        shard_statuses -> Jsonb,
+    }
+}
+
 diesel::table! {
     use diesel::sql_types::*;
     use super::sql_types::PgLsn;
@@ -99,5 +107,6 @@ diesel::allow_tables_to_appear_in_same_query!(
     safekeeper_timeline_pending_ops,
     safekeepers,
     tenant_shards,
+    timeline_imports,
     timelines,
 );
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 860fc4f6ab..c5cf4bedcf 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -40,14 +40,14 @@ use pageserver_api::models::{
     TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
     TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
     TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
-    TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
+    TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
 };
 use pageserver_api::shard::{
     DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
 };
 use pageserver_api::upcall_api::{
-    ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
-    ValidateResponseTenant,
+    PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
+    ValidateRequest, ValidateResponse, ValidateResponseTenant,
 };
 use pageserver_client::{BlockUnblock, mgmt_api};
 use reqwest::{Certificate, StatusCode};
@@ -97,6 +97,7 @@ use crate::tenant_shard::{
     ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
     ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
 };
+use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
 
 const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
 
@@ -3732,11 +3733,14 @@ impl Service {
         create_req: TimelineCreateRequest,
     ) -> Result<TimelineCreateResponseStorcon, ApiError> {
         let safekeepers = self.config.timelines_onto_safekeepers;
+        let timeline_id = create_req.new_timeline_id;
+
         tracing::info!(
+            mode=%create_req.mode_tag(),
             %safekeepers,
             "Creating timeline {}/{}",
             tenant_id,
-            create_req.new_timeline_id,
+            timeline_id,
         );
 
         let _tenant_lock = trace_shared_lock(
@@ -3746,15 +3750,62 @@ impl Service {
         )
         .await;
         failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
-        let create_mode = create_req.mode.clone();
+        let is_import = create_req.is_import();
 
         let timeline_info = self
             .tenant_timeline_create_pageservers(tenant_id, create_req)
             .await?;
 
-        let safekeepers = if safekeepers {
+        let selected_safekeepers = if is_import {
+            let shards = {
+                let locked = self.inner.read().unwrap();
+                locked
+                    .tenants
+                    .range(TenantShardId::tenant_range(tenant_id))
+                    .map(|(ts_id, _)| ts_id.to_index())
+                    .collect::<Vec<_>>()
+            };
+
+            if !shards
+                .iter()
+                .map(|shard_index| shard_index.shard_count)
+                .all_equal()
+            {
+                return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                    "Inconsistent shard count"
+                )));
+            }
+
+            let import = TimelineImport {
+                tenant_id,
+                timeline_id,
+                shard_statuses: ShardImportStatuses::new(shards),
+            };
+
+            let inserted = self
+                .persistence
+                .insert_timeline_import(import.to_persistent())
+                .await
+                .context("timeline import insert")
+                .map_err(ApiError::InternalServerError)?;
+
+            match inserted {
+                true => {
+                    tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
+                }
+                false => {
+                    tracing::info!(%tenant_id, %timeline_id, "Timeline import entry already present");
+                }
+            }
+
+            None
+        } else if safekeepers {
+            // Note that we do not support creating the timeline on the safekeepers
+            // for imported timelines. The `start_lsn` of the timeline is not known
+            // until the import finishes.
+            // https://github.com/neondatabase/neon/issues/11569
             let res = self
-                .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
+                .tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
                 .instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
                 .await?;
             Some(res)
@@ -3764,10 +3815,168 @@ impl Service {
 
         Ok(TimelineCreateResponseStorcon {
             timeline_info,
-            safekeepers,
+            safekeepers: selected_safekeepers,
         })
     }
"Inconsistent shard count" + ))); + } + + let import = TimelineImport { + tenant_id, + timeline_id, + shard_statuses: ShardImportStatuses::new(shards), + }; + + let inserted = self + .persistence + .insert_timeline_import(import.to_persistent()) + .await + .context("timeline import insert") + .map_err(ApiError::InternalServerError)?; + + match inserted { + true => { + tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import"); + } + false => { + tracing::info!(%tenant_id, %timeline_id, "Timeline import entry already present"); + } + } + + None + } else if safekeepers { + // Note that we do not support creating the timeline on the safekeepers + // for imported timelines. The `start_lsn` of the timeline is not known + // until the import finshes. + // https://github.com/neondatabase/neon/issues/11569 let res = self - .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode) + .tenant_timeline_create_safekeepers(tenant_id, &timeline_info) .instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id)) .await?; Some(res) @@ -3764,10 +3815,168 @@ impl Service { Ok(TimelineCreateResponseStorcon { timeline_info, - safekeepers, + safekeepers: selected_safekeepers, }) } + pub(crate) async fn handle_timeline_shard_import_progress_upcall( + self: &Arc, + req: PutTimelineImportStatusRequest, + ) -> Result<(), ApiError> { + let res = self + .persistence + .update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status) + .await; + let timeline_import = match res { + Ok(Ok(Some(timeline_import))) => timeline_import, + Ok(Ok(None)) => { + // Idempotency: we've already seen and handled this update. + return Ok(()); + } + Ok(Err(logical_err)) => { + return Err(logical_err.into()); + } + Err(db_err) => { + return Err(db_err.into()); + } + }; + + tracing::info!( + tenant_id=%req.tenant_shard_id.tenant_id, + timeline_id=%req.timeline_id, + shard_id=%req.tenant_shard_id.shard_slug(), + "Updated timeline import status to: {timeline_import:?}"); + + if timeline_import.is_complete() { + tokio::task::spawn({ + let this = self.clone(); + async move { this.finalize_timeline_import(timeline_import).await } + }); + } + + Ok(()) + } + + #[instrument(skip_all, fields( + tenant_id=%import.tenant_id, + shard_id=%import.timeline_id, + ))] + async fn finalize_timeline_import( + self: &Arc, + import: TimelineImport, + ) -> anyhow::Result<()> { + // TODO(vlad): On start-up, load up the imports and notify cplane of the + // ones that have been completed. This assumes the new cplane API will + // be idempotent. If that's not possible, bang a flag in the database. + // https://github.com/neondatabase/neon/issues/11570 + + tracing::info!("Finalizing timeline import"); + + let import_failed = import.completion_error().is_some(); + + if !import_failed { + loop { + if self.cancel.is_cancelled() { + anyhow::bail!("Shut down requested while finalizing import"); + } + + let active = self.timeline_active_on_all_shards(&import).await?; + + match active { + true => { + tracing::info!("Timeline became active on all shards"); + break; + } + false => { + tracing::info!("Timeline not active on all shards yet"); + + tokio::select! 
+    async fn timeline_active_on_all_shards(
+        self: &Arc<Self>,
+        import: &TimelineImport,
+    ) -> anyhow::Result<bool> {
+        let targets = {
+            let locked = self.inner.read().unwrap();
+            let mut targets = Vec::new();
+
+            for (tenant_shard_id, shard) in locked
+                .tenants
+                .range(TenantShardId::tenant_range(import.tenant_id))
+            {
+                if !import
+                    .shard_statuses
+                    .0
+                    .contains_key(&tenant_shard_id.to_index())
+                {
+                    anyhow::bail!("Shard layout change detected on completion");
+                }
+
+                if let Some(node_id) = shard.intent.get_attached() {
+                    let node = locked
+                        .nodes
+                        .get(node_id)
+                        .expect("Pageservers may not be deleted while referenced");
+                    targets.push((*tenant_shard_id, node.clone()));
+                } else {
+                    return Ok(false);
+                }
+            }
+
+            targets
+        };
+
+        let results = self
+            .tenant_for_shards_api(
+                targets,
+                |tenant_shard_id, client| async move {
+                    client
+                        .timeline_detail(tenant_shard_id, import.timeline_id)
+                        .await
+                },
+                1,
+                1,
+                SHORT_RECONCILE_TIMEOUT,
+                &self.cancel,
+            )
+            .await;
+
+        Ok(results.into_iter().all(|res| match res {
+            Ok(info) => info.state == TimelineState::Active,
+            Err(_) => false,
+        }))
+    }
+
     pub(crate) async fn tenant_timeline_archival_config(
         &self,
         tenant_id: TenantId,
diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs
index 088b3c4741..5eecf0d415 100644
--- a/storage_controller/src/service/safekeeper_service.rs
+++ b/storage_controller/src/service/safekeeper_service.rs
@@ -15,7 +15,7 @@ use http_utils::error::ApiError;
 use pageserver_api::controller_api::{
     SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
 };
-use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
+use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
 use safekeeper_api::membership::{MemberSet, SafekeeperId};
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
@@ -207,7 +207,6 @@ impl Service {
         self: &Arc<Self>,
         tenant_id: TenantId,
         timeline_info: &TimelineInfo,
-        create_mode: models::TimelineCreateRequestMode,
     ) -> Result<SafekeepersInfo, ApiError> {
         let timeline_id = timeline_info.timeline_id;
         let pg_version = timeline_info.pg_version * 10000;
@@ -217,15 +216,8 @@ impl Service {
         // previously existed as on retries in theory endpoint might have
         // already written some data and advanced last_record_lsn, while we want
         // safekeepers to have consistent start_lsn.
-        let start_lsn = match create_mode {
-            models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
-            models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
-            models::TimelineCreateRequestMode::ImportPgdata { .. } => {
-                return Err(ApiError::InternalServerError(anyhow::anyhow!(
-                    "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
-                )))?;
-            }
-        };
+        let start_lsn = timeline_info.last_record_lsn;
+
         // Choose initial set of safekeepers respecting affinity
         let sks = self.safekeepers_for_new_timeline().await?;
         let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs
new file mode 100644
index 0000000000..364299c9f0
--- /dev/null
+++ b/storage_controller/src/timeline_import.rs
@@ -0,0 +1,260 @@
+use std::time::Duration;
+use std::{collections::HashMap, str::FromStr};
+
+use http_utils::error::ApiError;
+use reqwest::Method;
+use serde::{Deserialize, Serialize};
+
+use pageserver_api::models::ShardImportStatus;
+use tokio_util::sync::CancellationToken;
+use utils::{
+    id::{TenantId, TimelineId},
+    shard::ShardIndex,
+};
+
+use crate::{persistence::TimelineImportPersistence, service::Config};
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
+
+impl ShardImportStatuses {
+    pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
+        ShardImportStatuses(
+            shards
+                .into_iter()
+                .map(|ts_id| (ts_id, ShardImportStatus::InProgress))
+                .collect(),
+        )
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct TimelineImport {
+    pub(crate) tenant_id: TenantId,
+    pub(crate) timeline_id: TimelineId,
+    pub(crate) shard_statuses: ShardImportStatuses,
+}
+
+pub(crate) enum TimelineImportUpdateFollowUp {
+    Persist,
+    None,
+}
+
+pub(crate) enum TimelineImportUpdateError {
+    ImportNotFound {
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    },
+    MismatchedShards,
+    UnexpectedUpdate,
+}
+
+impl From<TimelineImportUpdateError> for ApiError {
+    fn from(err: TimelineImportUpdateError) -> ApiError {
+        match err {
+            TimelineImportUpdateError::ImportNotFound {
+                tenant_id,
+                timeline_id,
+            } => ApiError::NotFound(
+                anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
+            ),
+            TimelineImportUpdateError::MismatchedShards => {
+                ApiError::InternalServerError(anyhow::anyhow!(
+                    "Import shards do not match update request, likely a shard split happened during import, this is a bug"
+                ))
+            }
+            TimelineImportUpdateError::UnexpectedUpdate => {
+                ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
+            }
+        }
+    }
+}
+
+impl TimelineImport {
+    pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
+        let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
+        let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
+        let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
+
+        Ok(TimelineImport {
+            tenant_id,
+            timeline_id,
+            shard_statuses,
+        })
+    }
+
+    pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
+        TimelineImportPersistence {
+            tenant_id: self.tenant_id.to_string(),
+            timeline_id: self.timeline_id.to_string(),
+            shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
+        }
+    }
+
+    pub(crate) fn update(
+        &mut self,
+        shard: ShardIndex,
+        status: ShardImportStatus,
+    ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
+        use std::collections::hash_map::Entry::*;
+
+        match self.shard_statuses.0.entry(shard) {
+            Occupied(mut occ) => {
+                let crnt = occ.get_mut();
+                if *crnt == status {
+                    Ok(TimelineImportUpdateFollowUp::None)
+                } else if crnt.is_terminal() && !status.is_terminal() {
+                    Err(TimelineImportUpdateError::UnexpectedUpdate)
+                } else {
+                    *crnt = status;
+                    Ok(TimelineImportUpdateFollowUp::Persist)
+                }
+            }
+            Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
+        }
+    }
+
+    pub(crate) fn is_complete(&self) -> bool {
+        self.shard_statuses
+            .0
+            .values()
+            .all(|status| status.is_terminal())
+    }
+
+    pub(crate) fn completion_error(&self) -> Option<String> {
+        assert!(self.is_complete());
+
+        let shard_errors: HashMap<_, _> = self
+            .shard_statuses
+            .0
+            .iter()
+            .filter_map(|(shard, status)| {
+                if let ShardImportStatus::Error(err) = status {
+                    Some((*shard, err.clone()))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        if shard_errors.is_empty() {
+            None
+        } else {
+            Some(serde_json::to_string(&shard_errors).unwrap())
+        }
+    }
+}
+
+pub(crate) struct UpcallClient {
+    authorization_header: Option<String>,
+    client: reqwest::Client,
+    cancel: CancellationToken,
+    base_url: String,
+}
+
+const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ImportCompleteRequest {
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    error: Option<String>,
+}
+
+impl UpcallClient {
+    pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
+        let authorization_header = config
+            .control_plane_jwt_token
+            .clone()
+            .map(|jwt| format!("Bearer {}", jwt));
+
+        let client = reqwest::ClientBuilder::new()
+            .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
+            .build()
+            .expect("Failed to construct HTTP client");
+
+        let base_url = config
+            .control_plane_url
+            .clone()
+            .expect("must be configured");
+
+        Self {
+            authorization_header,
+            client,
+            cancel,
+            base_url,
+        }
+    }
+
+    /// Notify control plane of a completed import
+    ///
+    /// This method guarantees at-least-once delivery semantics, assuming
+    /// eventual cplane availability. The cplane API is idempotent.
+    pub(crate) async fn notify_import_complete(
+        &self,
+        import: &TimelineImport,
+    ) -> anyhow::Result<()> {
+        let endpoint = if self.base_url.ends_with('/') {
+            format!("{}import_complete", self.base_url)
+        } else {
+            format!("{}/import_complete", self.base_url)
+        };
+
+        tracing::info!("Endpoint is {endpoint}");
+
+        let request = self
+            .client
+            .request(Method::PUT, endpoint)
+            .json(&ImportCompleteRequest {
+                tenant_id: import.tenant_id,
+                timeline_id: import.timeline_id,
+                error: import.completion_error(),
+            })
+            .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
+
+        let request = if let Some(auth) = &self.authorization_header {
+            request.header(reqwest::header::AUTHORIZATION, auth)
+        } else {
+            request
+        };
+
+        const RETRY_DELAY: Duration = Duration::from_secs(1);
+        let mut attempt = 1;
+
+        loop {
+            if self.cancel.is_cancelled() {
+                return Err(anyhow::anyhow!(
+                    "Shutting down while notifying cplane of import completion"
+                ));
+            }
+
+            match request.try_clone().unwrap().send().await {
+                Ok(response) if response.status().is_success() => {
+                    return Ok(());
+                }
+                Ok(response) => {
+                    tracing::warn!(
+                        "Import complete notification failed with status {}, attempt {}",
+                        response.status(),
+                        attempt
+                    );
+                }
+                Err(e) => {
+                    tracing::warn!(
+                        "Import complete notification failed with error: {}, attempt {}",
+                        e,
+                        attempt
+                    );
+                }
+            }
+
+            tokio::select! {
+                _ = tokio::time::sleep(RETRY_DELAY) => {}
+                _ = self.cancel.cancelled() => {
+                    return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
+                }
+            }
+            attempt += 1;
+        }
+    }
+}
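A behavioral sketch of `TimelineImport::update`, e.g. as a unit test one could add to this module. It assumes `shard` is already present in the map and currently `InProgress`, and shows why redelivered upcalls are harmless and why terminal states never regress:

```rust
fn update_semantics(mut import: TimelineImport, shard: ShardIndex) {
    // First Done for this shard: written back to the database.
    assert!(matches!(
        import.update(shard, ShardImportStatus::Done),
        Ok(TimelineImportUpdateFollowUp::Persist)
    ));
    // Redelivered Done (at-least-once upcalls): deduplicated, no DB write.
    assert!(matches!(
        import.update(shard, ShardImportStatus::Done),
        Ok(TimelineImportUpdateFollowUp::None)
    ));
    // Terminal states never move back to InProgress.
    assert!(matches!(
        import.update(shard, ShardImportStatus::InProgress),
        Err(TimelineImportUpdateError::UnexpectedUpdate)
    ));
}
```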
diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py
index 6b3b71f29c..971245f393 100644
--- a/test_runner/regress/test_import_pgdata.py
+++ b/test_runner/regress/test_import_pgdata.py
@@ -1,9 +1,9 @@
 import base64
 import json
-import re
 import time
 from enum import Enum
 from pathlib import Path
+from threading import Event
 
 import psycopg2
 import psycopg2.errors
@@ -14,12 +14,11 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
 from fixtures.pageserver.http import (
     ImportPgdataIdemptencyKey,
-    PageserverApiException,
 )
 from fixtures.pg_version import PgVersion
 from fixtures.port_distributor import PortDistributor
 from fixtures.remote_storage import MockS3Server, RemoteStorageKind
-from fixtures.utils import shared_buffers_for_max_cu
+from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
 from mypy_boto3_kms import KMSClient
 from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
 from mypy_boto3_s3 import S3Client
@@ -44,6 +43,7 @@ smoke_params = [
 ]
 
 
+@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
 @pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
 def test_pgdata_import_smoke(
     vanilla_pg: VanillaPostgres,
@@ -56,24 +56,29 @@ def test_pgdata_import_smoke(
     #
     # Setup fake control plane for import progress
     #
+    import_completion_signaled = Event()
+
     def handler(request: Request) -> Response:
-        log.info(f"control plane request: {request.json}")
+        log.info(f"control plane /import_complete request: {request.json}")
+        import_completion_signaled.set()
         return Response(json.dumps({}), status=200)
 
     cplane_mgmt_api_server = make_httpserver
-    cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
+    cplane_mgmt_api_server.expect_request(
+        "/storage/api/v1/import_complete", method="PUT"
+    ).respond_with_handler(handler)
 
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+
+    neon_env_builder.control_plane_hooks_api = (
+        f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
+    )
+
     env = neon_env_builder.init_start()
 
     # The test needs LocalFs support, which is only built in testing mode.
     env.pageserver.is_testing_enabled_or_skip()
 
-    env.pageserver.patch_config_toml_nonrecursive(
-        {
-            "import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api"
-        }
-    )
     env.pageserver.stop()
     env.pageserver.start()
 
@@ -193,40 +198,11 @@ def test_pgdata_import_smoke(
     )
     env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
 
-    while True:
-        locations = env.storage_controller.locate(tenant_id)
-        active_count = 0
-        for location in locations:
-            shard_id = TenantShardId.parse(location["shard_id"])
-            ps = env.get_pageserver(location["node_id"])
-            try:
-                detail = ps.http_client().timeline_detail(shard_id, timeline_id)
-                state = detail["state"]
-                log.info(f"shard {shard_id} state: {state}")
-                if state == "Active":
-                    active_count += 1
-            except PageserverApiException as e:
-                if e.status_code == 404:
-                    log.info("not found, import is in progress")
-                    continue
-                elif e.status_code == 429:
-                    log.info("import is in progress")
-                    continue
-                else:
-                    raise
+    def cplane_notified():
+        assert import_completion_signaled.is_set()
 
-            shard_status_file = statusdir / f"shard-{shard_id.shard_index}"
-            if state == "Active":
-                shard_status_file_contents = (
-                    shard_status_file.read_text()
-                )  # Active state implies import is done
-                shard_status = json.loads(shard_status_file_contents)
-                assert shard_status["done"] is True
-
-        if active_count == len(locations):
-            log.info("all shards are active")
-            break
-        time.sleep(1)
+    # Generous timeout for the MULTIPLE_RELATION_SEGMENTS test variants
+    wait_until(cplane_notified, timeout=90)
 
     import_duration = time.monotonic() - start
     log.info(f"import complete; duration={import_duration:.2f}s")
@@ -372,19 +348,27 @@ def test_fast_import_with_pageserver_ingest(
     vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
 
     # Setup pageserver and fake cplane for import progress
+    import_completion_signaled = Event()
+
     def handler(request: Request) -> Response:
-        log.info(f"control plane request: {request.json}")
+        log.info(f"control plane /import_complete request: {request.json}")
+        import_completion_signaled.set()
         return Response(json.dumps({}), status=200)
 
     cplane_mgmt_api_server = make_httpserver
-    cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
+    cplane_mgmt_api_server.expect_request(
+        "/storage/api/v1/import_complete", method="PUT"
+    ).respond_with_handler(handler)
+
+    neon_env_builder.control_plane_hooks_api = (
+        f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
+    )
 
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
     env = neon_env_builder.init_start()
 
     env.pageserver.patch_config_toml_nonrecursive(
         {
-            "import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
             # because import_pgdata code uses this endpoint, not the one in common remote storage config
             # TODO: maybe use common remote_storage config in pageserver?
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(), @@ -476,42 +460,10 @@ def test_fast_import_with_pageserver_ingest( conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb") validate_vanilla_equivalence(conn) - # Poll pageserver statuses in s3 - while True: - locations = env.storage_controller.locate(tenant_id) - active_count = 0 - for location in locations: - shard_id = TenantShardId.parse(location["shard_id"]) - ps = env.get_pageserver(location["node_id"]) - try: - detail = ps.http_client().timeline_detail(shard_id, timeline_id) - log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}") - state = detail["state"] - log.info(f"shard {shard_id} state: {state}") - if state == "Active": - active_count += 1 - except PageserverApiException as e: - if e.status_code == 404: - log.info("not found, import is in progress") - continue - elif e.status_code == 429: - log.info("import is in progress") - continue - else: - raise + def cplane_notified(): + assert import_completion_signaled.is_set() - if state == "Active": - key = f"{key_prefix}/status/shard-{shard_id.shard_index}" - shard_status_file_contents = ( - mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8") - ) - shard_status = json.loads(shard_status_file_contents) - assert shard_status["done"] is True - - if active_count == len(locations): - log.info("all shards are active") - break - time.sleep(0.5) + wait_until(cplane_notified, timeout=60) import_duration = time.monotonic() - start log.info(f"import complete; duration={import_duration:.2f}s")