diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 05a444d738..55e9c48421 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -162,6 +162,22 @@ impl std::fmt::Debug for TenantState {
}
}
+/// A temporary lease on a specific LSN inside a timeline.
+/// Access to the LSN is guaranteed by the pageserver until the expiration indicated by `valid_until`.
+#[serde_as]
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct LsnLease {
+ /// Expiration instant of the lease; (de)serialized through `SystemTimeAsRfc3339Millis`.
+ #[serde_as(as = "SystemTimeAsRfc3339Millis")]
+ pub valid_until: SystemTime,
+}
+
+// Defines the `SystemTimeAsRfc3339Millis` serde adapter for `SystemTime` values:
+// serialization renders the time with `humantime::format_rfc3339_millis`,
+// deserialization parses the string back with `humantime::parse_rfc3339`.
+serde_with::serde_conv!(
+ SystemTimeAsRfc3339Millis,
+ SystemTime,
+ |time: &SystemTime| humantime::format_rfc3339_millis(*time).to_string(),
+ |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
+);
+
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 43d9b2e48c..1c05a01926 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -559,6 +559,14 @@ impl ShardIdentity {
}
}
+    /// Builds the [`ShardIndex`] made of this identity's shard number and shard count.
+    pub fn shard_index(&self) -> ShardIndex {
+        let shard_number = self.number;
+        let shard_count = self.count;
+        ShardIndex {
+            shard_number,
+            shard_count,
+        }
+    }
+
pub fn shard_slug(&self) -> String {
if self.count > ShardCount(0) {
format!("-{:02x}{:02x}", self.number.0, self.count.0)
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index 36c74ed140..107bcd4a22 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -257,6 +257,37 @@ paths:
schema:
$ref: "#/components/schemas/LsnByTimestampResponse"
+ # LSN lease endpoint: POST with a required `lsn` query parameter;
+ # a 200 response carries an `LsnLease` object as JSON.
+ /v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease:
+ # Path parameters identifying the tenant shard and the timeline.
+ parameters:
+ - name: tenant_shard_id
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: timeline_id
+ in: path
+ required: true
+ schema:
+ type: string
+ format: hex
+ post:
+ description: Obtain lease for the given LSN
+ parameters:
+ # The LSN the lease is requested for; the parameter is mandatory.
+ - name: lsn
+ in: query
+ required: true
+ schema:
+ type: string
+ format: hex
+ description: A LSN to obtain the lease for
+ responses:
+ "200":
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/LsnLease"
+
/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc:
parameters:
- name: tenant_id
@@ -980,6 +1011,15 @@ components:
type: string
enum: [past, present, future, nodata]
+ # Body of the 200 response of the lsn_lease endpoint.
+ # `valid_until` is the timestamp at which the lease expires.
+ LsnLease:
+ type: object
+ required:
+ - valid_until
+ properties:
+ valid_until:
+ type: string
+ format: date-time
+
PageserverUtilization:
type: object
required:
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index ec3b1141f3..c75e4ca5a9 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1701,6 +1701,32 @@ async fn handle_tenant_break(
json_response(StatusCode::OK, ())
}
+// Obtains an lsn lease on the given timeline.
+//
+// Path parameters: `tenant_shard_id`, `timeline_id`. Query parameters: `lsn` (required).
+// On success responds with 200 and the JSON-serialized lease returned by `make_lsn_lease`.
+async fn lsn_lease_handler(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+
+    // `lsn` is mandatory: a request without it is rejected as a bad request.
+    let lsn: Lsn = parse_query_param(&request, "lsn")?
+        .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+
+    let state = get_state(&request);
+
+    let timeline =
+        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
+            .await?;
+    // Any failure while creating the lease is reported as an internal server error.
+    let result = timeline
+        .make_lsn_lease(lsn, &ctx)
+        .map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
+
+    json_response(StatusCode::OK, result)
+}
+
// Run GC immediately on given timeline.
async fn timeline_gc_handler(
mut request: Request,
@@ -2712,6 +2738,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
|r| api_handler(r, get_timestamp_of_lsn_handler),
)
+ .post(
+ "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/lsn_lease",
+ |r| api_handler(r, lsn_lease_handler),
+ )
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
|r| api_handler(r, timeline_gc_handler),
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 35aba044b2..c066f56c17 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -19,6 +19,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::ShardNumber;
+use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
@@ -33,6 +34,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
+use std::time::SystemTime;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
@@ -905,6 +907,39 @@ impl PageServerHandler {
}
}
+    /// Serves the `lease lsn` command: obtains a lease on `lsn` for the given
+    /// timeline and writes it to `pgb` as a one-row, one-column result set.
+    ///
+    /// The `valid_until` column holds the lease expiration as decimal
+    /// milliseconds since the Unix epoch. Messages are queued without flushing,
+    /// and the closing `CommandComplete` is left to the caller.
+    ///
+    /// Returns an error if the timeline cannot be resolved, the lease cannot be
+    /// created, the expiration lies before the Unix epoch, or writing fails.
+    #[instrument(skip_all, fields(shard_id, %lsn))]
+    async fn handle_make_lsn_lease<IO>(
+        &self,
+        pgb: &mut PostgresBackend<IO>,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<(), QueryError>
+    where
+        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
+    {
+        let shard_selector = ShardSelector::Known(tenant_shard_id.to_index());
+        let timeline = self
+            .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
+            .await?;
+        let lease = timeline.make_lsn_lease(lsn, ctx)?;
+        let valid_until = lease
+            .valid_until
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .map_err(|e| QueryError::Other(e.into()))?;
+
+        // The column is declared as text, so send the decimal rendering of the
+        // millisecond count rather than the raw big-endian bytes of the integer.
+        let valid_until_millis = valid_until.as_millis().to_string();
+
+        // No `CommandComplete` here: the `lease lsn` dispatch arm emits it on
+        // success, and sending it twice would desynchronize the client.
+        pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
+            b"valid_until",
+        )]))?
+        .write_message_noflush(&BeMessage::DataRow(&[Some(
+            valid_until_millis.as_bytes(),
+        )]))?;
+
+        Ok(())
+    }
+
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_rel_exists_request(
&mut self,
@@ -1802,6 +1837,44 @@ where
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+        } else if query_string.starts_with("lease lsn ") {
+            // Command syntax: lease lsn <tenant_shard_id> <timeline_id> <lsn>
+            let (_, params_raw) = query_string.split_at("lease lsn ".len());
+            let params = params_raw.split_whitespace().collect::<Vec<_>>();
+            if params.len() != 3 {
+                return Err(QueryError::Other(anyhow::anyhow!(
+                    "invalid param number {} for lease lsn command",
+                    params.len()
+                )));
+            }
+
+            let tenant_shard_id = TenantShardId::from_str(params[0])
+                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
+            let timeline_id = TimelineId::from_str(params[1])
+                .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
+
+            tracing::Span::current()
+                .record("tenant_id", field::display(tenant_shard_id))
+                .record("timeline_id", field::display(timeline_id));
+
+            // Authorization is checked against the tenant the shard belongs to.
+            self.check_permission(Some(tenant_shard_id.tenant_id))?;
+
+            // The caller is responsible for providing correct lsn.
+            let lsn = Lsn::from_str(params[2])
+                .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
+
+            // A failed lease is logged and reported to the client as an
+            // `ErrorResponse` instead of being propagated with `?`.
+            match self
+                .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
+                .await
+            {
+                Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
+                Err(e) => {
+                    error!("error obtaining lsn lease for {lsn}: {e:?}");
+                    pgb.write_message_noflush(&BeMessage::ErrorResponse(
+                        &e.to_string(),
+                        Some(e.pg_error_code()),
+                    ))?
+                }
+            };
} else if query_string.starts_with("show ") {
// show
let (_, params_raw) = query_string.split_at("show ".len());
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 1d8e2cf6d3..89fdf31849 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -7,7 +7,7 @@ use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::LocationConfigMode;
use pageserver_api::shard::{
- ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
+ ShardCount, ShardIdentity, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId,
};
use pageserver_api::upcall_api::ReAttachResponseTenant;
use rand::{distributions::Alphanumeric, Rng};
@@ -127,6 +127,8 @@ pub(crate) enum ShardSelector {
First,
/// Pick the shard that holds this key
Page(Key),
+ /// The shard ID is known: pick the given shard
+ Known(ShardIndex),
}
/// A convenience for use with the re_attach ControlPlaneClient function: rather
@@ -2067,6 +2069,11 @@ impl TenantManager {
return ShardResolveResult::Found(tenant.clone());
}
}
+ ShardSelector::Known(shard)
+ if tenant.shard_identity.shard_index() == shard =>
+ {
+ return ShardResolveResult::Found(tenant.clone());
+ }
_ => continue,
}
}
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index b0e5275b5f..1f8ee9ffc4 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -25,7 +25,7 @@ use pageserver_api::{
models::{
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
- TimelineState,
+ LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -1532,6 +1532,20 @@ impl Timeline {
Ok(())
}
+    /// Obtains a temporary lease blocking garbage collection for the given LSN.
+    ///
+    /// NOTE: this is a placeholder. `_lsn` and `_ctx` are not used yet and no
+    /// state is recorded; the returned lease simply expires `LEASE_LENGTH`
+    /// (five minutes) after the current system time.
+    pub(crate) fn make_lsn_lease(
+        &self,
+        _lsn: Lsn,
+        _ctx: &RequestContext,
+    ) -> anyhow::Result<LsnLease> {
+        const LEASE_LENGTH: Duration = Duration::from_secs(5 * 60);
+        let lease = LsnLease {
+            valid_until: SystemTime::now() + LEASE_LENGTH,
+        };
+        // TODO: dummy implementation
+        Ok(lease)
+    }
+
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {