Compare commits

...

11 Commits

Author SHA1 Message Date
Vlad Lazar
11a97bbfd6 pageserver: handle rel drops correctly in rel size cache 2025-05-14 17:09:01 +02:00
Alex Chi Z.
81fd652151 fix(pageserver): use better estimation for compaction memory usage (#11904)
## Problem

Hopefully resolves `test_gc_feedback` flakiness.

## Summary of changes

`accumulated_values` should not exceed 512MB to avoid OOM. Previously we
only use number of items, which is not a good estimation.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-14 08:32:55 +00:00
Elizabeth Murray
d47e88e353 Update the pgrag version in the compute dockerfile. (#11867)
## Problem

The extensions test are hanging because of pgrag. The new version of
pgrag contains a fix for the hang.

## Summary of changes
2025-05-14 07:00:59 +00:00
Vlad Lazar
045ae13e06 pageserver: make imports work with tenant shut downs (#11855)
## Problem

Lifetime of imported timelines (and implicitly the import background
task) has some shortcomings:
1. Timeline activation upon import completion is tricky. Previously, a
timeline that finished importing
after a tenant detach would not get activated and there's concerns about
the safety of activating
concurrently with shut-down.
2. Import jobs can prevent tenant shut down since they hold the tenant
gate

## Summary of Changes

Track the import tasks in memory and abort them explicitly on tenant
shutdown.

Integrate more closely with the storage controller:
1. When an import task has finished all of its jobs, it notifies the
storage controller, but **does not** mark the import as done in the
index_part. When all shards have finished importing, the storage
controller will call the `/activate_post_import` idempotent endpoint for
all of them. The handler, marks the import complete in index part,
resets the tenant if required and checks if the timeline is active yet.
2. Not directly related, but the import job now gets the starting state
from the storage controller instead of the import bucket. This paves the
way for progress checkpointing.

Related: https://github.com/neondatabase/neon/issues/11568
2025-05-13 17:49:49 +00:00
Folke Behrens
234c882a07 proxy: Expose handlers for cpu and heap profiling (#11912)
## Problem

It's difficult to understand where proxy spends most of cpu and memory.

## Summary of changes

Expose cpu and heap profiling handlers for continuous profiling.

neondatabase/cloud#22670
2025-05-13 14:58:37 +00:00
Konstantin Knizhnik
290369061f Check prefetch result in DEBUG_COMPARE_LOCAL mode (#11502)
## Problem

Prefetched and LFC results are not checked in DEBUG_COMPARE_LOCAL mode

## Summary of changes

Add check for this results as well.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-13 14:13:42 +00:00
Anastasia Lubennikova
25ab16ee24 chore(compute): Postgres 17.5, 16.9, 15.13 and 14.18 (#11886)
Bump all minor versions. 

the only conflict was
src/backend/storage/smgr/smgr.c in v17
where our smgr changes conflicted with

ee578921b6
but it was trivial to resolve.
2025-05-13 13:30:09 +00:00
Vlad Lazar
cfbef4d586 safekeeper: downgrade stream from future WAL log (#11909)
## Problem

1. Safekeeper selection on the pageserver side isn't very dynamic. Once
you connect to one safekeeper, you'll use that one for as long as the
safekeeper keeps the connection alive. In principle, we could be more
eager, since the wal receiver connection can be cancelled but we don't
do that. We wait until the "session" is done and then we pick a new SK.
2. Picking a new SK is quite conservative. We will switch if: 
a. We haven't received anything from the SK within the last 10 seconds
(wal_connect_timeout) or
b. The candidate SK is 1GiB ahead or
c. The candidate SK is in the same AZ as the PS or d. There's a
candidate that is ahead and we've not had any WAL within the last 10
seconds (lagging_wal_timeout)

Hence, we can end up with pageservers that are requesting WAL which
their safekeeper hasn't seen yet.

## Summary of changes

Downgrade warning log to info.
2025-05-13 13:02:25 +00:00
Alex Chi Z.
34a42b00ca feat(pageserver): add PostHog lite client (#11821)
## Problem

part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

Add a lite PostHog client that only uses the local flag evaluation
functionality. Added a test case that parses an example feature flag and
gets the evaluation result.

TODO: support boolean flag, remote config; implement all operators in
PostHog.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-13 09:49:14 +00:00
Alex Chi Z.
a9979620c5 fix(remote_storage): continue on Azure+AWS retryable error (#11903)
## Problem

We implemented the retry logic in AWS S3 but not in Azure. Therefore, if
there is an error during Azure listing, we will return an Err to the
caller, and the stream will end without fetching more tenants.

Part of https://github.com/neondatabase/neon/issues/11159

Without this fix, listing tenant will stop once we hit an error (could
be network errors -- that happens more frequent on Azure). If we happen
to stop at a point that we only listed part of the shards, we will hit
the "missed shards" error or even remove layers being used.

This bug (for Azure listing) was introduced as part of
https://github.com/neondatabase/neon/pull/9840

There is also a bug that stops the stream for AWS when there's a timeout
-- this is fixed along with this patch.

## Summary of changes

Retry the request on error. In the future, we should make such streams
return something like `Result<Result<T>>` where the outer result is the
error that ends the stream and the inner one is the error that should be
retried by the caller.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-13 08:53:35 +00:00
Conrad Ludgate
a113c48c43 proxy: fix redis batching support (#11905)
## Problem

For `StoreCancelKey`, we were inserting 2 commands, but we were not
inserting two replies. This mismatch leads to errors when decoding the
response.

## Summary of changes

Abstract the command + reply pipeline so that commands and replies are
registered at the same time.
2025-05-13 08:33:53 +00:00
42 changed files with 1967 additions and 665 deletions

16
Cargo.lock generated
View File

@@ -4848,6 +4848,19 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "posthog_client_lite"
version = "0.1.0"
dependencies = [
"anyhow",
"reqwest",
"serde",
"serde_json",
"sha2",
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -8439,8 +8452,10 @@ dependencies = [
"fail",
"form_urlencoded",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-task",
"futures-util",
"generic-array",
"getrandom 0.2.11",
@@ -8470,6 +8485,7 @@ dependencies = [
"once_cell",
"p256 0.13.2",
"parquet",
"percent-encoding",
"prettyplease",
"proc-macro2",
"prost 0.13.3",

View File

@@ -26,6 +26,7 @@ members = [
"libs/utils",
"libs/consumption_metrics",
"libs/postgres_backend",
"libs/posthog_client_lite",
"libs/pq_proto",
"libs/tenant_size_model",
"libs/metrics",

View File

@@ -1117,8 +1117,8 @@ RUN wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.
mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \
echo "#nothing to test here" > neon-test.sh
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.1.tar.gz -O pgrag.tar.gz && \
echo "087b2ecd11ba307dc968042ef2e9e43dc04d9ba60e8306e882c407bbe1350a50 pgrag.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.2.tar.gz -O pgrag.tar.gz && \
echo "7361654ea24f08cbb9db13c2ee1c0fe008f6114076401bb871619690dafc5225 pgrag.tar.gz" | sha256sum --check && \
mkdir pgrag-src && cd pgrag-src && tar xzf ../pgrag.tar.gz --strip-components=1 -C .
FROM rust-extensions-build-pgrx14 AS pgrag-build

View File

@@ -36,6 +36,24 @@ impl Value {
Value::WalRecord(rec) => rec.will_init(),
}
}
#[inline(always)]
pub fn estimated_size(&self) -> usize {
match self {
Value::Image(image) => image.len(),
Value::WalRecord(NeonWalRecord::AuxFile {
content: Some(content),
..
}) => content.len(),
Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
Value::WalRecord(NeonWalRecord::ClogSetAborted { xids }) => xids.len() * 4,
Value::WalRecord(NeonWalRecord::ClogSetCommitted { xids, .. }) => xids.len() * 4,
Value::WalRecord(NeonWalRecord::MultixactMembersCreate { members, .. }) => {
members.len() * 8
}
_ => 8192, /* use image size as the estimation */
}
}
}
#[derive(Debug, PartialEq)]

View File

@@ -0,0 +1,14 @@
[package]
name = "posthog_client_lite"
version = "0.1.0"
edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
workspace_hack.workspace = true
thiserror.workspace = true

View File

@@ -0,0 +1,634 @@
//! A lite version of the PostHog client that only supports local evaluation of feature flags.
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::Digest;
#[derive(Debug, thiserror::Error)]
pub enum PostHogEvaluationError {
/// The feature flag is not available, for example, because the local evaluation data is not populated yet.
#[error("Feature flag not available: {0}")]
NotAvailable(String),
#[error("No condition group is matched")]
NoConditionGroupMatched,
/// Real errors, e.g., the rollout percentage does not add up to 100.
#[error("Failed to evaluate feature flag: {0}")]
Internal(String),
}
#[derive(Deserialize)]
pub struct LocalEvaluationResponse {
#[allow(dead_code)]
flags: Vec<LocalEvaluationFlag>,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlag {
key: String,
filters: LocalEvaluationFlagFilters,
active: bool,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilters {
groups: Vec<LocalEvaluationFlagFilterGroup>,
multivariate: LocalEvaluationFlagMultivariate,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilterGroup {
variant: Option<String>,
properties: Option<Vec<LocalEvaluationFlagFilterProperty>>,
rollout_percentage: i64,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilterProperty {
key: String,
value: PostHogFlagFilterPropertyValue,
operator: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PostHogFlagFilterPropertyValue {
String(String),
Number(f64),
Boolean(bool),
List(Vec<String>),
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagMultivariate {
variants: Vec<LocalEvaluationFlagMultivariateVariant>,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagMultivariateVariant {
key: String,
rollout_percentage: i64,
}
pub struct FeatureStore {
flags: HashMap<String, LocalEvaluationFlag>,
}
impl Default for FeatureStore {
fn default() -> Self {
Self::new()
}
}
enum GroupEvaluationResult {
MatchedAndOverride(String),
MatchedAndEvaluate,
Unmatched,
}
impl FeatureStore {
pub fn new() -> Self {
Self {
flags: HashMap::new(),
}
}
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
self.flags.clear();
for flag in flags {
self.flags.insert(flag.key.clone(), flag);
}
}
/// Generate a consistent hash for a user ID (e.g., tenant ID).
///
/// The implementation is different from PostHog SDK. In PostHog SDK, it is sha1 of `user_id.distinct_id.salt`.
/// However, as we do not upload all of our tenant IDs to PostHog, we do not have the PostHog distinct_id for a
/// tenant. Therefore, the way we compute it is sha256 of `user_id.feature_id.salt`.
fn consistent_hash(user_id: &str, flag_key: &str, salt: &str) -> f64 {
let mut hasher = sha2::Sha256::new();
hasher.update(user_id);
hasher.update(".");
hasher.update(flag_key);
hasher.update(".");
hasher.update(salt);
let hash = hasher.finalize();
let hash_int = u64::from_le_bytes(hash[..8].try_into().unwrap());
hash_int as f64 / u64::MAX as f64
}
/// Evaluate a condition. Returns an error if the condition cannot be evaluated due to parsing error or missing
/// property.
fn evaluate_condition(
&self,
operator: &str,
provided: &PostHogFlagFilterPropertyValue,
requested: &PostHogFlagFilterPropertyValue,
) -> Result<bool, PostHogEvaluationError> {
match operator {
"exact" => {
let PostHogFlagFilterPropertyValue::String(provided) = provided else {
// Left should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a string: {:?}",
provided
)));
};
let PostHogFlagFilterPropertyValue::List(requested) = requested else {
// Right should be a list of string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a list: {:?}",
requested
)));
};
Ok(requested.contains(provided))
}
"lt" | "gt" => {
let PostHogFlagFilterPropertyValue::String(requested) = requested else {
// Right should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a string: {:?}",
requested
)));
};
let Ok(requested) = requested.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the right side of the condition as a number: {:?}",
requested
)));
};
// Left can either be a number or a string
let provided = match provided {
PostHogFlagFilterPropertyValue::Number(provided) => *provided,
PostHogFlagFilterPropertyValue::String(provided) => {
let Ok(provided) = provided.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the left side of the condition as a number: {:?}",
provided
)));
};
provided
}
_ => {
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a number or a string: {:?}",
provided
)));
}
};
match operator {
"lt" => Ok(provided < requested),
"gt" => Ok(provided > requested),
op => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
op
))),
}
}
_ => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
operator
))),
}
}
/// Evaluate a percentage.
fn evaluate_percentage(&self, mapped_user_id: f64, percentage: i64) -> bool {
mapped_user_id <= percentage as f64 / 100.0
}
/// Evaluate a filter group for a feature flag. Returns an error if there are errors during the evaluation.
///
/// Return values:
/// Ok(GroupEvaluationResult::MatchedAndOverride(variant)): matched and evaluated to this value
/// Ok(GroupEvaluationResult::MatchedAndEvaluate): condition matched but no variant override, use the global rollout percentage
/// Ok(GroupEvaluationResult::Unmatched): condition unmatched
fn evaluate_group(
&self,
group: &LocalEvaluationFlagFilterGroup,
hash_on_group_rollout_percentage: f64,
provided_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<GroupEvaluationResult, PostHogEvaluationError> {
if let Some(ref properties) = group.properties {
for property in properties {
if let Some(value) = provided_properties.get(&property.key) {
// The user provided the property value
if !self.evaluate_condition(
property.operator.as_ref(),
value,
&property.value,
)? {
return Ok(GroupEvaluationResult::Unmatched);
}
} else {
// We cannot evaluate, the property is not available
return Err(PostHogEvaluationError::NotAvailable(format!(
"The required property in the condition is not available: {}",
property.key
)));
}
}
}
// The group has no condition matchers or we matched the properties
if self.evaluate_percentage(hash_on_group_rollout_percentage, group.rollout_percentage) {
if let Some(ref variant_override) = group.variant {
Ok(GroupEvaluationResult::MatchedAndOverride(
variant_override.clone(),
))
} else {
Ok(GroupEvaluationResult::MatchedAndEvaluate)
}
} else {
Ok(GroupEvaluationResult::Unmatched)
}
}
/// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors
/// during the evaluation.
///
/// The parsing logic is as follows:
///
/// * Match each filter group.
/// - If a group is matched, it will first determine whether the user is in the range of the group's rollout
/// percentage. We will generate a consistent hash for the user ID on the group rollout percentage. This hash
/// is shared across all groups.
/// - If the hash falls within the group's rollout percentage, return the variant if it's overridden, or
/// - Evaluate the variant using the global config and the global rollout percentage.
/// * Otherwise, continue with the next group until all groups are evaluated and no group is within the
/// rollout percentage.
/// * If there are no matching groups, return an error.
///
/// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%).
/// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override.
/// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C.
pub fn evaluate_multivariate(
&self,
flag_key: &str,
user_id: &str,
) -> Result<String, PostHogEvaluationError> {
let hash_on_global_rollout_percentage =
Self::consistent_hash(user_id, flag_key, "multivariate");
let hash_on_group_rollout_percentage =
Self::consistent_hash(user_id, flag_key, "within_group");
self.evaluate_multivariate_inner(
flag_key,
hash_on_global_rollout_percentage,
hash_on_group_rollout_percentage,
&HashMap::new(),
)
}
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
/// and avoid duplicate computations.
///
/// Use a different consistent hash for evaluating the group rollout percentage.
/// The behavior: if the condition is set to rolling out to 10% of the users, and
/// we set the variant A to 20% in the global config, then 2% of the total users will
/// be evaluated to variant A.
///
/// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two
/// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users
/// will be evaluated (versus 30% if group evaluation is done independently).
pub(crate) fn evaluate_multivariate_inner(
&self,
flag_key: &str,
hash_on_global_rollout_percentage: f64,
hash_on_group_rollout_percentage: f64,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<String, PostHogEvaluationError> {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
)));
}
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
// does not matter.
for group in &flag_config.filters.groups {
match self.evaluate_group(group, hash_on_group_rollout_percentage, properties)? {
GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant),
GroupEvaluationResult::MatchedAndEvaluate => {
let mut percentage = 0;
for variant in &flag_config.filters.multivariate.variants {
percentage += variant.rollout_percentage;
if self
.evaluate_percentage(hash_on_global_rollout_percentage, percentage)
{
return Ok(variant.key.clone());
}
}
// This should not happen because the rollout percentage always adds up to 100, but just in case that PostHog
// returned invalid spec, we return an error.
return Err(PostHogEvaluationError::Internal(format!(
"Rollout percentage does not add up to 100: {}",
flag_key
)));
}
GroupEvaluationResult::Unmatched => continue,
}
}
// If no group is matched, the feature is not available, and up to the caller to decide what to do.
Err(PostHogEvaluationError::NoConditionGroupMatched)
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
)))
}
}
}
/// A lite PostHog client.
///
/// At the point of writing this code, PostHog does not have a functional Rust client with feature flag support.
/// This is a lite version that only supports local evaluation of feature flags and only supports those JSON specs
/// that will be used within Neon.
///
/// PostHog is designed as a browser-server system: the browser (client) side uses the client key and is exposed
/// to the end users; the server side uses a server key and is not exposed to the end users. The client and the
/// server has different API keys and provide a different set of APIs. In Neon, we only have the server (that is
/// pageserver), and it will use both the client API and the server API. So we need to store two API keys within
/// our PostHog client.
///
/// The server API is used to fetch the feature flag specs. The client API is used to capture events in case we
/// want to report the feature flag usage back to PostHog. The current plan is to use PostHog only as an UI to
/// configure feature flags so it is very likely that the client API will not be used.
pub struct PostHogClient {
/// The server API key.
server_api_key: String,
/// The client API key.
client_api_key: String,
/// The project ID.
project_id: String,
/// The private API URL.
private_api_url: String,
/// The public API URL.
public_api_url: String,
/// The HTTP client.
client: reqwest::Client,
}
impl PostHogClient {
pub fn new(
server_api_key: String,
client_api_key: String,
project_id: String,
private_api_url: String,
public_api_url: String,
) -> Self {
let client = reqwest::Client::new();
Self {
server_api_key,
client_api_key,
project_id,
private_api_url,
public_api_url,
client,
}
}
pub fn new_with_us_region(
server_api_key: String,
client_api_key: String,
project_id: String,
) -> Self {
Self::new(
server_api_key,
client_api_key,
project_id,
"https://us.posthog.com".to_string(),
"https://us.i.posthog.com".to_string(),
)
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
/// - <https://posthog.com/docs/api/feature-flags#get-api-projects-project_id-feature_flags-local_evaluation>
/// - <https://posthog.com/docs/feature-flags/local-evaluation>
///
/// The handling logic in [`FeatureStore`] mostly follows the Python API implementation.
/// See `_compute_flag_locally` in <https://github.com/PostHog/posthog-python/blob/master/posthog/client.py>
pub async fn get_feature_flags_local_evaluation(
&self,
) -> anyhow::Result<LocalEvaluationResponse> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.private_api_url, self.project_id
);
let response = self
.client
.get(url)
.bearer_auth(&self.server_api_key)
.send()
.await?;
let body = response.text().await?;
Ok(serde_json::from_str(&body)?)
}
/// Capture an event. This will only be used to report the feature flag usage back to PostHog, though
/// it also support a lot of other functionalities.
///
/// <https://posthog.com/docs/api/capture>
pub async fn capture_event(
&self,
event: &str,
distinct_id: &str,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> anyhow::Result<()> {
// PUBLIC_URL/capture/
// with bearer token of self.client_api_key
let url = format!("{}/capture/", self.public_api_url);
self.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.client_api_key,
"distinct_id": distinct_id,
"event": event,
"properties": properties,
}))?)
.send()
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn data() -> &'static str {
r#"{
"flags": [
{
"id": 132794,
"team_id": 152860,
"name": "",
"key": "gc-compaction",
"filters": {
"groups": [
{
"variant": "enabled-stage-2",
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 50
},
{
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 80
}
],
"payloads": {},
"multivariate": {
"variants": [
{
"key": "disabled",
"name": "",
"rollout_percentage": 90
},
{
"key": "enabled-stage-1",
"name": "",
"rollout_percentage": 10
},
{
"key": "enabled-stage-2",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled-stage-3",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled",
"name": "",
"rollout_percentage": 0
}
]
}
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 6
}
],
"group_type_mapping": {},
"cohorts": {}
}"#
}
#[test]
fn parse_local_evaluation() {
let data = data();
let _: LocalEvaluationResponse = serde_json::from_str(data).unwrap();
}
#[test]
fn evaluate_multivariate() {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =
store.evaluate_multivariate_inner("gc-compaction", 1.00, 0.40, &HashMap::new());
assert!(matches!(
variant,
Err(PostHogEvaluationError::NotAvailable(_))
),);
let properties_unmatched = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("paid".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// This does not match any group so there will be an error.
let variant =
store.evaluate_multivariate_inner("gc-compaction", 1.00, 0.40, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let variant =
store.evaluate_multivariate_inner("gc-compaction", 0.80, 0.80, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let properties = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("free".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override.
let variant = store.evaluate_multivariate_inner("gc-compaction", 0.10, 0.10, &properties);
assert_eq!(variant.unwrap(), "enabled-stage-2".to_string());
// It matches the second group as 0.50 <= 0.60 <= 0.80 and the properties are matched. Then it gets evaluated using the global percentage.
let variant = store.evaluate_multivariate_inner("gc-compaction", 0.99, 0.60, &properties);
assert_eq!(variant.unwrap(), "enabled-stage-1".to_string());
let variant = store.evaluate_multivariate_inner("gc-compaction", 0.80, 0.60, &properties);
assert_eq!(variant.unwrap(), "disabled".to_string());
// It matches the group conditions but not the group rollout percentage.
let variant = store.evaluate_multivariate_inner("gc-compaction", 1.00, 0.90, &properties);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
}
}

View File

@@ -330,11 +330,18 @@ impl AzureBlobStorage {
if let Err(DownloadError::Timeout) = &next_item {
timeout_try_cnt += 1;
if timeout_try_cnt <= 5 {
continue;
continue 'outer;
}
}
let next_item = next_item?;
let next_item = match next_item {
Ok(next_item) => next_item,
Err(e) => {
// The error is potentially retryable, so we must rewind the loop after yielding.
yield Err(e);
continue 'outer;
},
};
// Log a warning if we saw two timeouts in a row before a successful request
if timeout_try_cnt > 2 {

View File

@@ -657,7 +657,14 @@ impl RemoteStorage for S3Bucket {
res = request => Ok(res),
_ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}?;
};
if let Err(DownloadError::Timeout) = &response {
yield Err(DownloadError::Timeout);
continue 'outer;
}
let response = response?; // always yield cancellation errors and stop the stream
let response = response
.context("Failed to list S3 prefixes")

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::error::Error as _;
use std::time::Duration;
use bytes::Bytes;
use detach_ancestor::AncestorDetached;
@@ -819,4 +820,25 @@ impl Client {
.await
.map(|resp| resp.status())
}
pub async fn activate_post_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
activate_timeline_timeout: Duration,
) -> Result<TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/activate_post_import?timeline_activate_timeout_ms={}",
self.mgmt_api_endpoint,
tenant_shard_id,
timeline_id,
activate_timeline_timeout.as_millis()
);
self.request(Method::PUT, uri, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -53,6 +53,11 @@ pub trait StorageControllerUpcallApi {
timeline_id: TimelineId,
status: ShardImportStatus,
) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
fn get_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> impl Future<Output = Result<Option<ShardImportStatus>, RetryForeverError>> + Send;
}
impl StorageControllerUpcallClient {
@@ -302,4 +307,39 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
self.retry_http_forever(&url, request).await
}
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
async fn get_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<Option<ShardImportStatus>, RetryForeverError> {
let url = self
.base_url
.join(format!("timeline_import_status/{}/{}", tenant_shard_id, timeline_id).as_str())
.expect("Failed to build path");
Ok(backoff::retry(
|| async {
let response = self.http_client.get(url.clone()).send().await?;
if let Err(err) = response.error_for_status_ref() {
if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) {
return Ok(None);
} else {
return Err(err);
}
}
response.json::<ShardImportStatus>().await.map(Some)
},
|_| false,
3,
u32::MAX,
"storage controller upcall",
&self.cancel,
)
.await
.ok_or(RetryForeverError::ShuttingDown)?
.expect("We retry forever, this should never be reached"))
}
}

View File

@@ -663,6 +663,7 @@ mod test {
use camino::Utf8Path;
use hex_literal::hex;
use pageserver_api::key::Key;
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::ShardIndex;
use pageserver_api::upcall_api::ReAttachResponseTenant;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
@@ -796,6 +797,14 @@ mod test {
) -> Result<(), RetryForeverError> {
unimplemented!()
}
async fn get_timeline_import_status(
&self,
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
) -> Result<Option<ShardImportStatus>, RetryForeverError> {
unimplemented!()
}
}
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {

View File

@@ -3500,6 +3500,107 @@ async fn put_tenant_timeline_import_wal(
}.instrument(span).await
}
/// Activate a timeline after its import has completed
///
/// The endpoint is idempotent and callers are expected to retry all
/// errors until a successful response.
async fn activate_post_import_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
const DEFAULT_ACTIVATE_TIMEOUT: Duration = Duration::from_secs(1);
let activate_timeout = parse_query_param(&request, "timeline_activate_timeout_ms")?
.map(Duration::from_millis)
.unwrap_or(DEFAULT_ACTIVATE_TIMEOUT);
let span = info_span!(
"activate_post_import_handler",
tenant_id=%tenant_shard_id.tenant_id,
timeline_id=%timeline_id,
shard_id=%tenant_shard_id.shard_slug()
);
async move {
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
tenant
.finalize_importing_timeline(timeline_id)
.await
.map_err(ApiError::InternalServerError)?;
match tenant.get_timeline(timeline_id, false) {
Ok(_timeline) => {
// Timeline is already visible. Reset not required: fall through.
}
Err(GetTimelineError::NotFound { .. }) => {
// This is crude: we reset the whole tenant such that the new timeline is detected
// and activated. We can come up with something more granular in the future.
//
// Note that we only reset the tenant if required: when the timeline is
// not present in [`Tenant::timelines`].
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
state
.tenant_manager
.reset_tenant(tenant_shard_id, false, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
}
Err(GetTimelineError::ShuttingDown) => {
return Err(ApiError::ShuttingDown);
}
Err(GetTimelineError::NotActive { .. }) => {
unreachable!("Called get_timeline with active_only=false");
}
}
let timeline = tenant.get_timeline(timeline_id, false)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn)
.with_scope_timeline(&timeline);
let result =
tokio::time::timeout(activate_timeout, timeline.wait_to_become_active(&ctx)).await;
match result {
Ok(Ok(())) => {
// fallthrough
}
// Timeline reached some other state that's not active
// TODO(vlad): if the tenant is broken, return a permananet error
Ok(Err(_timeline_state)) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Timeline activation failed"
)));
}
// Activation timed out
Err(_) => {
return Err(ApiError::Timeout("Timeline activation timed out".into()));
}
}
let timeline_info = build_timeline_info(
&timeline, false, // include_non_incremental_logical_size,
false, // force_await_initial_logical_size
&ctx,
)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, timeline_info)
}
.instrument(span)
.await
}
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -3924,5 +4025,9 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal",
|r| api_handler(r, put_tenant_timeline_import_wal),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
|r| api_handler(r, activate_post_import_handler),
)
.any(handler_404))
}

View File

@@ -50,7 +50,9 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
use crate::tenant::timeline::{
GetVectoredError, MissingKeyError, RelSizeCacheEntry, VersionedKeySpaceQuery,
};
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
@@ -470,8 +472,26 @@ impl Timeline {
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
return Ok(nblocks);
if let Some(entry) = self.get_cached_rel_size(&tag, version.get_lsn()) {
match entry {
RelSizeCacheEntry::Present(nblocks) => {
return Ok(nblocks);
}
RelSizeCacheEntry::Truncated => {
let key = rel_size_to_key(tag);
return Err(PageReconstructError::MissingKey(Box::new(
MissingKeyError {
keyspace: KeySpace::single(key..key.next()),
shard: self.get_shard_identity().number,
query: None,
original_hwm_lsn: version.get_lsn(),
ancestor_lsn: None,
read_path: None,
backtrace: None,
},
)));
}
}
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
@@ -510,8 +530,15 @@ impl Timeline {
}
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
return Ok(true);
if let Some(entry) = self.get_cached_rel_size(&tag, version.get_lsn()) {
match entry {
RelSizeCacheEntry::Present(_) => {
return Ok(true);
}
RelSizeCacheEntry::Truncated => {
return Ok(false);
}
}
}
// then check if the database was already initialized.
// get_rel_exists can be called before dbdir is created.
@@ -1330,12 +1357,12 @@ impl Timeline {
}
/// Get cached size of relation if it not updated after specified LSN
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<RelSizeCacheEntry> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
if let Some((cached_lsn, entry)) = rel_size_cache.map.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_CACHE_HITS.inc();
return Some(*nblocks);
return Some(*entry);
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
@@ -1359,11 +1386,11 @@ impl Timeline {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
*cached_lsn = (lsn, RelSizeCacheEntry::Present(nblocks));
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
entry.insert((lsn, RelSizeCacheEntry::Present(nblocks)));
RELSIZE_CACHE_ENTRIES.inc();
}
}
@@ -1372,15 +1399,23 @@ impl Timeline {
/// Store cached relation size
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
if rel_size_cache
.map
.insert(tag, (lsn, RelSizeCacheEntry::Present(nblocks)))
.is_none()
{
RELSIZE_CACHE_ENTRIES.inc();
}
}
/// Remove cached relation size
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
pub fn remove_cached_rel_size(&self, tag: RelTag, lsn: Lsn) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.remove(tag).is_some() {
if rel_size_cache
.map
.insert(tag, (lsn, RelSizeCacheEntry::Truncated))
.is_some()
{
RELSIZE_CACHE_ENTRIES.dec();
}
}
@@ -1585,7 +1620,9 @@ impl DatadirModification<'_> {
// check the cache too. This is because eagerly checking the cache results in
// less work overall and 10% better performance. It's more work on cache miss
// but cache miss is rare.
if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
if let Some(RelSizeCacheEntry::Present(nblocks)) =
self.tline.get_cached_rel_size(&rel, self.get_lsn())
{
Ok(nblocks)
} else if !self
.tline
@@ -2172,7 +2209,7 @@ impl DatadirModification<'_> {
self.pending_nblocks -= old_size as i64;
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
self.tline.remove_cached_rel_size(rel_tag, self.lsn);
// Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
self.delete(rel_key_range(rel_tag));

View File

@@ -50,6 +50,7 @@ use remote_timeline_client::{
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::import_pgdata::ImportingTimeline;
use timeline::offload::{OffloadError, offload_timeline};
use timeline::{
CompactFlags, CompactOptions, CompactionError, PreviousHeatmap, ShutdownMode, import_pgdata,
@@ -284,6 +285,19 @@ pub struct TenantShard {
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
/// Tracks the timelines that are currently importing into this tenant shard.
///
/// Note that importing timelines are also present in [`Self::timelines_creating`].
/// Keep this in mind when ordering lock acquisition.
///
/// Lifetime:
/// * An imported timeline is created while scanning the bucket on tenant attach
/// if the index part contains an `import_pgdata` entry and said field marks the import
/// as in progress.
/// * Imported timelines are removed when the storage controller calls the post timeline
/// import activation endpoint.
timelines_importing: std::sync::Mutex<HashMap<TimelineId, ImportingTimeline>>,
/// The last tenant manifest known to be in remote storage. None if the manifest has not yet
/// been either downloaded or uploaded. Always Some after tenant attach.
///
@@ -923,19 +937,10 @@ enum StartCreatingTimelineResult {
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum TimelineInitAndSyncResult {
ReadyToActivate(Arc<Timeline>),
ReadyToActivate,
NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata),
}
impl TimelineInitAndSyncResult {
fn ready_to_activate(self) -> Option<Arc<Timeline>> {
match self {
Self::ReadyToActivate(timeline) => Some(timeline),
_ => None,
}
}
}
#[must_use]
struct TimelineInitAndSyncNeedsSpawnImportPgdata {
timeline: Arc<Timeline>,
@@ -1012,10 +1017,6 @@ enum CreateTimelineCause {
enum LoadTimelineCause {
Attach,
Unoffload,
ImportPgdata {
create_guard: TimelineCreateGuard,
activate: ActivateTimelineArgs,
},
}
#[derive(thiserror::Error, Debug)]
@@ -1097,7 +1098,7 @@ impl TenantShard {
self: &Arc<Self>,
timeline_id: TimelineId,
resources: TimelineResources,
mut index_part: IndexPart,
index_part: IndexPart,
metadata: TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
@@ -1106,7 +1107,7 @@ impl TenantShard {
) -> anyhow::Result<TimelineInitAndSyncResult> {
let tenant_id = self.tenant_shard_id;
let import_pgdata = index_part.import_pgdata.take();
let import_pgdata = index_part.import_pgdata.clone();
let idempotency = match &import_pgdata {
Some(import_pgdata) => {
CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata {
@@ -1127,7 +1128,7 @@ impl TenantShard {
}
};
let (timeline, timeline_ctx) = self.create_timeline_struct(
let (timeline, _timeline_ctx) = self.create_timeline_struct(
timeline_id,
&metadata,
previous_heatmap,
@@ -1197,14 +1198,6 @@ impl TenantShard {
match import_pgdata {
Some(import_pgdata) if !import_pgdata.is_done() => {
match cause {
LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (),
LoadTimelineCause::ImportPgdata { .. } => {
unreachable!(
"ImportPgdata should not be reloading timeline import is done and persisted as such in s3"
)
}
}
let mut guard = self.timelines_creating.lock().unwrap();
if !guard.insert(timeline_id) {
// We should never try and load the same timeline twice during startup
@@ -1260,26 +1253,7 @@ impl TenantShard {
"Timeline has no ancestor and no layer files"
);
match cause {
LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (),
LoadTimelineCause::ImportPgdata {
create_guard,
activate,
} => {
// TODO: see the comment in the task code above how I'm not so certain
// it is safe to activate here because of concurrent shutdowns.
match activate {
ActivateTimelineArgs::Yes { broker_client } => {
info!("activating timeline after reload from pgdata import task");
timeline.activate(self.clone(), broker_client, None, &timeline_ctx);
}
ActivateTimelineArgs::No => (),
}
drop(create_guard);
}
}
Ok(TimelineInitAndSyncResult::ReadyToActivate(timeline))
Ok(TimelineInitAndSyncResult::ReadyToActivate)
}
}
}
@@ -1768,7 +1742,7 @@ impl TenantShard {
})?;
match effect {
TimelineInitAndSyncResult::ReadyToActivate(_) => {
TimelineInitAndSyncResult::ReadyToActivate => {
// activation happens later, on Tenant::activate
}
TimelineInitAndSyncResult::NeedsSpawnImportPgdata(
@@ -1778,13 +1752,24 @@ impl TenantShard {
guard,
},
) => {
tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
timeline,
import_pgdata,
ActivateTimelineArgs::No,
guard,
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let timeline_id = timeline.timeline_id;
let import_task_handle =
tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
timeline.clone(),
import_pgdata,
guard,
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let prev = self.timelines_importing.lock().unwrap().insert(
timeline_id,
ImportingTimeline {
timeline: timeline.clone(),
import_task_handle,
},
);
assert!(prev.is_none());
}
}
}
@@ -2678,14 +2663,7 @@ impl TenantShard {
.await?
}
CreateTimelineParams::ImportPgdata(params) => {
self.create_timeline_import_pgdata(
params,
ActivateTimelineArgs::Yes {
broker_client: broker_client.clone(),
},
ctx,
)
.await?
self.create_timeline_import_pgdata(params, ctx).await?
}
};
@@ -2759,7 +2737,6 @@ impl TenantShard {
async fn create_timeline_import_pgdata(
self: &Arc<Self>,
params: CreateTimelineParamsImportPgdata,
activate: ActivateTimelineArgs,
ctx: &RequestContext,
) -> Result<CreateTimelineResult, CreateTimelineError> {
let CreateTimelineParamsImportPgdata {
@@ -2840,24 +2817,71 @@ impl TenantShard {
let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself();
tokio::spawn(self.clone().create_timeline_import_pgdata_task(
let import_task_handle = tokio::spawn(self.clone().create_timeline_import_pgdata_task(
timeline.clone(),
index_part,
activate,
timeline_create_guard,
timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let prev = self.timelines_importing.lock().unwrap().insert(
timeline.timeline_id,
ImportingTimeline {
timeline: timeline.clone(),
import_task_handle,
},
);
// Idempotency is enforced higher up the stack
assert!(prev.is_none());
// NB: the timeline doesn't exist in self.timelines at this point
Ok(CreateTimelineResult::ImportSpawned(timeline))
}
/// Finalize the import of a timeline on this shard by marking it complete in
/// the index part. If the import task hasn't finished yet, returns an error.
///
/// This method is idempotent. If the import was finalized once, the next call
/// will be a no-op.
pub(crate) async fn finalize_importing_timeline(
&self,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
let timeline = {
let locked = self.timelines_importing.lock().unwrap();
match locked.get(&timeline_id) {
Some(importing_timeline) => {
if !importing_timeline.import_task_handle.is_finished() {
return Err(anyhow::anyhow!("Import task not done yet"));
}
importing_timeline.timeline.clone()
}
None => {
return Ok(());
}
}
};
timeline
.remote_client
.schedule_index_upload_for_import_pgdata_finalize()?;
timeline.remote_client.wait_completion().await?;
self.timelines_importing
.lock()
.unwrap()
.remove(&timeline_id);
Ok(())
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))]
async fn create_timeline_import_pgdata_task(
self: Arc<TenantShard>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
timeline_create_guard: TimelineCreateGuard,
ctx: RequestContext,
) {
@@ -2869,7 +2893,6 @@ impl TenantShard {
.create_timeline_import_pgdata_task_impl(
timeline,
index_part,
activate,
timeline_create_guard,
ctx,
)
@@ -2885,60 +2908,15 @@ impl TenantShard {
self: Arc<TenantShard>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
timeline_create_guard: TimelineCreateGuard,
_timeline_create_guard: TimelineCreateGuard,
ctx: RequestContext,
) -> Result<(), anyhow::Error> {
info!("importing pgdata");
let ctx = ctx.with_scope_timeline(&timeline);
import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone())
.await
.context("import")?;
info!("import done");
//
// Reload timeline from remote.
// This proves that the remote state is attachable, and it reuses the code.
//
// TODO: think about whether this is safe to do with concurrent TenantShard::shutdown.
// timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit.
// But our activate() call might launch new background tasks after TenantShard::shutdown
// already went past shutting down the TenantShard::timelines, which this timeline here is no part of.
// I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting
// down while bootstrapping/branching + activating), but, the race condition is much more likely
// to manifest because of the long runtime of this import task.
// in theory this shouldn't even .await anything except for coop yield
info!("shutting down timeline");
timeline.shutdown(ShutdownMode::Hard).await;
info!("timeline shut down, reloading from remote");
// TODO: we can't do the following check because create_timeline_import_pgdata must return an Arc<Timeline>
// let Some(timeline) = Arc::into_inner(timeline) else {
// anyhow::bail!("implementation error: timeline that we shut down was still referenced from somewhere");
// };
let timeline_id = timeline.timeline_id;
// load from object storage like TenantShard::attach does
let resources = self.build_timeline_resources(timeline_id);
let index_part = resources
.remote_client
.download_index_file(&self.cancel)
.await?;
let index_part = match index_part {
MaybeDeletedIndexPart::Deleted(_) => {
// likely concurrent delete call, cplane should prevent this
anyhow::bail!(
"index part says deleted but we are not done creating yet, this should not happen but"
)
}
MaybeDeletedIndexPart::IndexPart(p) => p,
};
let metadata = index_part.metadata.clone();
self
.load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{
create_guard: timeline_create_guard, activate, }, &ctx)
.await?
.ready_to_activate()
.context("implementation error: reloaded timeline still needs import after import reported success")?;
info!("import done - waiting for activation");
anyhow::Ok(())
}
@@ -3475,6 +3453,14 @@ impl TenantShard {
timeline.defuse_for_tenant_drop();
});
}
{
let mut timelines_importing = self.timelines_importing.lock().unwrap();
timelines_importing
.drain()
.for_each(|(_timeline_id, importing_timeline)| {
importing_timeline.shutdown();
});
}
// test_long_timeline_create_then_tenant_delete is leaning on this message
tracing::info!("Waiting for timelines...");
while let Some(res) = js.join_next().await {
@@ -3949,13 +3935,6 @@ where
Ok(result)
}
enum ActivateTimelineArgs {
Yes {
broker_client: storage_broker::BrokerClientChannel,
},
No,
}
impl TenantShard {
pub fn tenant_specific_overrides(&self) -> pageserver_api::models::TenantConfig {
self.tenant_conf.load().tenant_conf.clone()
@@ -4322,6 +4301,7 @@ impl TenantShard {
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
timelines_offloaded: Mutex::new(HashMap::new()),
timelines_importing: Mutex::new(HashMap::new()),
remote_tenant_manifest: Default::default(),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,

View File

@@ -949,6 +949,35 @@ impl RemoteTimelineClient {
Ok(())
}
/// If the `import_pgdata` field marks the timeline as having an import in progress,
/// launch an index-file upload operation that transitions it to done in the background
pub(crate) fn schedule_index_upload_for_import_pgdata_finalize(
self: &Arc<Self>,
) -> anyhow::Result<()> {
use import_pgdata::index_part_format;
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let to_update = match &upload_queue.dirty.import_pgdata {
Some(import) if !import.is_done() => Some(import),
Some(_) | None => None,
};
if let Some(old) = to_update {
let new =
index_part_format::Root::V1(index_part_format::V1::Done(index_part_format::Done {
idempotency_key: old.idempotency_key().clone(),
started_at: *old.started_at(),
finished_at: chrono::Utc::now().naive_utc(),
}));
upload_queue.dirty.import_pgdata = Some(new);
self.schedule_index_upload(upload_queue);
}
Ok(())
}
/// Launch an index-file upload operation in the background, setting `gc_compaction_state` field.
pub(crate) fn schedule_index_upload_for_gc_compaction_state_update(
self: &Arc<Self>,

View File

@@ -204,7 +204,13 @@ pub struct TimelineResources {
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
pub(crate) struct RelSizeCache {
pub(crate) complete_as_of: Lsn,
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
pub(crate) map: HashMap<RelTag, (Lsn, RelSizeCacheEntry)>,
}
#[derive(Debug, Copy, Clone)]
pub enum RelSizeCacheEntry {
Present(BlockNumber),
Truncated,
}
pub struct Timeline {
@@ -690,15 +696,15 @@ impl std::fmt::Display for ReadPath {
#[derive(thiserror::Error)]
pub struct MissingKeyError {
keyspace: KeySpace,
shard: ShardNumber,
query: Option<VersionedKeySpaceQuery>,
pub keyspace: KeySpace,
pub shard: ShardNumber,
pub query: Option<VersionedKeySpaceQuery>,
// This is largest request LSN from the get page request batch
original_hwm_lsn: Lsn,
ancestor_lsn: Option<Lsn>,
pub original_hwm_lsn: Lsn,
pub ancestor_lsn: Option<Lsn>,
/// Debug information about the read path if there's an error
read_path: Option<ReadPath>,
backtrace: Option<std::backtrace::Backtrace>,
pub read_path: Option<ReadPath>,
pub backtrace: Option<std::backtrace::Backtrace>,
}
impl MissingKeyError {

View File

@@ -3435,6 +3435,7 @@ impl Timeline {
// Step 2: Produce images+deltas.
let mut accumulated_values = Vec::new();
let mut accumulated_values_estimated_size = 0;
let mut last_key: Option<Key> = None;
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
@@ -3611,12 +3612,16 @@ impl Timeline {
if last_key.is_none() {
last_key = Some(key);
}
accumulated_values_estimated_size += val.estimated_size();
accumulated_values.push((key, lsn, val));
if accumulated_values.len() >= 65536 {
// Assume all of them are images, that would be 512MB of data in memory for a single key.
// Accumulated values should never exceed 512MB.
if accumulated_values_estimated_size >= 1024 * 1024 * 512 {
return Err(CompactionError::Other(anyhow!(
"too many values for a single key, giving up gc-compaction"
"too many values for a single key: {} for key {}, {} items",
accumulated_values_estimated_size,
key,
accumulated_values.len()
)));
}
} else {
@@ -3651,6 +3656,7 @@ impl Timeline {
.map_err(CompactionError::Other)?;
accumulated_values.clear();
*last_key = key;
accumulated_values_estimated_size = val.estimated_size();
accumulated_values.push((key, lsn, val));
}
}

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::{Context, bail};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;
@@ -17,6 +18,17 @@ mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
pub(crate) struct ImportingTimeline {
pub import_task_handle: JoinHandle<()>,
pub timeline: Arc<Timeline>,
}
impl ImportingTimeline {
pub(crate) fn shutdown(self) {
self.import_task_handle.abort();
}
}
pub async fn doit(
timeline: &Arc<Timeline>,
index_part: index_part_format::Root,
@@ -26,173 +38,161 @@ pub async fn doit(
let index_part_format::Root::V1(v1) = index_part;
let index_part_format::InProgress {
location,
idempotency_key,
started_at,
idempotency_key: _,
started_at: _,
} = match v1 {
index_part_format::V1::Done(_) => return Ok(()),
index_part_format::V1::InProgress(in_progress) => in_progress,
};
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
let status_prefix = RemotePath::from_string("status").unwrap();
let shard_status = storcon_client
.get_timeline_import_status(timeline.tenant_shard_id, timeline.timeline_id)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
//
// See if shard is done.
// TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing.
//
let shard_status_key =
status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug()));
let shard_status: Option<importbucket_format::ShardStatus> =
storage.get_json(&shard_status_key).await?;
info!(?shard_status, "peeking shard status");
if shard_status.map(|st| st.done).unwrap_or(false) {
info!("shard status indicates that the shard is done, skipping import");
} else {
// TODO: checkpoint the progress into the IndexPart instead of restarting
// from the beginning.
match shard_status {
None | Some(ShardImportStatus::InProgress) => {
// TODO: checkpoint the progress into the IndexPart instead of restarting
// from the beginning.
//
// Wipe the slate clean - the flow does not allow resuming.
// We can implement resuming in the future by checkpointing the progress into the IndexPart.
//
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it
);
let all_layers_keys = guard.all_persistent_layers();
let all_layers: Vec<_> = all_layers_keys
.iter()
.map(|key| guard.get_from_key(key))
.collect();
let open = guard.open_mut().context("open_mut")?;
//
// Wipe the slate clean - the flow does not allow resuming.
// We can implement resuming in the future by checkpointing the progress into the IndexPart.
//
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it
);
let all_layers_keys = guard.all_persistent_layers();
let all_layers: Vec<_> = all_layers_keys
.iter()
.map(|key| guard.get_from_key(key))
.collect();
let open = guard.open_mut().context("open_mut")?;
timeline.remote_client.schedule_gc_update(&all_layers)?;
open.finish_gc_timeline(&all_layers);
}
//
// Wait for pgdata to finish uploading
//
info!("wait for pgdata to reach status 'done'");
let pgdata_status_key = status_prefix.join("pgdata");
loop {
let res = async {
let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
.get_json(&pgdata_status_key)
.await
.context("get pgdata status")?;
info!(?pgdata_status, "peeking pgdata status");
if pgdata_status.map(|st| st.done).unwrap_or(false) {
Ok(())
} else {
Err(anyhow::anyhow!("pgdata not done yet"))
}
timeline.remote_client.schedule_gc_update(&all_layers)?;
open.finish_gc_timeline(&all_layers);
}
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefinitely waiting for pgdata to finish");
if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
//
// Wait for pgdata to finish uploading
//
info!("wait for pgdata to reach status 'done'");
let storage =
importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
let status_prefix = RemotePath::from_string("status").unwrap();
let pgdata_status_key = status_prefix.join("pgdata");
loop {
let res = async {
let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
.get_json(&pgdata_status_key)
.await
.context("get pgdata status")?;
info!(?pgdata_status, "peeking pgdata status");
if pgdata_status.map(|st| st.done).unwrap_or(false) {
Ok(())
} else {
Err(anyhow::anyhow!("pgdata not done yet"))
}
}
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefinitely waiting for pgdata to finish");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
cancel.cancelled(),
)
.await
.is_ok()
{
bail!("cancelled while waiting for pgdata");
{
bail!("cancelled while waiting for pgdata");
}
}
}
}
}
//
// Do the import
//
info!("do the import");
let control_file = storage.get_control_file().await?;
let base_lsn = control_file.base_lsn();
//
// Do the import
//
info!("do the import");
let control_file = storage.get_control_file().await?;
let base_lsn = control_file.base_lsn();
info!("update TimelineMetadata based on LSNs from control file");
{
let pg_version = control_file.pg_version();
let _ctx: &RequestContext = ctx;
async move {
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
let prev_record_lsn = base_lsn;
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
Some(prev_record_lsn),
None, // no ancestor
Lsn(0), // no ancestor lsn
base_lsn, // latest_gc_cutoff_lsn
base_lsn, // initdb_lsn
pg_version,
);
info!("update TimelineMetadata based on LSNs from control file");
{
let pg_version = control_file.pg_version();
let _ctx: &RequestContext = ctx;
async move {
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
let prev_record_lsn = base_lsn;
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
Some(prev_record_lsn),
None, // no ancestor
Lsn(0), // no ancestor lsn
base_lsn, // latest_gc_cutoff_lsn
base_lsn, // initdb_lsn
pg_version,
);
let _start_lsn = disk_consistent_lsn + 1;
let _start_lsn = disk_consistent_lsn + 1;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline.remote_client.wait_completion().await?;
timeline.remote_client.wait_completion().await?;
anyhow::Ok(())
anyhow::Ok(())
}
}
.await?;
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
// Note that we do not mark the import complete in the index part now.
// This happens in [`Tenant::finalize_importing_timeline`] in response
// to the storage controller calling
// `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| {
anyhow::anyhow!("Shut down while putting timeline import status")
})?;
}
Some(ShardImportStatus::Error(err)) => {
info!(
"shard status indicates that the shard is done (error), skipping import {}",
err
);
}
Some(ShardImportStatus::Done) => {
info!("shard status indicates that the shard is done (success), skipping import");
}
.await?;
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
//
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
storage
.put_json(
&shard_status_key,
&importbucket_format::ShardStatus { done: true },
)
.await
.context("put shard status")?;
}
//
// Mark as done in index_part.
// This makes subsequent timeline loads enter the normal load code path
// instead of spawning the import task and calling this here function.
//
info!("mark import as complete in index part");
timeline
.remote_client
.schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
index_part_format::V1::Done(index_part_format::Done {
idempotency_key,
started_at,
finished_at: chrono::Utc::now().naive_utc(),
}),
)))?;
timeline.remote_client.wait_completion().await?;
Ok(())
}

View File

@@ -53,6 +53,7 @@ use tokio_stream::StreamExt;
use tracing::{debug, instrument};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use super::Timeline;
use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
@@ -79,6 +80,9 @@ pub async fn run(
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
pausable_failpoint!("import-timeline-pre-execute-pausable");
plan.execute(timeline, import_config, ctx).await
}

View File

@@ -190,31 +190,6 @@ impl RemoteStorageWrapper {
Ok(Some(res))
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn put_json<T>(&self, path: &RemotePath, value: &T) -> anyhow::Result<()>
where
T: serde::Serialize,
{
let buf = serde_json::to_vec(value)?;
let bytes = Bytes::from(buf);
utils::backoff::retry(
|| async {
let size = bytes.len();
let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
self.storage
.upload_storage_object(bytes, size, path, &self.cancel)
.await
},
remote_storage::TimeoutOrCancel::caused_by_cancel,
1,
u32::MAX,
&format!("put json {path}"),
&self.cancel,
)
.await
.expect("practically infinite retries")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_range(
&self,

View File

@@ -5,9 +5,3 @@ pub struct PgdataStatus {
pub done: bool,
// TODO: remaining fields
}
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct ShardStatus {
pub done: bool,
// TODO: remaining fields
}

View File

@@ -64,4 +64,12 @@ impl Root {
},
}
}
pub fn started_at(&self) -> &chrono::NaiveDateTime {
match self {
Root::V1(v1) => match v1 {
V1::InProgress(in_progress) => &in_progress.started_at,
V1::Done(done) => &done.started_at,
},
}
}
}

View File

@@ -1281,75 +1281,24 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
communicator_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
}
#if PG_MAJORVERSION_NUM < 17
/*
* neon_read() -- Read the specified block from a relation.
*/
#if PG_MAJORVERSION_NUM < 16
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer)
#else
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
#endif
{
neon_request_lsns request_lsns;
bits8 present;
void *bufferp;
switch (reln->smgr_relpersistence)
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdread(reln, forkNum, blkno, buffer);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* Try to read PS results if they are available */
communicator_prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
present = 0;
bufferp = buffer;
if (communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, &bufferp, &present))
{
/* Prefetch hit */
return;
}
/* Try to read from local file cache */
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
{
MyNeonCounters->file_cache_hits_total++;
return;
}
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
static void
compare_with_local(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void* buffer, XLogRecPtr request_lsn)
{
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
char pageserver_masked[BLCKSZ];
PGIOAlignedBlock mdbuf;
PGIOAlignedBlock mdbuf_masked;
XLogRecPtr request_lsn = request_lsns.request_lsn;
#if PG_MAJORVERSION_NUM >= 17
{
void* mdbuffers[1] = { mdbuf.data };
mdreadv(reln, forkNum, blkno, mdbuffers, 1);
}
#else
mdread(reln, forkNum, blkno, mdbuf.data);
#endif
memcpy(pageserver_masked, buffer, BLCKSZ);
memcpy(mdbuf_masked.data, mdbuf.data, BLCKSZ);
@@ -1413,11 +1362,105 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
}
}
}
}
#endif
#if PG_MAJORVERSION_NUM < 17
/*
* neon_read() -- Read the specified block from a relation.
*/
#if PG_MAJORVERSION_NUM < 16
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer)
#else
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
#endif
{
neon_request_lsns request_lsns;
bits8 present;
void *bufferp;
switch (reln->smgr_relpersistence)
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdread(reln, forkNum, blkno, buffer);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* Try to read PS results if they are available */
communicator_prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
present = 0;
bufferp = buffer;
if (communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, &bufferp, &present))
{
/* Prefetch hit */
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#else
return;
#endif
}
/* Try to read from local file cache */
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
{
MyNeonCounters->file_cache_hits_total++;
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#else
return;
#endif
}
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#endif
}
#endif /* PG_MAJORVERSION_NUM <= 16 */
#if PG_MAJORVERSION_NUM >= 17
#ifdef DEBUG_COMPARE_LOCAL
static void
compare_with_localv(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void** buffers, BlockNumber nblocks, neon_request_lsns* request_lsns, bits8* read_pages)
{
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
for (BlockNumber i = 0; i < nblocks; i++)
{
if (BITMAP_ISSET(read_pages, i))
{
compare_with_local(reln, forkNum, blkno + i, buffers[i], request_lsns[i].request_lsn);
}
}
}
}
#endif
static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
@@ -1460,8 +1503,13 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum, request_lsns, nblocks,
buffers, read_pages);
#ifdef DEBUG_COMPARE_LOCAL
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
memset(read_pages, 0, sizeof(read_pages));
#else
if (prefetch_result == nblocks)
return;
#endif
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
@@ -1470,9 +1518,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
#ifdef DEBUG_COMPARE_LOCAL
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
memset(read_pages, 0, sizeof(read_pages));
#else
/* Read all blocks from LFC, so we're done */
if (prefetch_result + lfc_result == nblocks)
return;
#endif
communicator_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read_pages);
@@ -1483,91 +1536,8 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
char pageserver_masked[BLCKSZ];
PGIOAlignedBlock mdbuf;
PGIOAlignedBlock mdbuf_masked;
XLogRecPtr request_lsn = request_lsns->request_lsn;
for (int i = 0; i < nblocks; i++)
{
BlockNumber blkno = blocknum + i;
if (!BITMAP_ISSET(read_pages, i))
continue;
#if PG_MAJORVERSION_NUM >= 17
{
void* mdbuffers[1] = { mdbuf.data };
mdreadv(reln, forknum, blkno, mdbuffers, 1);
}
#else
mdread(reln, forknum, blkno, mdbuf.data);
#endif
memcpy(pageserver_masked, buffers[i], BLCKSZ);
memcpy(mdbuf_masked.data, mdbuf.data, BLCKSZ);
if (PageIsNew((Page) mdbuf.data))
{
if (!PageIsNew((Page) pageserver_masked))
{
neon_log(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(buffers[i]));
}
}
else if (PageIsNew((Page) buffers[i]))
{
neon_log(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf.data));
}
else if (PageGetSpecialSize(mdbuf.data) == 0)
{
/* assume heap */
RmgrTable[RM_HEAP_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_HEAP_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
else if (PageGetSpecialSize(mdbuf.data) == MAXALIGN(sizeof(BTPageOpaqueData)))
{
if (((BTPageOpaqueData *) PageGetSpecialPointer(mdbuf.data))->btpo_cycleid < MAX_BT_CYCLE_ID)
{
/* assume btree */
RmgrTable[RM_BTREE_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_BTREE_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
}
}
}
memset(read_pages, 0xFF, sizeof(read_pages));
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
#endif
}
#endif

View File

@@ -1,6 +1,10 @@
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[unsafe(export_name = "malloc_conf")]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
#[tokio::main]
async fn main() -> anyhow::Result<()> {
proxy::binary::proxy::run().await

View File

@@ -6,12 +6,12 @@ use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::CancelToken;
use postgres_client::tls::MakeTlsConnect;
use pq_proto::CancelKeyData;
use redis::{FromRedisValue, Pipeline, Value, pipe};
use redis::{Cmd, FromRedisValue, Value};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::{AuthError, check_peer_addr_is_in_list};
@@ -56,8 +56,70 @@ pub enum CancelKeyOp {
},
}
pub struct Pipeline {
inner: redis::Pipeline,
replies: Vec<CancelReplyOp>,
}
impl Pipeline {
fn with_capacity(n: usize) -> Self {
Self {
inner: redis::Pipeline::with_capacity(n),
replies: Vec::with_capacity(n),
}
}
async fn execute(&mut self, client: &mut RedisKVClient) {
let responses = self.replies.len();
let batch_size = self.inner.len();
match client.query(&self.inner).await {
// for each reply, we expect that many values.
Ok(Value::Array(values)) if values.len() == responses => {
debug!(
batch_size,
responses, "successfully completed cancellation jobs",
);
for (value, reply) in std::iter::zip(values, self.replies.drain(..)) {
reply.send_value(value);
}
}
Ok(value) => {
error!(batch_size, ?value, "unexpected redis return value");
for reply in self.replies.drain(..) {
reply.send_err(anyhow!("incorrect response type from redis"));
}
}
Err(err) => {
for reply in self.replies.drain(..) {
reply.send_err(anyhow!("could not send cmd to redis: {err}"));
}
}
}
self.inner.clear();
self.replies.clear();
}
fn add_command_with_reply(&mut self, cmd: Cmd, reply: CancelReplyOp) {
self.inner.add_command(cmd);
self.replies.push(reply);
}
fn add_command_no_reply(&mut self, cmd: Cmd) {
self.inner.add_command(cmd).ignore();
}
fn add_command(&mut self, cmd: Cmd, reply: Option<CancelReplyOp>) {
match reply {
Some(reply) => self.add_command_with_reply(cmd, reply),
None => self.add_command_no_reply(cmd),
}
}
}
impl CancelKeyOp {
fn register(self, pipe: &mut Pipeline) -> Option<CancelReplyOp> {
fn register(self, pipe: &mut Pipeline) {
#[allow(clippy::used_underscore_binding)]
match self {
CancelKeyOp::StoreCancelKey {
@@ -68,18 +130,18 @@ impl CancelKeyOp {
_guard,
expire,
} => {
pipe.hset(&key, field, value);
pipe.expire(key, expire);
let resp_tx = resp_tx?;
Some(CancelReplyOp::StoreCancelKey { resp_tx, _guard })
let reply =
resp_tx.map(|resp_tx| CancelReplyOp::StoreCancelKey { resp_tx, _guard });
pipe.add_command(Cmd::hset(&key, field, value), reply);
pipe.add_command_no_reply(Cmd::expire(key, expire));
}
CancelKeyOp::GetCancelData {
key,
resp_tx,
_guard,
} => {
pipe.hgetall(key);
Some(CancelReplyOp::GetCancelData { resp_tx, _guard })
let reply = CancelReplyOp::GetCancelData { resp_tx, _guard };
pipe.add_command_with_reply(Cmd::hgetall(key), reply);
}
CancelKeyOp::RemoveCancelKey {
key,
@@ -87,9 +149,9 @@ impl CancelKeyOp {
resp_tx,
_guard,
} => {
pipe.hdel(key, field);
let resp_tx = resp_tx?;
Some(CancelReplyOp::RemoveCancelKey { resp_tx, _guard })
let reply =
resp_tx.map(|resp_tx| CancelReplyOp::RemoveCancelKey { resp_tx, _guard });
pipe.add_command(Cmd::hdel(key, field), reply);
}
}
}
@@ -170,8 +232,8 @@ pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
) -> anyhow::Result<()> {
let mut batch = Vec::new();
let mut replies = vec![];
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
loop {
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
@@ -182,42 +244,11 @@ pub async fn handle_cancel_messages(
let batch_size = batch.len();
debug!(batch_size, "running cancellation jobs");
let mut pipe = pipe();
for msg in batch.drain(..) {
if let Some(reply) = msg.register(&mut pipe) {
replies.push(reply);
} else {
pipe.ignore();
}
msg.register(&mut pipeline);
}
let responses = replies.len();
match client.query(pipe).await {
// for each reply, we expect that many values.
Ok(Value::Array(values)) if values.len() == responses => {
debug!(
batch_size,
responses, "successfully completed cancellation jobs",
);
for (value, reply) in std::iter::zip(values, replies.drain(..)) {
reply.send_value(value);
}
}
Ok(value) => {
debug!(?value, "unexpected redis return value");
for reply in replies.drain(..) {
reply.send_err(anyhow!("incorrect response type from redis"));
}
}
Err(err) => {
for reply in replies.drain(..) {
reply.send_err(anyhow!("could not send cmd to redis: {err}"));
}
}
}
replies.clear();
pipeline.execute(client).await;
}
}

View File

@@ -3,7 +3,7 @@ use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use anyhow::{anyhow, bail};
use http_utils::endpoint::{self, request_span};
use http_utils::endpoint::{self, profile_cpu_handler, profile_heap_handler, request_span};
use http_utils::error::ApiError;
use http_utils::json::json_response;
use http_utils::{RouterBuilder, RouterService};
@@ -33,6 +33,12 @@ fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
request_span(r, move |b| prometheus_metrics_handler(b, state))
})
.get("/v1/status", status_handler)
.get("/profile/cpu", move |r| {
request_span(r, profile_cpu_handler)
})
.get("/profile/heap", move |r| {
request_span(r, profile_heap_handler)
})
}
pub async fn task_main(

View File

@@ -47,7 +47,7 @@ impl RedisKVClient {
pub(crate) async fn query<T: FromRedisValue>(
&mut self,
q: impl Queryable,
q: &impl Queryable,
) -> anyhow::Result<T> {
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping query");

View File

@@ -513,7 +513,7 @@ impl SafekeeperPostgresHandler {
let end_pos = end_watch.get();
if end_pos < start_pos {
warn!(
info!(
"requested start_pos {} is ahead of available WAL end_pos {}",
start_pos, end_pos
);

View File

@@ -157,6 +157,29 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}
async fn handle_get_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
.handle_timeline_shard_import_progress(tenant_shard_id, timeline_id)
.await?,
)
}
async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
@@ -2008,6 +2031,13 @@ pub fn make_router(
.post("/upcall/v1/validate", |r| {
named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
})
.get("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,
handle_get_timeline_import_status,
RequestName("upcall_v1_timeline_import_status"),
)
})
.post("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,

View File

@@ -1,3 +1,5 @@
use std::time::Duration;
use pageserver_api::models::detach_ancestor::AncestorDetached;
use pageserver_api::models::{
DetachBehavior, LocationConfig, LocationConfigListResponse, LsnLease, PageserverUtilization,
@@ -212,6 +214,7 @@ impl PageserverClient {
)
}
#[allow(unused)]
pub(crate) async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
@@ -357,4 +360,20 @@ impl PageserverClient {
self.inner.wait_lsn(tenant_shard_id, request).await
)
}
pub(crate) async fn activate_post_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
timeline_activate_timeout: Duration,
) -> Result<TimelineInfo> {
measured_request!(
"activate_post_import",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.activate_post_import(tenant_shard_id, timeline_id, timeline_activate_timeout)
.await
)
}
}

View File

@@ -1666,6 +1666,39 @@ impl Persistence {
}
}
pub(crate) async fn get_timeline_import(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Option<TimelineImport>> {
use crate::schema::timeline_imports::dsl;
let persistent_import = self
.with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
Box::pin(async move {
let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.filter(dsl::timeline_id.eq(timeline_id.to_string()))
.load(conn)
.await?;
if from_db.len() > 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
from_db.len()
)));
}
Ok(from_db.pop())
})
})
.await?;
persistent_import
.map(TimelineImport::from_persistent)
.transpose()
.map_err(|err| DatabaseError::Logical(format!("failed to deserialize import: {err}")))
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,

View File

@@ -35,12 +35,12 @@ use pageserver_api::controller_api::{
};
use pageserver_api::models::{
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease,
PageserverUtilization, SecondaryProgress, ShardParameters, TenantConfig,
PageserverUtilization, SecondaryProgress, ShardImportStatus, ShardParameters, TenantConfig,
TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
};
use pageserver_api::shard::{
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
@@ -61,6 +61,7 @@ use utils::completion::Barrier;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
use utils::sync::gate::{Gate, GateGuard};
use utils::{failpoint_support, pausable_failpoint};
@@ -98,7 +99,8 @@ use crate::tenant_shard::{
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{
ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient,
ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError,
TimelineImportState, UpcallClient,
};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -3905,6 +3907,38 @@ impl Service {
})
}
pub(crate) async fn handle_timeline_shard_import_progress(
self: &Arc<Self>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<ShardImportStatus, ApiError> {
let maybe_import = self
.persistence
.get_timeline_import(tenant_shard_id.tenant_id, timeline_id)
.await?;
let import = maybe_import.ok_or_else(|| {
ApiError::NotFound(
format!(
"import for {}/{} not found",
tenant_shard_id.tenant_id, timeline_id
)
.into(),
)
})?;
import
.shard_statuses
.0
.get(&tenant_shard_id.to_index())
.cloned()
.ok_or_else(|| {
ApiError::NotFound(
format!("shard {} not found", tenant_shard_id.shard_slug()).into(),
)
})
}
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,
@@ -3943,6 +3977,16 @@ impl Service {
Ok(())
}
/// Finalize the import of a timeline
///
/// This method should be called once all shards have reported that the import is complete.
/// Firstly, it polls the post import timeline activation endpoint exposed by the pageserver.
/// Once the timeline is active on all shards, the timeline also gets created on the
/// safekeepers. Finally, notify cplane of the import completion (whether failed or
/// successful), and remove the import from the database and in-memory.
///
/// If this method gets pre-empted by shut down, it will be called again at start-up (on-going
/// imports are stored in the database).
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
shard_id=%import.timeline_id,
@@ -3950,59 +3994,80 @@ impl Service {
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
) -> Result<(), TimelineImportFinalizeError> {
tracing::info!("Finalizing timeline import");
pausable_failpoint!("timeline-import-pre-cplane-notification");
let import_failed = import.completion_error().is_some();
let tenant_id = import.tenant_id;
let timeline_id = import.timeline_id;
if !import_failed {
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
}
let active = self.timeline_active_on_all_shards(&import).await?;
match active {
Some(timeline_info) => {
tracing::info!("Timeline became active on all shards");
if self.config.timelines_onto_safekeepers {
// Now that we know the start LSN of this timeline, create it on the
// safekeepers.
self.tenant_timeline_create_safekeepers_until_success(
import.tenant_id,
timeline_info,
)
.await?;
}
break;
}
None => {
tracing::info!("Timeline not active on all shards yet");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
},
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
};
}
}
let import_error = import.completion_error();
match import_error {
Some(err) => {
self.notify_cplane_and_delete_import(tenant_id, timeline_id, Err(err))
.await?;
tracing::warn!("Timeline import completed with shard errors");
Ok(())
}
}
None => match self.activate_timeline_post_import(&import).await {
Ok(timeline_info) => {
tracing::info!("Post import timeline activation complete");
if self.config.timelines_onto_safekeepers {
// Now that we know the start LSN of this timeline, create it on the
// safekeepers.
self.tenant_timeline_create_safekeepers_until_success(
import.tenant_id,
timeline_info,
)
.await?;
}
self.notify_cplane_and_delete_import(tenant_id, timeline_id, Ok(()))
.await?;
tracing::info!("Timeline import completed successfully");
Ok(())
}
Err(TimelineImportFinalizeError::ShuttingDown) => {
// We got pre-empted by shut down and will resume after the restart.
Err(TimelineImportFinalizeError::ShuttingDown)
}
Err(err) => {
// Any finalize error apart from shut down is permanent and requires us to notify
// cplane such that it can clean up.
tracing::error!("Import finalize failed with permanent error: {err}");
self.notify_cplane_and_delete_import(
tenant_id,
timeline_id,
Err(err.to_string()),
)
.await?;
Err(err)
}
},
}
}
async fn notify_cplane_and_delete_import(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
import_result: ImportResult,
) -> Result<(), TimelineImportFinalizeError> {
let import_failed = import_result.is_err();
tracing::info!(%import_failed, "Notifying cplane of import completion");
let client = UpcallClient::new(self.get_config(), self.cancel.child_token());
client.notify_import_complete(&import).await?;
client
.notify_import_complete(tenant_id, timeline_id, import_result)
.await
.map_err(|_err| TimelineImportFinalizeError::ShuttingDown)?;
if let Err(err) = self
.persistence
.delete_timeline_import(import.tenant_id, import.timeline_id)
.delete_timeline_import(tenant_id, timeline_id)
.await
{
tracing::warn!("Failed to delete timeline import entry from database: {err}");
@@ -4012,14 +4077,113 @@ impl Service {
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.range_mut(TenantShardId::tenant_range(tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
tracing::info!(%import_failed, "Timeline import complete");
Ok(())
}
/// Activate an imported timeline on all shards once the import is complete.
/// Returns the [`TimelineInfo`] reported by shard zero.
async fn activate_timeline_post_import(
self: &Arc<Self>,
import: &TimelineImport,
) -> Result<TimelineInfo, TimelineImportFinalizeError> {
const TIMELINE_ACTIVATE_TIMEOUT: Duration = Duration::from_millis(128);
let mut shards_to_activate: HashSet<ShardIndex> =
import.shard_statuses.0.keys().cloned().collect();
let mut shard_zero_timeline_info = None;
while !shards_to_activate.is_empty() {
if self.cancel.is_cancelled() {
return Err(TimelineImportFinalizeError::ShuttingDown);
}
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
return Err(TimelineImportFinalizeError::MismatchedShards(
tenant_shard_id.to_index(),
));
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
}
}
targets
};
let targeted_tenant_shards: Vec<_> = targets.iter().map(|(tid, _node)| *tid).collect();
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.activate_post_import(
tenant_shard_id,
import.timeline_id,
TIMELINE_ACTIVATE_TIMEOUT,
)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
let mut failed = 0;
for (tid, result) in targeted_tenant_shards.iter().zip(results.into_iter()) {
match result {
Ok(ok) => {
if tid.is_shard_zero() {
shard_zero_timeline_info = Some(ok);
}
shards_to_activate.remove(&tid.to_index());
}
Err(_err) => {
failed += 1;
}
}
}
if failed > 0 {
tracing::info!(
"Failed to activate timeline on {failed} shards post import. Will retry"
);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(250)) => {},
_ = self.cancel.cancelled() => {
return Err(TimelineImportFinalizeError::ShuttingDown);
}
}
}
Ok(shard_zero_timeline_info.expect("All shards replied"))
}
async fn finalize_timeline_imports(self: &Arc<Self>, imports: Vec<TimelineImport>) {
futures::future::join_all(
imports
@@ -4029,78 +4193,6 @@ impl Service {
.await;
}
/// If the timeline is active on all shards, returns the [`TimelineInfo`]
/// collected from shard 0.
///
/// An error is returned if the shard layout has changed during the import.
/// This is guarded against within the storage controller and the pageserver,
/// and, therefore, unexpected.
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
) -> anyhow::Result<Option<TimelineInfo>> {
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
anyhow::bail!("Shard layout change detected on completion");
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
} else {
return Ok(None);
}
}
targets
};
if targets.is_empty() {
anyhow::bail!("No shards found to finalize import for");
}
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_detail(tenant_shard_id, import.timeline_id)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
let all_active = results.iter().all(|res| match res {
Ok(info) => info.state == TimelineState::Active,
Err(_) => false,
});
if all_active {
// Both unwraps are validated above
Ok(Some(results.into_iter().next().unwrap().unwrap()))
} else {
Ok(None)
}
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,

View File

@@ -10,6 +10,7 @@ use crate::persistence::{
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
};
use crate::safekeeper::Safekeeper;
use crate::timeline_import::TimelineImportFinalizeError;
use anyhow::Context;
use http_utils::error::ApiError;
use pageserver_api::controller_api::{
@@ -327,12 +328,12 @@ impl Service {
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: TimelineInfo,
) -> anyhow::Result<()> {
) -> Result<(), TimelineImportFinalizeError> {
const BACKOFF: Duration = Duration::from_secs(5);
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
return Err(TimelineImportFinalizeError::ShuttingDown);
}
let res = self
@@ -348,7 +349,7 @@ impl Service {
tracing::error!("Failed to create timeline on safekeepers: {err}");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
return Err(TimelineImportFinalizeError::ShuttingDown);
},
_ = tokio::time::sleep(BACKOFF) => {}
};

View File

@@ -46,6 +46,14 @@ pub(crate) enum TimelineImportUpdateFollowUp {
None,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum TimelineImportFinalizeError {
#[error("Shut down interrupted import finalize")]
ShuttingDown,
#[error("Mismatched shard detected during import finalize: {0}")]
MismatchedShards(ShardIndex),
}
pub(crate) enum TimelineImportUpdateError {
ImportNotFound {
tenant_id: TenantId,
@@ -151,6 +159,8 @@ impl TimelineImport {
}
}
pub(crate) type ImportResult = Result<(), String>;
pub(crate) struct UpcallClient {
authorization_header: Option<String>,
client: reqwest::Client,
@@ -198,7 +208,9 @@ impl UpcallClient {
/// eventual cplane availability. The cplane API is idempotent.
pub(crate) async fn notify_import_complete(
&self,
import: &TimelineImport,
tenant_id: TenantId,
timeline_id: TimelineId,
import_result: ImportResult,
) -> anyhow::Result<()> {
let endpoint = if self.base_url.ends_with('/') {
format!("{}import_complete", self.base_url)
@@ -206,15 +218,13 @@ impl UpcallClient {
format!("{}/import_complete", self.base_url)
};
tracing::info!("Endpoint is {endpoint}");
let request = self
.client
.request(Method::PUT, endpoint)
.json(&ImportCompleteRequest {
tenant_id: import.tenant_id,
timeline_id: import.timeline_id,
error: import.completion_error(),
tenant_id,
timeline_id,
error: import_result.err(),
})
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);

View File

@@ -130,9 +130,8 @@ def test_pgdata_import_smoke(
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2
elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS:
# Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data
# to exercise multiple segments.
target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192)
segment_size = 16 * 1024 * 1024
target_relblock_size = segment_size * 8
else:
raise ValueError
@@ -413,6 +412,88 @@ def test_import_completion_on_restart(
wait_until(cplane_notified)
@run_only_on_default_postgres(reason="PG version is irrelevant here")
def test_import_respects_tenant_shutdown(
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
"""
Validate that importing timelines respect the usual timeline life cycle:
1. Shut down on tenant shut-down and resumes upon re-attach
2. Deletion on timeline deletion (TODO)
"""
# Set up mock control plane HTTP server to listen for import completions
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
# Plug the cplane mock in
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
# The import will specifiy a local filesystem path mocking remote storage
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
vanilla_pg.stop()
env = neon_env_builder.init_configs()
env.start()
importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
mock_import_bucket(vanilla_pg, importbucket_path)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
idempotency = ImportPgdataIdemptencyKey.random()
# Pause before sending the notification
failpoint_name = "import-timeline-pre-execute-pausable"
env.pageserver.http_client().configure_failpoints((failpoint_name, "pause"))
env.storage_controller.tenant_create(tenant_id)
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
def hit_failpoint():
log.info("Checking log for pattern...")
try:
assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*")
except Exception:
log.exception("Failed to find pattern in log")
raise
wait_until(hit_failpoint)
assert not import_completion_signaled.is_set()
# Restart the pageserver while an import job is in progress.
# This clears the failpoint and we expect that the import starts up afresh
# after the restart and eventually completes.
env.pageserver.stop()
env.pageserver.start()
def cplane_notified():
assert import_completion_signaled.is_set()
wait_until(cplane_notified)
def test_fast_import_with_pageserver_ingest(
test_output_dir,
vanilla_pg: VanillaPostgres,
@@ -520,7 +601,9 @@ def test_fast_import_with_pageserver_ingest(
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
# Run fast_import
fast_import.set_aws_creds(mock_s3_server, {"RUST_LOG": "aws_config=debug,aws_sdk_kms=debug"})
fast_import.set_aws_creds(
mock_s3_server, {"RUST_LOG": "info,aws_config=debug,aws_sdk_kms=debug"}
)
pg_port = port_distributor.get_port()
fast_import.run_pgdata(pg_port=pg_port, s3prefix=f"s3://{bucket}/{key_prefix}")

View File

@@ -27,8 +27,9 @@ from contextlib import closing
import psycopg2
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
with secondary.cursor() as secondary_cur:
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (n_restarts,)
def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
sql = """
CREATE TABLE CHAR_TBL(f1 char(4));
CREATE TABLE FLOAT8_TBL(f1 float8);
CREATE TABLE INT2_TBL(f1 int2);
CREATE TABLE INT4_TBL(f1 int4);
CREATE TABLE INT8_TBL(q1 int8, q2 int8);
CREATE TABLE POINT_TBL(f1 point);
CREATE TABLE TEXT_TBL (f1 text);
CREATE TABLE VARCHAR_TBL(f1 varchar(4));
CREATE TABLE onek (unique1 int4);
CREATE TABLE onek2 AS SELECT * FROM onek;
CREATE TABLE tenk1 (unique1 int4);
CREATE TABLE tenk2 AS SELECT * FROM tenk1;
CREATE TABLE person (name text, age int4,location point);
CREATE TABLE emp (salary int4, manager name) INHERITS (person);
CREATE TABLE student (gpa float8) INHERITS (person);
CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
CREATE TABLE road (name text,thepath path);
CREATE TABLE ihighway () INHERITS (road);
CREATE TABLE shighway(surface text) INHERITS (road);
CREATE TABLE BOOLTBL3 (d text, b bool, o int);
CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
DROP TABLE BOOLTBL3;
DROP TABLE BOOLTBL4;
CREATE TABLE ceil_floor_round (a numeric);
DROP TABLE ceil_floor_round;
CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
DROP TABLE width_bucket_test;
CREATE TABLE num_input_test (n1 numeric);
CREATE TABLE num_variance (a numeric);
INSERT INTO num_variance VALUES (0);
CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
TRUNCATE guid1;
DROP TABLE guid1;
DROP TABLE guid2 CASCADE;
CREATE TABLE numrange_test (nr NUMRANGE);
CREATE INDEX numrange_test_btree on numrange_test(nr);
CREATE TABLE numrange_test2(nr numrange);
CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
INSERT INTO numrange_test2 VALUES('[, 5)');
CREATE TABLE textrange_test (tr text);
CREATE INDEX textrange_test_btree on textrange_test(tr);
CREATE TABLE test_range_gist(ir int4range);
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
DROP INDEX test_range_gist_idx;
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
CREATE TABLE test_range_spgist(ir int4range);
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
DROP INDEX test_range_spgist_idx;
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
CREATE TABLE test_range_elem(i int4);
CREATE INDEX test_range_elem_idx on test_range_elem (i);
CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
DROP TABLE test_range_elem;
CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
CREATE TABLE f_test(f text, i int);
CREATE TABLE i8r_array (f1 int, f2 text);
CREATE TYPE arrayrange as range (subtype=int4[]);
CREATE TYPE two_ints as (a int, b int);
DROP TYPE two_ints cascade;
CREATE TABLE text_support_test (t text);
CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
CREATE TABLE TEMP_INT4 (f1 INT4);
CREATE TABLE TEMP_INT2 (f1 INT2);
CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
CREATE TABLE POLYGON_TBL(f1 polygon);
CREATE TABLE quad_poly_tbl (id int, p polygon);
INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
"""
with endpoint.cursor() as cur:
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
for line in sql.split("\n"):
if len(line.strip()) == 0 or line.startswith("--"):
continue
cur.execute(line)
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
cur.execute("VACUUM FULL pg_class;")
for ep in env.endpoints.endpoints:
log.info(f"{ep.endpoint_id} / {ep.pg_port}")
pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
env_vars = {
"PGPORT": str(ep.pg_port),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
}
pg_bin.run_capture(pg_dump_command, env=env_vars)

16
vendor/revisions.json vendored
View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.4",
"0d59c91c1a23e667f1d1169d5f040b3fa0a0ab44"
"17.5",
"e5374b72997b0afc8374137674e873f7a558120a"
],
"v16": [
"16.8",
"d72d76f2cdee4194dd052ce099e9784aca7c794a"
"16.9",
"bb5eee65ac753b5a66d255ec5fb4c0e33180e8fd"
],
"v15": [
"15.12",
"72f83df76c61ce18d81bd371f0afd2a43d59c052"
"15.13",
"052df87d338dc30687d0c96f1a4d9b6cb4882b2e"
],
"v14": [
"14.17",
"06b405bc982fd53522689aa4acbfd9c44b7993cf"
"14.18",
"ead1e76bdcb71ef87f52f0610bd7333247f75179"
]
}

View File

@@ -39,8 +39,10 @@ env_logger = { version = "0.11" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
form_urlencoded = { version = "1" }
futures-channel = { version = "0.3", features = ["sink"] }
futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
futures-task = { version = "0.3", default-features = false, features = ["std"] }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
@@ -70,6 +72,7 @@ num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
percent-encoding = { version = "2" }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }