Merge commit '5ec82105c' into problame/standby-horizon-leases

This commit is contained in:
Christian Schwarz
2025-08-06 17:46:37 +02:00
15 changed files with 375 additions and 127 deletions

View File

@@ -87,6 +87,24 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
lint-openapi-spec:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3.4.0
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- run: make lint-openapi-spec
check-codestyle-python:
needs: [ meta, check-permissions, build-build-tools-image ]
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.

1
Cargo.lock generated
View File

@@ -4499,6 +4499,7 @@ name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"compute_api",
"futures",
"pageserver_api",

View File

@@ -220,6 +220,15 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
.PHONY: lint-openapi-spec
lint-openapi-spec:
# operation-2xx-response: pageserver timeline delete returns 404 on success
find . -iname "openapi_spec.y*ml" -exec\
docker run --rm -v ${PWD}:/spec ghcr.io/redocly/cli:1.34.4\
--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
lint {} \+
# Targets for building PostgreSQL are defined in postgres.mk.
#
# But if the caller has indicated that PostgreSQL is already

View File

@@ -416,15 +416,6 @@ components:
total_startup_ms:
type: integer
Info:
type: object
description: Information about VM/Pod.
required:
- num_cpus
properties:
num_cpus:
type: integer
DbsAndRoles:
type: object
description: Databases and Roles
@@ -642,26 +633,6 @@ components:
description: Promote error, if any
type: string
InstalledExtensions:
type: object
properties:
extensions:
description: Contains list of installed extensions.
type: array
items:
type: object
properties:
extname:
type: string
version:
type: string
items:
type: string
n_databases:
type: integer
owned_by_superuser:
type: integer
SetRoleGrantsRequest:
type: object
required:

View File

@@ -6,6 +6,7 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
bytes.workspace = true
compute_api.workspace = true
futures.workspace = true
pageserver_api.workspace = true

View File

@@ -2,13 +2,15 @@ use std::collections::HashMap;
use std::sync::Arc;
use anyhow::anyhow;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tracing::instrument;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use crate::split::GetPageSplitter;
use compute_api::spec::PageserverProtocol;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_api::shard::ShardStripeSize;
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
@@ -78,10 +80,11 @@ impl PageserverClient {
.await
}
/// Fetches a page. The `request_id` must be unique across all in-flight requests.
/// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically
/// splits requests that straddle shard boundaries, and assembles the responses.
///
/// Unlike the `page_api::Client`, this client automatically converts `status_code` into
/// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`.
/// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
/// errors. All responses will have `GetPageStatusCode::Ok`.
#[instrument(skip_all, fields(
req_id = %req.request_id,
rel = %req.rel,
@@ -93,22 +96,55 @@ impl PageserverClient {
&self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
// TODO: this needs to split batch requests across shards and reassemble responses into a
// single response. It must also re-split the batch in case the shard map changes. For now,
// just use the first page.
let key = rel_block_to_key(
req.rel,
req.block_numbers
.first()
.copied()
.ok_or_else(|| tonic::Status::invalid_argument("no block numbers provided"))?,
);
// Make sure we have at least one page.
if req.block_numbers.is_empty() {
return Err(tonic::Status::invalid_argument("no block number"));
}
self.retry
// Fast path: request is for a single shard.
if let Some(shard_id) =
GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
{
return self.get_page_for_shard(shard_id, req).await;
}
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
// reassemble the responses.
//
// TODO: when we support shard map updates, we need to detect when it changes and re-split
// the request on errors.
let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
let mut shard_requests: FuturesUnordered<_> = splitter
.drain_requests()
.map(|(shard_id, shard_req)| {
// NB: each request will retry internally.
self.get_page_for_shard(shard_id, shard_req)
.map(move |result| result.map(|resp| (shard_id, resp)))
})
.collect();
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
splitter.add_response(shard_id, shard_response)?;
}
splitter.assemble_response()
}
/// Fetches pages that belong to the given shard.
#[instrument(skip_all, fields(shard = %shard_id))]
async fn get_page_for_shard(
&self,
shard_id: ShardIndex,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
let resp = self
.retry
.with(async || {
let stream = self.shards.get_for_key(key).stream().await;
let stream = self.shards.get(shard_id)?.stream().await;
let resp = stream.send(req.clone()).await?;
// Convert per-request errors into a tonic::Status.
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
@@ -118,7 +154,18 @@ impl PageserverClient {
Ok(resp)
})
.await
.await?;
// Make sure we got the right number of pages.
// NB: check outside of the retry loop, since we don't want to retry this.
let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
if expected != actual {
return Err(tonic::Status::internal(format!(
"expected {expected} pages for shard {shard_id}, got {actual}",
)));
}
Ok(resp)
}
/// Returns the size of a relation, as # of blocks.
@@ -216,13 +263,6 @@ impl Shards {
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
}
/// Looks up the shard that owns the given key.
fn get_for_key(&self, key: Key) -> &Shard {
let shard_number = key_to_shard_number(self.count, self.stripe_size, &key);
self.get(ShardIndex::new(shard_number, self.count))
.expect("must exist")
}
/// Returns shard 0.
fn get_zero(&self) -> &Shard {
self.get(ShardIndex::new(ShardNumber(0), self.count))

View File

@@ -1,5 +1,6 @@
mod client;
mod pool;
mod retry;
mod split;
pub use client::PageserverClient;

View File

@@ -0,0 +1,172 @@
use std::collections::HashMap;
use bytes::Bytes;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_page_api as page_api;
use utils::shard::{ShardCount, ShardIndex};
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
/// TODO: add tests for this.
pub struct GetPageSplitter {
/// The original request ID. Used for all shard requests.
request_id: page_api::RequestID,
/// Split requests by shard index.
requests: HashMap<ShardIndex, page_api::GetPageRequest>,
/// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble
/// the response pages in the same order as the original request.
block_shards: Vec<ShardIndex>,
/// Page responses by shard index. Will be assembled into a single response.
responses: HashMap<ShardIndex, Vec<Bytes>>,
}
impl GetPageSplitter {
/// Checks if the given request only touches a single shard, and returns the shard ID. This is
/// the common case, so we check first in order to avoid unnecessary allocations and overhead.
/// The caller must ensure that the request has at least one block number, or this will panic.
pub fn is_single_shard(
req: &page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Option<ShardIndex> {
// Fast path: unsharded tenant.
if count.is_unsharded() {
return Some(ShardIndex::unsharded());
}
// Find the base shard index for the first page, and compare with the rest.
let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages"));
let shard_number = key_to_shard_number(count, stripe_size, &key);
req.block_numbers
.iter()
.skip(1) // computed above
.all(|&blkno| {
let key = rel_block_to_key(req.rel, blkno);
key_to_shard_number(count, stripe_size, &key) == shard_number
})
.then_some(ShardIndex::new(shard_number, count))
}
/// Splits the given request.
pub fn split(
req: page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Self {
// The caller should make sure we don't split requests unnecessarily.
debug_assert!(
Self::is_single_shard(&req, count, stripe_size).is_none(),
"unnecessary request split"
);
// Split the requests by shard index.
let mut requests = HashMap::with_capacity(2); // common case
let mut block_shards = Vec::with_capacity(req.block_numbers.len());
for blkno in req.block_numbers {
let key = rel_block_to_key(req.rel, blkno);
let shard_number = key_to_shard_number(count, stripe_size, &key);
let shard_id = ShardIndex::new(shard_number, count);
let shard_req = requests
.entry(shard_id)
.or_insert_with(|| page_api::GetPageRequest {
request_id: req.request_id,
request_class: req.request_class,
rel: req.rel,
read_lsn: req.read_lsn,
block_numbers: Vec::new(),
});
shard_req.block_numbers.push(blkno);
block_shards.push(shard_id);
}
Self {
request_id: req.request_id,
responses: HashMap::with_capacity(requests.len()),
requests,
block_shards,
}
}
/// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations.
pub fn drain_requests(
&mut self,
) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
self.requests.drain()
}
/// Adds a response from the given shard.
#[allow(clippy::result_large_err)]
pub fn add_response(
&mut self,
shard_id: ShardIndex,
response: page_api::GetPageResponse,
) -> tonic::Result<()> {
// The caller should already have converted status codes into tonic::Status.
assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
// Make sure the response matches the request ID.
if response.request_id != self.request_id {
return Err(tonic::Status::internal(format!(
"response ID {} does not match request ID {}",
response.request_id, self.request_id
)));
}
// Add the response data to the map.
let old = self.responses.insert(shard_id, response.page_images);
if old.is_some() {
return Err(tonic::Status::internal(format!(
"duplicate response for shard {shard_id}",
)));
}
Ok(())
}
/// Assembles the shard responses into a single response. Responses must be present for all
/// relevant shards, and the total number of pages must match the original request.
#[allow(clippy::result_large_err)]
pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
let mut response = page_api::GetPageResponse {
request_id: self.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
page_images: Vec::with_capacity(self.block_shards.len()),
};
// Set up per-shard page iterators we can pull from.
let mut shard_responses = HashMap::with_capacity(self.responses.len());
for (shard_id, responses) in self.responses {
shard_responses.insert(shard_id, responses.into_iter());
}
// Reassemble the responses in the same order as the original request.
for shard_id in &self.block_shards {
let page = shard_responses
.get_mut(shard_id)
.ok_or_else(|| {
tonic::Status::internal(format!("missing response for shard {shard_id}"))
})?
.next()
.ok_or_else(|| {
tonic::Status::internal(format!("missing page from shard {shard_id}"))
})?;
response.page_images.push(page);
}
// Make sure there are no additional pages.
for (shard_id, mut pages) in shard_responses {
if pages.next().is_some() {
return Err(tonic::Status::internal(format!(
"extra pages returned from shard {shard_id}"
)));
}
}
Ok(response)
}
}

View File

@@ -409,11 +409,12 @@ impl TenantFeatureResolver {
/// Refresh the cached properties and flags on the critical path.
pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
let mut remote_size_mb = None;
let mut remote_size_mb = Some(0.0);
for timeline in tenant_shard.list_timelines() {
let size = timeline.metrics.resident_physical_size_get();
if size == 0 {
remote_size_mb = None;
break;
}
if let Some(ref mut remote_size_mb) = remote_size_mb {
*remote_size_mb += size as f64 / 1024.0 / 1024.0;

View File

@@ -116,26 +116,6 @@ paths:
schema:
type: string
/v1/tenant/{tenant_id}/timeline:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
get:
description: Get timelines for tenant
responses:
"200":
description: TimelineInfo
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TimelineInfo"
/v1/tenant/{tenant_id}/timeline/{timeline_id}:
parameters:
- name: tenant_id
@@ -618,7 +598,7 @@ paths:
schema:
$ref: "#/components/schemas/SecondaryProgress"
/v1/tenant/{tenant_id}/timeline/:
/v1/tenant/{tenant_id}/timeline:
parameters:
- name: tenant_id
in: path
@@ -685,6 +665,17 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
get:
description: Get timelines for tenant
responses:
"200":
description: TimelineInfo
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TimelineInfo"
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor:
parameters:
@@ -767,7 +758,7 @@ paths:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/:
/v1/tenant:
get:
description: Get tenants list
responses:
@@ -847,7 +838,7 @@ paths:
items:
$ref: "#/components/schemas/TenantInfo"
/v1/tenant/{tenant_id}/config/:
/v1/tenant/{tenant_id}/config:
parameters:
- name: tenant_id
in: path

View File

@@ -3694,6 +3694,23 @@ async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow
Ok(())
}
async fn force_refresh_feature_flag(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant
.feature_resolver
.refresh_properties_and_flags(&tenant);
json_response(StatusCode::OK, ())
}
async fn tenant_evaluate_feature_flag(
request: Request<Body>,
_cancel: CancellationToken,
@@ -4159,6 +4176,9 @@ pub fn make_router(
.get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
api_handler(r, tenant_evaluate_feature_flag)
})
.post("/v1/tenant/:tenant_shard_id/force_refresh_feature_flag", |r| {
api_handler(r, force_refresh_feature_flag)
})
.put("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
})

View File

@@ -1247,3 +1247,10 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
return res.json()
def force_refresh_feature_flag(self, tenant_id: TenantId | TenantShardId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/force_refresh_feature_flag",
)
self.verbose_error(res)
return res.json()

View File

@@ -117,7 +117,9 @@ class NeonBranch:
def create_child_branch(self) -> NeonBranch | None:
return self.project.create_branch(self.id)
def create_ro_endpoint(self) -> NeonEndpoint:
def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
return None
return NeonEndpoint(
self.project,
self.neon_api.create_endpoint(self.project_id, self.id, "read_only", {})["endpoint"],
@@ -151,11 +153,26 @@ class NeonBranch:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
parent_id: str = res["branch"]["parent_id"]
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
target_time = datetime.now() + timedelta(seconds=30)
while datetime.now() < target_time:
try:
parent_def = self.neon_api.get_branch_details(self.project_id, parent_id)
except HTTPError as he:
if he.response.status_code == 404:
log.info("Branch not found, waiting...")
time.sleep(1)
else:
raise HTTPError(he) from he
else:
break
else:
raise RuntimeError(f"Branch {parent_id} not found")
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
parent = NeonBranch(
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
)
parent = NeonBranch(self.project, parent_def, True)
self.project.branches[parent_id] = parent
self.parent = parent
parent.children[self.id] = self
@@ -168,29 +185,21 @@ class NeonBranch:
source_timestamp: str | None = None,
preserve_under_name: str | None = None,
) -> dict[str, Any] | None:
if not self.project.check_limit_branches():
return None
endpoints = [ep for ep in self.endpoints.values() if ep.type == "read_only"]
# Terminate all the benchmarks running to prevent errors. Errors in benchmark during pgbench are expected
for ep in endpoints:
ep.terminate_benchmark()
self.terminate_benchmark()
try:
res: dict[str, Any] = self.neon_api.restore_branch(
self.project_id,
self.id,
source_branch_id,
source_lsn,
source_timestamp,
preserve_under_name,
)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
res: dict[str, Any] = self.neon_api.restore_branch(
self.project_id,
self.id,
source_branch_id,
source_lsn,
source_timestamp,
preserve_under_name,
)
self.project.wait()
self.start_benchmark()
for ep in endpoints:
@@ -239,19 +248,30 @@ class NeonProject:
def delete(self) -> None:
self.neon_api.delete_project(self.id)
def check_limit_branches(self) -> bool:
if self.limits["max_branches"] == -1 or len(self.branches) < self.limits["max_branches"]:
return True
log.info("branch limit exceeded (%s/%s)", len(self.branches), self.limits["max_branches"])
return False
def check_limit_endpoints(self) -> bool:
if (
self.limits["max_read_only_endpoints"] == -1
or self.read_only_endpoints_total < self.limits["max_read_only_endpoints"]
):
return True
log.info(
"Maximum read only endpoint limit exceeded (%s/%s)",
self.read_only_endpoints_total,
self.limits["max_read_only_endpoints"],
)
return False
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
self.wait()
try:
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
if not self.check_limit_branches():
return None
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
new_branch = NeonBranch(self, branch_def)
self.wait()
return new_branch
@@ -388,17 +408,9 @@ def do_action(project: NeonProject, action: str) -> bool:
log.info("Action: %s", action)
if action == "new_branch":
log.info("Trying to create a new branch")
if 0 <= project.limits["max_branches"] <= len(project.branches):
log.info(
"Maximum branch limit exceeded (%s of %s)",
len(project.branches),
project.limits["max_branches"],
)
return False
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
log.info("Parent: %s", parent)
child = parent.create_child_branch()
if child is None:
return False
@@ -413,16 +425,11 @@ def do_action(project: NeonProject, action: str) -> bool:
log.info("Leaf branches not found, skipping")
return False
elif action == "new_ro_endpoint":
if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
log.info(
"Maximum read only endpoint limit exceeded (%s of %s)",
project.read_only_endpoints_total,
project.limits["max_read_only_endpoints"],
)
return False
ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches]
).create_ro_endpoint()
if ep is None:
return False
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
ep.start_benchmark()
elif action == "delete_ro_endpoint":

View File

@@ -49,3 +49,12 @@ def test_feature_flag(neon_env_builder: NeonEnvBuilder):
env.initial_tenant, "test-feature-flag"
)["result"]
)
env.pageserver.http_client().force_refresh_feature_flag(env.initial_tenant)
# Check if the properties exist
result = env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)
assert "tenant_remote_size_mb" in result["properties"]
assert "tenant_id" in result["properties"]

View File

@@ -332,7 +332,7 @@ def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
last_insert_lsn = query_scalar(cursor, "select pg_current_wal_insert_lsn();")
def start_publisher_workload(table_num: int, duration: int):
def start_publisher_workload(i: int, duration: int):
start = time.time()
with endpoint.cursor(dbname="publisher_db") as cur:
while time.time() - start < duration: