mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 13:00:37 +00:00
Compare commits
1 Commits
devin/1746
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b56c3eecd |
5
.github/actionlint.yml
vendored
5
.github/actionlint.yml
vendored
@@ -33,14 +33,9 @@ config-variables:
|
||||
- REMOTE_STORAGE_AZURE_CONTAINER
|
||||
- REMOTE_STORAGE_AZURE_REGION
|
||||
- SLACK_CICD_CHANNEL_ID
|
||||
- SLACK_COMPUTE_CHANNEL_ID
|
||||
- SLACK_ON_CALL_DEVPROD_STREAM
|
||||
- SLACK_ON_CALL_QA_STAGING_STREAM
|
||||
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
|
||||
- SLACK_ONCALL_COMPUTE_GROUP
|
||||
- SLACK_ONCALL_PROXY_GROUP
|
||||
- SLACK_ONCALL_STORAGE_GROUP
|
||||
- SLACK_PROXY_CHANNEL_ID
|
||||
- SLACK_RUST_CHANNEL_ID
|
||||
- SLACK_STORAGE_CHANNEL_ID
|
||||
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
|
||||
|
||||
71
.github/workflows/benchmarking.yml
vendored
71
.github/workflows/benchmarking.yml
vendored
@@ -53,77 +53,6 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
cleanup:
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
options: --init
|
||||
env:
|
||||
ORG_ID: org-solitary-dew-09443886
|
||||
LIMIT: 100
|
||||
SEARCH: "Created by actions/neon-project-create; GITHUB_RUN_ID"
|
||||
BASE_URL: https://console-stage.neon.build/api/v2
|
||||
DRY_RUN: "false" # Set to "true" to just test out the workflow
|
||||
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Cleanup inactive Neon projects left over from prior runs
|
||||
env:
|
||||
API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
|
||||
NOW=$(date -u +%s)
|
||||
DAYS_AGO=$((NOW - 5 * 86400))
|
||||
|
||||
REQUEST_URL="$BASE_URL/projects?limit=$LIMIT&search=$(printf '%s' "$SEARCH" | jq -sRr @uri)&org_id=$ORG_ID"
|
||||
|
||||
echo "Requesting project list from:"
|
||||
echo "$REQUEST_URL"
|
||||
|
||||
response=$(curl -s -X GET "$REQUEST_URL" \
|
||||
--header "Accept: application/json" \
|
||||
--header "Content-Type: application/json" \
|
||||
--header "Authorization: Bearer ${API_KEY}" )
|
||||
|
||||
echo "Response:"
|
||||
echo "$response" | jq .
|
||||
|
||||
projects_to_delete=$(echo "$response" | jq --argjson cutoff "$DAYS_AGO" '
|
||||
.projects[]
|
||||
| select(.compute_last_active_at != null)
|
||||
| select((.compute_last_active_at | fromdateiso8601) < $cutoff)
|
||||
| {id, name, compute_last_active_at}
|
||||
')
|
||||
|
||||
if [ -z "$projects_to_delete" ]; then
|
||||
echo "No projects eligible for deletion."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo "Projects that will be deleted:"
|
||||
echo "$projects_to_delete" | jq -r '.id'
|
||||
|
||||
if [ "$DRY_RUN" = "false" ]; then
|
||||
echo "$projects_to_delete" | jq -r '.id' | while read -r project_id; do
|
||||
echo "Deleting project: $project_id"
|
||||
curl -s -X DELETE "$BASE_URL/projects/$project_id" \
|
||||
--header "Accept: application/json" \
|
||||
--header "Content-Type: application/json" \
|
||||
--header "Authorization: Bearer ${API_KEY}"
|
||||
done
|
||||
else
|
||||
echo "Dry run enabled — no projects were deleted."
|
||||
fi
|
||||
bench:
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
permissions:
|
||||
|
||||
37
.github/workflows/build_and_test.yml
vendored
37
.github/workflows/build_and_test.yml
vendored
@@ -1434,10 +1434,10 @@ jobs:
|
||||
;;
|
||||
esac
|
||||
|
||||
notify-release-deploy-failure:
|
||||
needs: [ meta, deploy ]
|
||||
notify-storage-release-deploy-failure:
|
||||
needs: [ deploy ]
|
||||
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
|
||||
if: contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), needs.meta.outputs.run-kind) && needs.deploy.result != 'success' && always()
|
||||
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
@@ -1445,40 +1445,15 @@ jobs:
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- name: Post release-deploy failure to team slack channel
|
||||
- name: Post release-deploy failure to team-storage slack channel
|
||||
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
|
||||
env:
|
||||
TEAM_ONCALL: >-
|
||||
${{
|
||||
fromJSON(format('{
|
||||
"storage-release": "<!subteam^{0}|@oncall-storage>",
|
||||
"compute-release": "<!subteam^{1}|@oncall-compute>",
|
||||
"proxy-release": "<!subteam^{2}|@oncall-proxy>"
|
||||
}',
|
||||
vars.SLACK_ONCALL_STORAGE_GROUP,
|
||||
vars.SLACK_ONCALL_COMPUTE_GROUP,
|
||||
vars.SLACK_ONCALL_PROXY_GROUP
|
||||
))[needs.meta.outputs.run-kind]
|
||||
}}
|
||||
CHANNEL: >-
|
||||
${{
|
||||
fromJSON(format('{
|
||||
"storage-release": "{0}",
|
||||
"compute-release": "{1}",
|
||||
"proxy-release": "{2}"
|
||||
}',
|
||||
vars.SLACK_STORAGE_CHANNEL_ID,
|
||||
vars.SLACK_COMPUTE_CHANNEL_ID,
|
||||
vars.SLACK_PROXY_CHANNEL_ID
|
||||
))[needs.meta.outputs.run-kind]
|
||||
}}
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ env.CHANNEL }}
|
||||
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
|
||||
text: |
|
||||
🔴 ${{ env.TEAM_ONCALL }}: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
|
||||
🔴 <!subteam^S06CJ87UMNY|@oncall-storage>: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
|
||||
|
||||
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
|
||||
promote-compatibility-data:
|
||||
|
||||
@@ -210,24 +210,20 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
// may be empty. In that case, we need to dig them from the GUCs in the
|
||||
// cluster.settings field.
|
||||
let pageserver_connstr = spec
|
||||
.pageserver_connstring
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
|
||||
.ok_or("pageserver connstr should be provided")?;
|
||||
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
|
||||
if matches!(spec.mode, ComputeMode::Primary) {
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("neon.safekeepers")
|
||||
.ok_or("safekeeper connstrings should be provided")?
|
||||
.split(',')
|
||||
.map(|str| str.to_string())
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
}
|
||||
.cluster
|
||||
.settings
|
||||
.find("neon.pageserver_connstring")
|
||||
.expect("pageserver connstr should be provided");
|
||||
let safekeeper_connstrings = if matches!(spec.mode, ComputeMode::Primary) {
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("neon.safekeepers")
|
||||
.ok_or("safekeeper connstrings should be provided")?
|
||||
.split(',')
|
||||
.map(|str| str.to_string())
|
||||
.collect()
|
||||
} else {
|
||||
spec.safekeeper_connstrings.clone()
|
||||
vec![]
|
||||
};
|
||||
let storage_auth_token = spec.storage_auth_token.clone();
|
||||
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
@@ -56,29 +55,9 @@ pub fn write_postgres_conf(
|
||||
|
||||
// Add options for connecting to storage
|
||||
writeln!(file, "# Neon storage settings")?;
|
||||
if let Some(s) = &spec.pageserver_connstring {
|
||||
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
|
||||
}
|
||||
if let Some(stripe_size) = spec.shard_stripe_size {
|
||||
writeln!(file, "neon.stripe_size={stripe_size}")?;
|
||||
}
|
||||
if !spec.safekeeper_connstrings.is_empty() {
|
||||
let mut neon_safekeepers_value = String::new();
|
||||
tracing::info!(
|
||||
"safekeepers_connstrings is not zero, gen: {:?}",
|
||||
spec.safekeepers_generation
|
||||
);
|
||||
// If generation is given, prepend sk list with g#number:
|
||||
if let Some(generation) = spec.safekeepers_generation {
|
||||
write!(neon_safekeepers_value, "g#{}:", generation)?;
|
||||
}
|
||||
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
|
||||
writeln!(
|
||||
file,
|
||||
"neon.safekeepers={}",
|
||||
escape_conf_value(&neon_safekeepers_value)
|
||||
)?;
|
||||
}
|
||||
if let Some(s) = &spec.tenant_id {
|
||||
writeln!(file, "neon.tenant_id={}", escape_conf_value(&s.to_string()))?;
|
||||
}
|
||||
@@ -165,6 +144,21 @@ pub fn write_postgres_conf(
|
||||
if spec.cluster.settings.is_some() {
|
||||
writeln!(file, "# Managed by compute_ctl: begin")?;
|
||||
write!(file, "{}", spec.cluster.settings.as_pg_settings())?;
|
||||
// If generation is given, prepend sk list with g#number:
|
||||
if let Some(generation) = spec.safekeepers_generation {
|
||||
let neon_safekeepers = spec.cluster.settings.find("neon.safekeepers").unwrap();
|
||||
// Don't try to add it if it already exists
|
||||
if !neon_safekeepers.starts_with("g#") {
|
||||
writeln!(
|
||||
file,
|
||||
"# Overwriting original neon.safekeepers value to include generation"
|
||||
)?;
|
||||
writeln!(
|
||||
file,
|
||||
"neon.safekeepers = 'g#{generation}:{neon_safekeepers}'"
|
||||
)?;
|
||||
}
|
||||
}
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
}
|
||||
|
||||
|
||||
@@ -366,47 +366,24 @@ impl ComputeNode {
|
||||
// and can thus use all `max_connections` connection slots. However, that's generally not
|
||||
// very efficient, so we generally still limit it to a smaller number.
|
||||
if compute_state.status == ComputeStatus::Init {
|
||||
// If the settings contain 'max_connections', use that as template
|
||||
if let Some(config) = spec.cluster.settings.find("max_connections") {
|
||||
config.parse::<usize>().ok()
|
||||
} else {
|
||||
// Otherwise, try to find the setting in the postgresql_conf string
|
||||
spec.cluster
|
||||
.postgresql_conf
|
||||
.iter()
|
||||
.flat_map(|conf| conf.split("\n"))
|
||||
.filter_map(|line| {
|
||||
if !line.contains("max_connections") {
|
||||
return None;
|
||||
}
|
||||
// If the settings contain 'max_connections', use that as a
|
||||
// template. Otherwise, if we didn't find max_connections, default
|
||||
// to 10 concurrent connections.
|
||||
spec.cluster
|
||||
.settings
|
||||
.find("max_connections")
|
||||
.map_or(10, |guc| {
|
||||
let max_connections = guc.parse::<usize>().unwrap();
|
||||
|
||||
let (key, value) = line.split_once("=")?;
|
||||
let key = key
|
||||
.trim_start_matches(char::is_whitespace)
|
||||
.trim_end_matches(char::is_whitespace);
|
||||
|
||||
let value = value
|
||||
.trim_start_matches(char::is_whitespace)
|
||||
.trim_end_matches(char::is_whitespace);
|
||||
|
||||
if key != "max_connections" {
|
||||
return None;
|
||||
}
|
||||
|
||||
value.parse::<usize>().ok()
|
||||
})
|
||||
.next()
|
||||
}
|
||||
// If max_connections is present, use at most 1/3rd of that.
|
||||
// When max_connections is lower than 30, try to use at least 10 connections, but
|
||||
// never more than max_connections.
|
||||
.map(|limit| match limit {
|
||||
0..10 => limit,
|
||||
10..30 => 10,
|
||||
30.. => limit / 3,
|
||||
})
|
||||
// If we didn't find max_connections, default to 10 concurrent connections.
|
||||
.unwrap_or(10)
|
||||
// If max_connections is present, use at most 1/3rd of that.
|
||||
// When max_connections is lower than 30, try to use at least 10 connections, but
|
||||
// never more than max_connections.
|
||||
match max_connections {
|
||||
0..10 => max_connections,
|
||||
10..30 => 10,
|
||||
30.. => max_connections / 3,
|
||||
}
|
||||
})
|
||||
} else {
|
||||
// state == Running
|
||||
// Because the cluster is already in the Running state, we should assume users are
|
||||
|
||||
@@ -50,8 +50,8 @@ use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
|
||||
};
|
||||
use compute_api::spec::{
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
|
||||
RemoteExtSpec, Role,
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, GenericOption,
|
||||
PgIdent, RemoteExtSpec, Role,
|
||||
};
|
||||
use jsonwebtoken::jwk::{
|
||||
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
|
||||
@@ -711,7 +711,18 @@ impl Endpoint {
|
||||
} else {
|
||||
Vec::new()
|
||||
},
|
||||
settings: None,
|
||||
settings: Some(vec![
|
||||
GenericOption {
|
||||
name: "neon.pageserver_connstring".to_string(),
|
||||
value: Some(pageserver_connstring),
|
||||
vartype: "string".to_string(),
|
||||
},
|
||||
GenericOption {
|
||||
name: "neon.safekeepers".to_string(),
|
||||
value: Some(safekeeper_connstrings.join(",")),
|
||||
vartype: "string".to_string(),
|
||||
},
|
||||
]),
|
||||
postgresql_conf: Some(postgresql_conf.clone()),
|
||||
},
|
||||
delta_operations: None,
|
||||
@@ -721,9 +732,7 @@ impl Endpoint {
|
||||
branch_id: None,
|
||||
endpoint_id: Some(self.endpoint_id.clone()),
|
||||
mode: self.mode,
|
||||
pageserver_connstring: Some(pageserver_connstring),
|
||||
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
@@ -928,6 +937,8 @@ impl Endpoint {
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
safekeepers: Option<Vec<NodeId>>,
|
||||
) -> Result<()> {
|
||||
let mut pg_settings: Vec<GenericOption> = vec![];
|
||||
|
||||
let (mut spec, compute_ctl_config) = {
|
||||
let config_path = self.endpoint_path().join("config.json");
|
||||
let file = std::fs::File::open(config_path)?;
|
||||
@@ -958,7 +969,12 @@ impl Endpoint {
|
||||
|
||||
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstr.is_empty());
|
||||
spec.pageserver_connstring = Some(pageserver_connstr);
|
||||
pg_settings.push(GenericOption {
|
||||
name: "neon.pageserver_connstring".to_string(),
|
||||
value: Some(pageserver_connstr),
|
||||
vartype: "string".to_string(),
|
||||
});
|
||||
|
||||
if stripe_size.is_some() {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
}
|
||||
@@ -966,9 +982,22 @@ impl Endpoint {
|
||||
// If safekeepers are not specified, don't change them.
|
||||
if let Some(safekeepers) = safekeepers {
|
||||
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
|
||||
spec.safekeeper_connstrings = safekeeper_connstrings;
|
||||
pg_settings.push(GenericOption {
|
||||
name: "neon.safekeepers".to_string(),
|
||||
value: Some(safekeeper_connstrings.join(",")),
|
||||
vartype: "string".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
spec.cluster.settings = match spec.cluster.settings {
|
||||
// Append the new settings to the configured settings
|
||||
Some(mut settings) => {
|
||||
settings.append(&mut pg_settings);
|
||||
Some(settings)
|
||||
}
|
||||
None => Some(pg_settings),
|
||||
};
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(120))
|
||||
.build()
|
||||
|
||||
@@ -90,19 +90,17 @@ pub struct ComputeSpec {
|
||||
|
||||
// Information needed to connect to the storage layer.
|
||||
//
|
||||
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
|
||||
// `tenant_id` and `timeline_id` are always needed.
|
||||
//
|
||||
// Depending on `mode`, this can be a primary read-write node, a read-only
|
||||
// replica, or a read-only node pinned at an older LSN.
|
||||
// `safekeeper_connstrings` must be set for a primary.
|
||||
//
|
||||
// For backwards compatibility, the control plane may leave out all of
|
||||
// For backwards compatibility, the control plane may leave out both of
|
||||
// these, and instead set the "neon.tenant_id", "neon.timeline_id",
|
||||
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
|
||||
// updated to fill these fields, we can make these non optional.
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub pageserver_connstring: Option<String>,
|
||||
|
||||
// More neon ids that we expose to the compute_ctl
|
||||
// and to postgres as neon extension GUCs.
|
||||
@@ -121,8 +119,6 @@ pub struct ComputeSpec {
|
||||
/// compute_ctl with postgres_ffi.
|
||||
#[serde(default)]
|
||||
pub safekeepers_generation: Option<u32>,
|
||||
#[serde(default)]
|
||||
pub safekeeper_connstrings: Vec<String>,
|
||||
|
||||
#[serde(default)]
|
||||
pub mode: ComputeMode,
|
||||
|
||||
@@ -16,7 +16,6 @@ pub struct Collector {
|
||||
const NMETRICS: usize = 2;
|
||||
|
||||
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
|
||||
// SAFETY: libc::sysconf is safe, it merely returns a value.
|
||||
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
|
||||
if long == -1 {
|
||||
panic!("sysconf(_SC_CLK_TCK) failed");
|
||||
|
||||
@@ -10,7 +10,6 @@ use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer, delta_layer, ima
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::virtual_file::api::IoMode;
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver_api::key::Key;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use crate::layer_map_analyzer::parse_filename;
|
||||
@@ -28,7 +27,6 @@ pub(crate) enum LayerCmd {
|
||||
path: PathBuf,
|
||||
tenant: String,
|
||||
timeline: String,
|
||||
key: Option<Key>,
|
||||
},
|
||||
/// Dump all information of a layer file
|
||||
DumpLayer {
|
||||
@@ -102,7 +100,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
path,
|
||||
tenant,
|
||||
timeline,
|
||||
key,
|
||||
} => {
|
||||
let timeline_path = path
|
||||
.join(TENANTS_SEGMENT_NAME)
|
||||
@@ -110,37 +107,21 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
.join(TIMELINES_SEGMENT_NAME)
|
||||
.join(timeline);
|
||||
let mut idx = 0;
|
||||
let mut to_print = Vec::default();
|
||||
for layer in fs::read_dir(timeline_path)? {
|
||||
let layer = layer?;
|
||||
if let Ok(layer_file) = parse_filename(&layer.file_name().into_string().unwrap()) {
|
||||
if let Some(key) = key {
|
||||
if layer_file.key_range.start <= *key && *key < layer_file.key_range.end {
|
||||
to_print.push((idx, layer_file));
|
||||
}
|
||||
} else {
|
||||
to_print.push((idx, layer_file));
|
||||
}
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if key.is_some() {
|
||||
to_print
|
||||
.sort_by_key(|(_idx, layer_file)| std::cmp::Reverse(layer_file.lsn_range.end));
|
||||
}
|
||||
|
||||
for (idx, layer_file) in to_print {
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::DumpLayer {
|
||||
|
||||
@@ -2469,7 +2469,6 @@ async fn timeline_checkpoint_handler(
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Cancelled => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e),
|
||||
|
||||
@@ -3198,7 +3198,6 @@ impl TenantShard {
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
CompactionError::Cancelled => (),
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::Offload(_) => {}
|
||||
|
||||
@@ -90,18 +90,10 @@ impl Header {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BlobWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WriteBlobError {
|
||||
#[error(transparent)]
|
||||
Flush(BlobWriterError),
|
||||
Flush(FlushTaskError),
|
||||
#[error("blob too large ({len} bytes)")]
|
||||
BlobTooLarge { len: usize },
|
||||
#[error(transparent)]
|
||||
@@ -246,16 +238,14 @@ where
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
flush_task_span: tracing::Span,
|
||||
) -> Result<Self, BlobWriterError> {
|
||||
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
|
||||
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
io_buf: Some(BytesMut::new()),
|
||||
writer: BufferedWriter::new(
|
||||
file,
|
||||
start_offset,
|
||||
|| IoBufferMut::with_capacity(Self::CAPACITY),
|
||||
gate_token,
|
||||
gate.enter()?,
|
||||
cancel,
|
||||
ctx,
|
||||
flush_task_span,
|
||||
@@ -275,16 +265,13 @@ where
|
||||
&mut self,
|
||||
src_buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), BlobWriterError>) {
|
||||
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
|
||||
let res = self
|
||||
.writer
|
||||
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
|
||||
// Can remove all the complexity around owned buffers upstack
|
||||
.write_buffered_borrowed(&src_buf, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
|
||||
})
|
||||
.map(|len| {
|
||||
self.offset += len as u64;
|
||||
});
|
||||
@@ -431,10 +418,8 @@ where
|
||||
self,
|
||||
mode: BufferedWriterShutdownMode,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<W, BlobWriterError> {
|
||||
let (_, file) = self.writer.shutdown(mode, ctx).await.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
|
||||
})?;
|
||||
) -> Result<W, FlushTaskError> {
|
||||
let (_, file) = self.writer.shutdown(mode, ctx).await?;
|
||||
Ok(file)
|
||||
}
|
||||
}
|
||||
@@ -482,11 +467,8 @@ pub(crate) mod tests {
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
let mut wtr = BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test"))
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
let mut wtr =
|
||||
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
@@ -508,11 +490,7 @@ pub(crate) mod tests {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
.await?;
|
||||
file.disarm_into_inner()
|
||||
};
|
||||
Ok((temp_dir, pathbuf, offsets))
|
||||
|
||||
@@ -19,14 +19,6 @@ use crate::context::RequestContext;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BatchSplitWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) enum BatchWriterResult {
|
||||
Produced(ResidentLayer),
|
||||
Discarded(PersistentLayerKey),
|
||||
@@ -105,7 +97,7 @@ impl BatchLayerWriter {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
let res = self
|
||||
.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await?;
|
||||
@@ -123,7 +115,7 @@ impl BatchLayerWriter {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
discard_fn: D,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -147,14 +139,14 @@ impl BatchLayerWriter {
|
||||
generated_layers.push(BatchWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let res = match inner {
|
||||
LayerWriterWrapper::Delta(writer) => writer
|
||||
.finish(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
|
||||
LayerWriterWrapper::Image(writer) => writer
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
|
||||
LayerWriterWrapper::Delta(writer) => {
|
||||
writer.finish(layer_key.key_range.end, ctx).await
|
||||
}
|
||||
LayerWriterWrapper::Image(writer) => {
|
||||
writer
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
}
|
||||
};
|
||||
let layer = match res {
|
||||
Ok((desc, path)) => {
|
||||
@@ -163,7 +155,7 @@ impl BatchLayerWriter {
|
||||
Err(e) => {
|
||||
tokio::fs::remove_file(&path).await.ok();
|
||||
clean_up_layers(generated_layers);
|
||||
return Err(BatchSplitWriterError::Other(e));
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -243,7 +235,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), BatchSplitWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
@@ -261,8 +253,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
.await?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.batches.add_unfinished_image_writer(
|
||||
prev_image_writer,
|
||||
@@ -271,10 +262,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
);
|
||||
self.start_key = key;
|
||||
}
|
||||
self.inner
|
||||
.put_image(key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -283,7 +271,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
discard_fn: D,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -303,7 +291,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
.await
|
||||
}
|
||||
@@ -358,7 +346,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), BatchSplitWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
//
|
||||
@@ -378,8 +366,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
|
||||
.await?,
|
||||
));
|
||||
}
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
@@ -399,8 +386,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
.await?;
|
||||
let (start_key, prev_delta_writer) =
|
||||
self.inner.replace((key, next_delta_writer)).unwrap();
|
||||
self.batches.add_unfinished_delta_writer(
|
||||
@@ -410,19 +396,16 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
);
|
||||
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
|
||||
anyhow::bail!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
inner.estimated_size()
|
||||
)));
|
||||
);
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
inner
|
||||
.put_value(key, lsn, val, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
|
||||
inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -430,7 +413,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
discard_fn: D,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -456,7 +439,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -35,9 +35,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use crate::tenant::blob_io::BlobWriterError;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
@@ -78,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -450,11 +448,7 @@ impl DeltaLayerWriterInner {
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
)?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -483,12 +477,15 @@ impl DeltaLayerWriterInner {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
let val_ser =
|
||||
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
|
||||
) -> anyhow::Result<()> {
|
||||
let (_, res) = self
|
||||
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
|
||||
.put_value_bytes(
|
||||
key,
|
||||
lsn,
|
||||
Value::ser(&val)?.slice_len(),
|
||||
val.will_init(),
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
res
|
||||
}
|
||||
@@ -500,46 +497,25 @@ impl DeltaLayerWriterInner {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
if self.lsn_range.start > lsn {
|
||||
return (
|
||||
val,
|
||||
Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
))),
|
||||
);
|
||||
}
|
||||
|
||||
assert!(
|
||||
self.lsn_range.start <= lsn,
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
);
|
||||
// We don't want to use compression in delta layer creation
|
||||
let compression = ImageCompressionAlgorithm::Disabled;
|
||||
let (val, res) = self
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(val, ctx, compression)
|
||||
.await;
|
||||
|
||||
let off = match res {
|
||||
Ok((off, _)) => off,
|
||||
Err(e) => {
|
||||
return (
|
||||
val,
|
||||
Err(match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
DeltaLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
DeltaLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
Err(e) => return (val, Err(anyhow::anyhow!(e))),
|
||||
};
|
||||
|
||||
let blob_ref = BlobRef::new(off, will_init);
|
||||
@@ -549,10 +525,7 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(
|
||||
val,
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
|
||||
)
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
@@ -566,7 +539,7 @@ impl DeltaLayerWriterInner {
|
||||
self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
let file = self
|
||||
@@ -575,24 +548,17 @@ impl DeltaLayerWriterInner {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
|
||||
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
|
||||
})?;
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
let (index_root_blk, block_buf) = self
|
||||
.tree
|
||||
.finish()
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
res?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
assert!(self.lsn_range.start < self.lsn_range.end);
|
||||
@@ -609,27 +575,24 @@ impl DeltaLayerWriterInner {
|
||||
};
|
||||
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary
|
||||
.ser_into_page()
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
let buf = summary.ser_into_page()?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
res?;
|
||||
|
||||
let metadata = file
|
||||
.metadata()
|
||||
.await
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
.context("get file metadata to determine size")?;
|
||||
|
||||
// 5GB limit for objects without multipart upload (which we don't want to use)
|
||||
// Make it a little bit below to account for differing GB units
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
|
||||
if metadata.len() > S3_UPLOAD_LIMIT {
|
||||
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
file.path(),
|
||||
metadata.len()
|
||||
)));
|
||||
}
|
||||
ensure!(
|
||||
metadata.len() <= S3_UPLOAD_LIMIT,
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
file.path(),
|
||||
metadata.len()
|
||||
);
|
||||
|
||||
// Note: Because we opened the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
@@ -646,7 +609,7 @@ impl DeltaLayerWriterInner {
|
||||
// fsync the file
|
||||
file.sync_all()
|
||||
.await
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
.maybe_fatal_err("delta_layer sync_all")?;
|
||||
|
||||
trace!("created delta layer {}", self.path);
|
||||
|
||||
@@ -731,7 +694,7 @@ impl DeltaLayerWriter {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -746,7 +709,7 @@ impl DeltaLayerWriter {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
@@ -768,7 +731,7 @@ impl DeltaLayerWriter {
|
||||
mut self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(key_end, ctx).await
|
||||
}
|
||||
|
||||
@@ -782,14 +745,6 @@ impl DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeltaLayerWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
@@ -800,7 +755,7 @@ pub enum RewriteSummaryError {
|
||||
|
||||
impl From<std::io::Error> for RewriteSummaryError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::Error::new(e))
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,9 +33,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use crate::tenant::blob_io::BlobWriterError;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
@@ -76,7 +74,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -342,14 +340,6 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ImageLayerWriterError {
|
||||
#[error("flush task cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
@@ -824,11 +814,7 @@ impl ImageLayerWriterInner {
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
)?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -864,13 +850,8 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"key not in range"
|
||||
)));
|
||||
}
|
||||
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
@@ -880,18 +861,7 @@ impl ImageLayerWriterInner {
|
||||
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let (off, compression_info) = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
ImageLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
ImageLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
let (off, compression_info) = res?;
|
||||
if compression_info.compressed_size.is_some() {
|
||||
// The image has been considered for compression at least
|
||||
self.uncompressed_bytes_eligible += uncompressed_len;
|
||||
@@ -903,9 +873,7 @@ impl ImageLayerWriterInner {
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree
|
||||
.append(&keybuf, off)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
self.tree.append(&keybuf, off)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -925,12 +893,8 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"key not in range"
|
||||
)));
|
||||
}
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
|
||||
// NB: we don't update the (un)compressed metrics, since we can't determine them without
|
||||
// decompressing the image. This seems okay.
|
||||
@@ -940,23 +904,11 @@ impl ImageLayerWriterInner {
|
||||
.blob_writer
|
||||
.write_blob_raw(raw_with_header.slice_len(), ctx)
|
||||
.await;
|
||||
let offset = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
ImageLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
ImageLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
let offset = res?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree
|
||||
.append(&keybuf, offset)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
self.tree.append(&keybuf, offset)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -973,7 +925,7 @@ impl ImageLayerWriterInner {
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
// Calculate compression ratio
|
||||
@@ -996,24 +948,17 @@ impl ImageLayerWriterInner {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
|
||||
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
|
||||
})?;
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let (index_root_blk, block_buf) = self
|
||||
.tree
|
||||
.finish()
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer?
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
res?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
|
||||
@@ -1036,18 +981,14 @@ impl ImageLayerWriterInner {
|
||||
};
|
||||
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary
|
||||
.ser_into_page()
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
let buf = summary.ser_into_page()?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
res?;
|
||||
|
||||
let metadata = file.metadata().await.map_err(|e| {
|
||||
ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"get metadata to determine file size: {}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
let metadata = file
|
||||
.metadata()
|
||||
.await
|
||||
.context("get metadata to determine file size")?;
|
||||
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_shard_id,
|
||||
@@ -1070,9 +1011,9 @@ impl ImageLayerWriterInner {
|
||||
// set inner.file here. The first read will have to re-open it.
|
||||
|
||||
// fsync the file
|
||||
file.sync_all().await.map_err(|e| {
|
||||
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
|
||||
})?;
|
||||
file.sync_all()
|
||||
.await
|
||||
.maybe_fatal_err("image_layer sync_all")?;
|
||||
|
||||
trace!("created image layer {}", self.path);
|
||||
|
||||
@@ -1152,7 +1093,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
@@ -1167,7 +1108,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -1191,7 +1132,7 @@ impl ImageLayerWriter {
|
||||
pub(crate) async fn finish(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(ctx, None).await
|
||||
}
|
||||
|
||||
@@ -1200,7 +1141,7 @@ impl ImageLayerWriter {
|
||||
mut self,
|
||||
end_key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,6 @@ use utils::lsn::Lsn;
|
||||
use utils::vec_map::VecMap;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
|
||||
|
||||
use super::delta_layer::DeltaLayerWriterError;
|
||||
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
|
||||
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
|
||||
use crate::config::PageServerConf;
|
||||
@@ -582,17 +581,7 @@ impl InMemoryLayer {
|
||||
estimated_in_mem_size: AtomicU64::new(0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum InMemoryLayerError {
|
||||
#[error("flush task cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
/// Write path.
|
||||
///
|
||||
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
|
||||
@@ -602,7 +591,7 @@ impl InMemoryLayer {
|
||||
&self,
|
||||
serialized_batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), InMemoryLayerError> {
|
||||
) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
@@ -616,11 +605,7 @@ impl InMemoryLayer {
|
||||
} = serialized_batch;
|
||||
|
||||
// Write the batch to the file
|
||||
inner
|
||||
.file
|
||||
.write_raw(&raw, ctx)
|
||||
.await
|
||||
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
|
||||
let expected_new_len = base_offset
|
||||
@@ -652,8 +637,7 @@ impl InMemoryLayer {
|
||||
batch_offset,
|
||||
len,
|
||||
will_init,
|
||||
})
|
||||
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
|
||||
})?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
|
||||
@@ -810,25 +794,14 @@ impl InMemoryLayer {
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
res.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
DeltaLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
res?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MAX is used here because we identify L0 layers by full key range
|
||||
let (desc, path) = delta_layer_writer
|
||||
.finish(Key::MAX, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
DeltaLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
|
||||
|
||||
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
|
||||
//
|
||||
|
||||
@@ -300,7 +300,6 @@ pub(crate) fn log_compaction_error(
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
Cancelled => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
AlreadyRunning(_) => Level::ERROR,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
|
||||
@@ -119,8 +119,6 @@ use crate::tenant::gc_result::GcResult;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
|
||||
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
|
||||
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
|
||||
@@ -775,21 +773,6 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
|
||||
for CreateImageLayersError
|
||||
{
|
||||
fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
|
||||
match err {
|
||||
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Cancelled => {
|
||||
Self::Cancelled
|
||||
}
|
||||
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Other(err) => {
|
||||
Self::Other(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug, Clone)]
|
||||
pub(crate) enum FlushLayerError {
|
||||
/// Timeline cancellation token was cancelled
|
||||
@@ -2058,9 +2041,6 @@ impl Timeline {
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::Cancelled) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::AlreadyRunning(_)) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
@@ -5252,17 +5232,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer
|
||||
.put_image(img_key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
|
||||
anyhow::anyhow!("flush task cancelled"),
|
||||
),
|
||||
ImageLayerWriterError::Other(err) => {
|
||||
CreateImageLayersError::Other(err)
|
||||
}
|
||||
})?;
|
||||
image_layer_writer.put_image(img_key, img, ctx).await?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
}
|
||||
@@ -5359,15 +5329,7 @@ impl Timeline {
|
||||
|
||||
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
|
||||
// on the normal data path either.
|
||||
image_layer_writer
|
||||
.put_image(k, v, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
|
||||
})?;
|
||||
image_layer_writer.put_image(k, v, ctx).await?;
|
||||
}
|
||||
|
||||
if wrote_any_image {
|
||||
@@ -5881,8 +5843,6 @@ pub(crate) enum CompactionError {
|
||||
Other(anyhow::Error),
|
||||
#[error("Compaction already running: {0}")]
|
||||
AlreadyRunning(&'static str),
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
@@ -5897,7 +5857,6 @@ impl CompactionError {
|
||||
PageReconstructError::Cancelled
|
||||
))
|
||||
| Self::Offload(OffloadError::Cancelled)
|
||||
| Self::Cancelled
|
||||
)
|
||||
}
|
||||
|
||||
@@ -6963,22 +6922,9 @@ impl Timeline {
|
||||
)
|
||||
.await?;
|
||||
for (key, img) in images {
|
||||
image_layer_writer
|
||||
.put_image(key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
image_layer_writer.put_image(key, img, ctx).await?;
|
||||
}
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!("force created image layer {}", image_layer.local_path());
|
||||
{
|
||||
@@ -7432,10 +7378,7 @@ impl TimelineWriter<'_> {
|
||||
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
|
||||
}
|
||||
|
||||
res.map_err(|e| match e {
|
||||
InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
InMemoryLayerError::Other(err) => err,
|
||||
})
|
||||
res
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1547,7 +1547,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
// Safety of layer rewrites:
|
||||
// - We are writing to a different local file path than we are reading from, so the old Layer
|
||||
@@ -1572,7 +1572,7 @@ impl Timeline {
|
||||
let (desc, path) = image_layer_writer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
|
||||
@@ -2140,7 +2140,7 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
@@ -2199,7 +2199,7 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.put_value(key, lsn, value, ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
} else {
|
||||
let owner = self.shard_identity.get_shard_number(&key);
|
||||
|
||||
@@ -2217,7 +2217,7 @@ impl Timeline {
|
||||
let (desc, path) = writer
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
new_layers.push(new_delta);
|
||||
@@ -3682,7 +3682,8 @@ impl Timeline {
|
||||
let (desc, path) = delta_writer_before
|
||||
.finish(job_desc.compaction_key_range.start, ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
@@ -3692,7 +3693,8 @@ impl Timeline {
|
||||
let (desc, path) = delta_writer_after
|
||||
.finish(key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
@@ -3711,7 +3713,8 @@ impl Timeline {
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, end_key, discard)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
|
||||
.context("failed to finish image layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
} else {
|
||||
drop(writer);
|
||||
Vec::new()
|
||||
@@ -3724,7 +3727,8 @@ impl Timeline {
|
||||
delta_layer_writer
|
||||
.finish_with_discard_fn(self, ctx, discard)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
} else {
|
||||
drop(delta_layer_writer);
|
||||
Vec::new()
|
||||
@@ -4249,10 +4253,7 @@ impl TimelineAdaptor {
|
||||
unfinished_image_layer,
|
||||
} = outcome
|
||||
{
|
||||
let (desc, path) = unfinished_image_layer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
|
||||
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
|
||||
let image_layer =
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||
self.new_images.push(image_layer);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
@@ -817,10 +816,7 @@ async fn copy_lsn_prefix(
|
||||
let (desc, path) = writer
|
||||
.finish(reused_highest_key, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
|
||||
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
|
||||
})?;
|
||||
.map_err(Error::Prepare)?;
|
||||
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
|
||||
.map_err(Error::Prepare)?;
|
||||
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||
|
||||
@@ -97,7 +99,7 @@ impl VirtualFile {
|
||||
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let mode = get_io_mode();
|
||||
@@ -110,16 +112,21 @@ impl VirtualFile {
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::DirectRw, _) => true,
|
||||
};
|
||||
if set_o_direct {
|
||||
let open_options = open_options.clone();
|
||||
let open_options = if set_o_direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
let mut open_options = open_options;
|
||||
open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
open_options
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
unreachable!(
|
||||
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
open_options
|
||||
};
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile { inner, _mode: mode })
|
||||
}
|
||||
@@ -523,7 +530,7 @@ impl VirtualFileInner {
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
@@ -551,11 +558,10 @@ impl VirtualFileInner {
|
||||
// It would perhaps be nicer to check just for the read and write flags
|
||||
// explicitly, but OpenOptions doesn't contain any functions to read flags,
|
||||
// only to set them.
|
||||
let reopen_options = open_options
|
||||
.clone()
|
||||
.create(false)
|
||||
.create_new(false)
|
||||
.truncate(false);
|
||||
let mut reopen_options = open_options.clone();
|
||||
reopen_options.create(false);
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
@@ -1301,7 +1307,7 @@ mod tests {
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
|
||||
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
|
||||
Ok(MaybeVirtualFile::VirtualFile(vf))
|
||||
}
|
||||
}
|
||||
@@ -1368,7 +1374,7 @@ mod tests {
|
||||
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a
|
||||
@@ -1387,7 +1393,8 @@ mod tests {
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true),
|
||||
.truncate(true)
|
||||
.to_owned(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1405,7 +1412,12 @@ mod tests {
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
|
||||
let mut vfile = A::open(
|
||||
path_b.clone(),
|
||||
OpenOptions::new().read(true).to_owned(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
@@ -1454,7 +1466,7 @@ mod tests {
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true),
|
||||
OpenOptions::new().read(true).clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use std::os::fd::OwnedFd;
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::path::Path;
|
||||
|
||||
use super::io_engine::IoEngine;
|
||||
@@ -44,7 +43,7 @@ impl OpenOptions {
|
||||
self.write
|
||||
}
|
||||
|
||||
pub fn read(mut self, read: bool) -> Self {
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
@@ -57,7 +56,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn write(mut self, write: bool) -> Self {
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
self.write = write;
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
@@ -71,7 +70,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create(mut self, create: bool) -> Self {
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
@@ -84,7 +83,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_new(mut self, create_new: bool) -> Self {
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
@@ -97,7 +96,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn truncate(mut self, truncate: bool) -> Self {
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
@@ -125,8 +124,10 @@ impl OpenOptions {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mode(mut self, mode: u32) -> Self {
|
||||
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
@@ -139,7 +140,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn custom_flags(mut self, flags: i32) -> Self {
|
||||
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
|
||||
@@ -26,7 +26,6 @@
|
||||
#include "portability/instr_time.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
@@ -80,7 +79,6 @@ int neon_protocol_version = 3;
|
||||
static int neon_compute_mode = 0;
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
static int max_sockets;
|
||||
|
||||
static int pageserver_response_log_timeout = 10000;
|
||||
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
|
||||
@@ -338,13 +336,6 @@ load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
|
||||
pageserver_disconnect(i);
|
||||
}
|
||||
pagestore_local_counter = end_update_counter;
|
||||
|
||||
/* Reserve file descriptors for sockets */
|
||||
while (max_sockets < num_shards)
|
||||
{
|
||||
max_sockets += 1;
|
||||
ReserveExternalFD();
|
||||
}
|
||||
}
|
||||
|
||||
if (num_shards_p)
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/buf_internals.h"
|
||||
@@ -397,10 +396,9 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
|
||||
XLogRecPtr
|
||||
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
|
||||
{
|
||||
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
|
||||
if (lsn < FirstNormalUnloggedLSN || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
|
||||
return lsn;
|
||||
|
||||
Assert(lsn >= WalSegMinSize);
|
||||
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
|
||||
lsn = SetLastWrittenLSNForBlockRangeInternal(lsn, rlocator, forknum, from, n_blocks);
|
||||
LWLockRelease(LastWrittenLsnLock);
|
||||
@@ -437,6 +435,7 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
|
||||
NInfoGetRelNumber(relfilenode) == InvalidOid)
|
||||
return InvalidXLogRecPtr;
|
||||
|
||||
|
||||
BufTagInit(key, relNumber, forknum, blockno, spcOid, dbOid);
|
||||
|
||||
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
|
||||
@@ -445,10 +444,6 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
|
||||
{
|
||||
XLogRecPtr lsn = lsns[i];
|
||||
|
||||
if (lsn == InvalidXLogRecPtr)
|
||||
continue;
|
||||
|
||||
Assert(lsn >= WalSegMinSize);
|
||||
key.blockNum = blockno + i;
|
||||
entry = hash_search(lastWrittenLsnCache, &key, HASH_ENTER, &found);
|
||||
if (found)
|
||||
|
||||
@@ -150,7 +150,7 @@ NeonWALReaderFree(NeonWALReader *state)
|
||||
* fetched from timeline 'tli'.
|
||||
*
|
||||
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
|
||||
* occurs, in which case 'err' has the description. Error always closes remote
|
||||
* occurs, in which case 'err' has the desciption. Error always closes remote
|
||||
* connection, if there was any, so socket subscription should be removed.
|
||||
*
|
||||
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with
|
||||
|
||||
@@ -1989,14 +1989,8 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
/*
|
||||
* We have to disable this check for pg14-16 because sorted build of GIST index requires
|
||||
* to perform unlogged build several times
|
||||
*/
|
||||
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
|
||||
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
|
||||
#endif
|
||||
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
|
||||
|
||||
@@ -124,7 +124,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
}
|
||||
else
|
||||
{
|
||||
wp->safekeepers_generation = INVALID_GENERATION;
|
||||
host = wp->config->safekeepers_list;
|
||||
}
|
||||
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
|
||||
@@ -757,7 +756,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.members.m[i];
|
||||
|
||||
if (sk_id->node_id == sk->greetResponse.nodeId)
|
||||
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
/*
|
||||
* If mconf or list of safekeepers to connect to changed (the
|
||||
@@ -782,7 +781,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
|
||||
|
||||
if (sk_id->node_id == sk->greetResponse.nodeId)
|
||||
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
|
||||
{
|
||||
@@ -1072,6 +1071,7 @@ RecvVoteResponse(Safekeeper *sk)
|
||||
/* ready for elected message */
|
||||
sk->state = SS_WAIT_ELECTED;
|
||||
|
||||
wp->n_votes++;
|
||||
/* Are we already elected? */
|
||||
if (wp->state == WPS_CAMPAIGN)
|
||||
{
|
||||
|
||||
@@ -845,6 +845,9 @@ typedef struct WalProposer
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
|
||||
/* number of votes collected from safekeepers */
|
||||
int n_votes;
|
||||
|
||||
/* number of successful connections over the lifetime of walproposer */
|
||||
int n_connected;
|
||||
|
||||
|
||||
@@ -409,22 +409,14 @@ impl JwkCacheEntryLock {
|
||||
|
||||
if let Some(exp) = payload.expiration {
|
||||
if now >= exp + CLOCK_SKEW_LEEWAY {
|
||||
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
|
||||
exp.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
)));
|
||||
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(nbf) = payload.not_before {
|
||||
if nbf >= now + CLOCK_SKEW_LEEWAY {
|
||||
return Err(JwtError::InvalidClaims(
|
||||
JwtClaimsError::JwtTokenNotYetReadyToUse(
|
||||
nbf.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
),
|
||||
JwtClaimsError::JwtTokenNotYetReadyToUse,
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -542,10 +534,10 @@ struct JwtPayload<'a> {
|
||||
#[serde(rename = "aud", default)]
|
||||
audience: OneOrMany,
|
||||
/// Expiration - Time after which the JWT expires
|
||||
#[serde(rename = "exp", deserialize_with = "numeric_date_opt", default)]
|
||||
#[serde(deserialize_with = "numeric_date_opt", rename = "exp", default)]
|
||||
expiration: Option<SystemTime>,
|
||||
/// Not before - Time before which the JWT is not valid
|
||||
#[serde(rename = "nbf", deserialize_with = "numeric_date_opt", default)]
|
||||
/// Not before - Time after which the JWT expires
|
||||
#[serde(deserialize_with = "numeric_date_opt", rename = "nbf", default)]
|
||||
not_before: Option<SystemTime>,
|
||||
|
||||
// the following entries are only extracted for the sake of debug logging.
|
||||
@@ -617,15 +609,8 @@ impl<'de> Deserialize<'de> for OneOrMany {
|
||||
}
|
||||
|
||||
fn numeric_date_opt<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
|
||||
<Option<u64>>::deserialize(d)?
|
||||
.map(|t| {
|
||||
SystemTime::UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(t))
|
||||
.ok_or_else(|| {
|
||||
serde::de::Error::custom(format_args!("timestamp out of bounds: {t}"))
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
let d = <Option<u64>>::deserialize(d)?;
|
||||
Ok(d.map(|n| SystemTime::UNIX_EPOCH + Duration::from_secs(n)))
|
||||
}
|
||||
|
||||
struct JwkRenewalPermit<'a> {
|
||||
@@ -761,11 +746,11 @@ pub enum JwtClaimsError {
|
||||
#[error("invalid JWT token audience")]
|
||||
InvalidJwtTokenAudience,
|
||||
|
||||
#[error("JWT token has expired (exp={0})")]
|
||||
JwtTokenHasExpired(u64),
|
||||
#[error("JWT token has expired")]
|
||||
JwtTokenHasExpired,
|
||||
|
||||
#[error("JWT token is not yet ready to use (nbf={0})")]
|
||||
JwtTokenNotYetReadyToUse(u64),
|
||||
#[error("JWT token is not yet ready to use")]
|
||||
JwtTokenNotYetReadyToUse,
|
||||
}
|
||||
|
||||
#[allow(dead_code, reason = "Debug use only")]
|
||||
@@ -1248,14 +1233,14 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
"nbf": now + 60,
|
||||
"aud": "neon",
|
||||
}},
|
||||
error: JwtClaimsError::JwtTokenNotYetReadyToUse(now + 60),
|
||||
error: JwtClaimsError::JwtTokenNotYetReadyToUse,
|
||||
},
|
||||
Test {
|
||||
body: json! {{
|
||||
"exp": now - 60,
|
||||
"aud": ["neon"],
|
||||
}},
|
||||
error: JwtClaimsError::JwtTokenHasExpired(now - 60),
|
||||
error: JwtClaimsError::JwtTokenHasExpired,
|
||||
},
|
||||
Test {
|
||||
body: json! {{
|
||||
|
||||
@@ -32,6 +32,12 @@ pub(crate) enum ComputeUserInfoParseError {
|
||||
option: EndpointId,
|
||||
},
|
||||
|
||||
#[error(
|
||||
"Common name inferred from SNI ('{}') is not known",
|
||||
.cn,
|
||||
)]
|
||||
UnknownCommonName { cn: String },
|
||||
|
||||
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
|
||||
MalformedProjectName(EndpointId),
|
||||
}
|
||||
@@ -60,15 +66,22 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<EndpointId> {
|
||||
let (subdomain, common_name) = sni.split_once('.')?;
|
||||
pub(crate) fn endpoint_sni(
|
||||
sni: &str,
|
||||
common_names: &HashSet<String>,
|
||||
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
|
||||
let Some((subdomain, common_name)) = sni.split_once('.') else {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
|
||||
};
|
||||
if !common_names.contains(common_name) {
|
||||
return None;
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName {
|
||||
cn: common_name.into(),
|
||||
});
|
||||
}
|
||||
if subdomain == SERVERLESS_DRIVER_SNI {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
Some(EndpointId::from(subdomain))
|
||||
Ok(Some(EndpointId::from(subdomain)))
|
||||
}
|
||||
|
||||
impl ComputeUserInfoMaybeEndpoint {
|
||||
@@ -100,8 +113,15 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
})
|
||||
.map(|name| name.into());
|
||||
|
||||
let endpoint_from_domain =
|
||||
sni.and_then(|sni_str| common_names.and_then(|cn| endpoint_sni(sni_str, cn)));
|
||||
let endpoint_from_domain = if let Some(sni_str) = sni {
|
||||
if let Some(cn) = common_names {
|
||||
endpoint_sni(sni_str, cn)?
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let endpoint = match (endpoint_option, endpoint_from_domain) {
|
||||
// Invariant: if we have both project name variants, they should match.
|
||||
@@ -404,34 +424,21 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_unknown_sni() {
|
||||
fn parse_inconsistent_sni() {
|
||||
let options = StartupMessageParams::new([("user", "john_doe")]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert!(info.endpoint_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_unknown_sni_with_options() {
|
||||
let options = StartupMessageParams::new([
|
||||
("user", "john_doe"),
|
||||
("options", "endpoint=foo-bar-baz-1234"),
|
||||
]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(info.endpoint_id.as_deref(), Some("foo-bar-baz-1234"));
|
||||
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.expect_err("should fail");
|
||||
match err {
|
||||
UnknownCommonName { cn } => {
|
||||
assert_eq!(cn, "localhost");
|
||||
}
|
||||
_ => panic!("bad error: {err:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -24,6 +24,9 @@ pub(crate) enum HandshakeError {
|
||||
#[error("protocol violation")]
|
||||
ProtocolViolation,
|
||||
|
||||
#[error("missing certificate")]
|
||||
MissingCertificate,
|
||||
|
||||
#[error("{0}")]
|
||||
StreamUpgradeError(#[from] StreamUpgradeError),
|
||||
|
||||
@@ -39,6 +42,10 @@ impl ReportableError for HandshakeError {
|
||||
match self {
|
||||
HandshakeError::EarlyData => crate::error::ErrorKind::User,
|
||||
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
|
||||
// This error should not happen, but will if we have no default certificate and
|
||||
// the client sends no SNI extension.
|
||||
// If they provide SNI then we can be sure there is a certificate that matches.
|
||||
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
|
||||
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
|
||||
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
|
||||
StreamUpgradeError::Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
@@ -139,7 +146,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
// try parse endpoint
|
||||
let ep = conn_info
|
||||
.server_name()
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names));
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
|
||||
if let Some(ep) = ep {
|
||||
ctx.set_endpoint_id(ep);
|
||||
}
|
||||
@@ -154,8 +161,10 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
let (_, tls_server_end_point) =
|
||||
tls.cert_resolver.resolve(conn_info.server_name());
|
||||
let (_, tls_server_end_point) = tls
|
||||
.cert_resolver
|
||||
.resolve(conn_info.server_name())
|
||||
.ok_or(HandshakeError::MissingCertificate)?;
|
||||
|
||||
stream = PqStream {
|
||||
framed: Framed {
|
||||
|
||||
@@ -98,7 +98,8 @@ fn generate_tls_config<'a>(
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(vec![cert.clone()], key.clone_key())?;
|
||||
|
||||
let cert_resolver = CertResolver::new(key, vec![cert])?;
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
cert_resolver.add_cert(key, vec![cert], true)?;
|
||||
|
||||
let common_names = cert_resolver.get_common_names();
|
||||
|
||||
|
||||
@@ -199,7 +199,8 @@ fn get_conn_info(
|
||||
let endpoint = match connection_url.host() {
|
||||
Some(url::Host::Domain(hostname)) => {
|
||||
if let Some(tls) = tls {
|
||||
endpoint_sni(hostname, &tls.common_names).ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
endpoint_sni(hostname, &tls.common_names)?
|
||||
.ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
} else {
|
||||
hostname
|
||||
.split_once('.')
|
||||
|
||||
@@ -5,7 +5,6 @@ use anyhow::{Context, bail};
|
||||
use itertools::Itertools;
|
||||
use rustls::crypto::ring::{self, sign};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use rustls::sign::CertifiedKey;
|
||||
use x509_cert::der::{Reader, SliceReader};
|
||||
|
||||
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
|
||||
@@ -26,8 +25,10 @@ pub fn configure_tls(
|
||||
certs_dir: Option<&String>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
let mut cert_resolver = CertResolver::parse_new(key_path, cert_path)?;
|
||||
cert_resolver.add_cert_path(key_path, cert_path, true)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
@@ -39,8 +40,11 @@ pub fn configure_tls(
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver
|
||||
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
|
||||
cert_resolver.add_cert_path(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -79,42 +83,92 @@ pub fn configure_tls(
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint),
|
||||
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
Self::new(priv_key, cert_chain)
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
fn add_cert_path(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let mut certs = HashMap::new();
|
||||
let default = (cert.clone(), tls_server_end_point);
|
||||
certs.insert(common_name, (cert, tls_server_end_point));
|
||||
Ok(Self { certs, default })
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
|
||||
})?
|
||||
};
|
||||
|
||||
self.add_cert(priv_key, cert_chain, is_default)
|
||||
}
|
||||
|
||||
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
self.add_cert(priv_key, cert_chain)
|
||||
}
|
||||
|
||||
fn add_cert(
|
||||
pub fn add_cert(
|
||||
&mut self,
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some((cert.clone(), tls_server_end_point));
|
||||
}
|
||||
|
||||
self.certs.insert(common_name, (cert, tls_server_end_point));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -123,82 +177,12 @@ impl CertResolver {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_key_cert(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
Ok((priv_key, cert_chain))
|
||||
}
|
||||
|
||||
fn process_key_cert(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<(String, Arc<CertifiedKey>, TlsServerEndPoint)> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
Ok((common_name, cert, tls_server_end_point))
|
||||
}
|
||||
|
||||
impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
fn resolve(
|
||||
&self,
|
||||
client_hello: rustls::server::ClientHello<'_>,
|
||||
) -> Option<Arc<rustls::sign::CertifiedKey>> {
|
||||
Some(self.resolve(client_hello.server_name()).0)
|
||||
self.resolve(client_hello.server_name()).map(|x| x.0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,7 +190,7 @@ impl CertResolver {
|
||||
pub fn resolve(
|
||||
&self,
|
||||
server_name: Option<&str>,
|
||||
) -> (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint) {
|
||||
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
|
||||
// loop here and cut off more and more subdomains until we find
|
||||
// a match to get a proper wildcard support. OTOH, we now do not
|
||||
// use nested domains, so keep this simple for now.
|
||||
@@ -216,17 +200,12 @@ impl CertResolver {
|
||||
if let Some(mut sni_name) = server_name {
|
||||
loop {
|
||||
if let Some(cert) = self.certs.get(sni_name) {
|
||||
return cert.clone();
|
||||
return Some(cert.clone());
|
||||
}
|
||||
if let Some((_, rest)) = sni_name.split_once('.') {
|
||||
sni_name = rest;
|
||||
} else {
|
||||
// The customer has some custom DNS mapping - just return
|
||||
// a default certificate.
|
||||
//
|
||||
// This will error if the customer uses anything stronger
|
||||
// than sslmode=require. That's a choice they can make.
|
||||
return self.default.clone();
|
||||
return None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -1279,8 +1279,7 @@ class NeonEnv:
|
||||
)
|
||||
|
||||
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
||||
# This feature is pending rollout.
|
||||
# tenant_config["rel_size_v2_enabled"] = True
|
||||
tenant_config["rel_size_v2_enabled"] = True # Enable relsize_v2 by default in tests
|
||||
|
||||
if self.pageserver_remote_storage is not None:
|
||||
ps_cfg["remote_storage"] = remote_storage_to_toml_dict(
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import math # Add this import
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
@@ -88,10 +87,7 @@ def test_cumulative_statistics_persistence(
|
||||
- insert additional tuples that by itself are not enough to trigger auto-vacuum but in combination with the previous tuples are
|
||||
- verify that autovacuum is triggered by the combination of tuples inserted before and after endpoint suspension
|
||||
"""
|
||||
project = neon_api.create_project(
|
||||
pg_version,
|
||||
f"Test cumulative statistics persistence, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
|
||||
)
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
endpoint_id = project["endpoints"][0]["id"]
|
||||
|
||||
@@ -62,9 +62,7 @@ def test_ro_replica_lag(
|
||||
|
||||
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
|
||||
|
||||
project = neon_api.create_project(
|
||||
pg_version, f"Test readonly replica lag, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
@@ -197,9 +195,7 @@ def test_replication_start_stop(
|
||||
pgbench_duration = f"-T{2**num_replicas * configuration_test_time_sec}"
|
||||
error_occurred = False
|
||||
|
||||
project = neon_api.create_project(
|
||||
pg_version, f"Test replication start stop, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
project = neon_api.create_project(pg_version)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
|
||||
@@ -186,7 +186,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"type": "interpreted",
|
||||
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
|
||||
},
|
||||
"rel_size_v2_enabled": True,
|
||||
"rel_size_v2_enabled": False, # test suite enables it by default as of https://github.com/neondatabase/neon/issues/11081, so, custom config means disabling it
|
||||
"gc_compaction_enabled": True,
|
||||
"gc_compaction_verification": False,
|
||||
"gc_compaction_initial_threshold_kb": 1024000,
|
||||
|
||||
@@ -202,8 +202,6 @@ def test_pageserver_gc_compaction_preempt(
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
|
||||
env.pageserver.allowed_errors.append(".*flush task cancelled.*")
|
||||
env.pageserver.allowed_errors.append(".*failed to pipe.*")
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -231,7 +229,7 @@ def test_pageserver_gc_compaction_preempt(
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM
|
||||
@pytest.mark.timeout(600) # This test is slow with sanitizers enabled, especially on ARM
|
||||
@pytest.mark.parametrize(
|
||||
"with_branches",
|
||||
["with_branches", "no_branches"],
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
#
|
||||
# Test unlogged build for GIST index
|
||||
#
|
||||
def test_gist(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
con = endpoint.connect()
|
||||
cur = con.cursor()
|
||||
iterations = 100
|
||||
|
||||
for _ in range(iterations):
|
||||
cur.execute(
|
||||
"CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off)"
|
||||
)
|
||||
cur.execute(
|
||||
"INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i"
|
||||
)
|
||||
cur.execute("CREATE INDEX gist_pvactst ON pvactst USING gist (p)")
|
||||
cur.execute("VACUUM pvactst")
|
||||
cur.execute("DROP TABLE pvactst")
|
||||
@@ -471,7 +471,7 @@ def test_tx_abort_with_many_relations(
|
||||
try:
|
||||
# Rollback phase should be fast: this is one WAL record that we should process efficiently
|
||||
fut = exec.submit(rollback_and_wait)
|
||||
fut.result(timeout=15 if reldir_type == "v1" else 30)
|
||||
fut.result(timeout=15)
|
||||
except:
|
||||
exec.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: c8dab02bfc...d3c9d61fb7
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: b838c8969b...8ecb12f21d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 05ddf212e2...37496f87b5
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -5,14 +5,14 @@
|
||||
],
|
||||
"v16": [
|
||||
"16.8",
|
||||
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
|
||||
"37496f87b5324af53c56127e278ee5b1e8435253"
|
||||
],
|
||||
"v15": [
|
||||
"15.12",
|
||||
"b838c8969b7c63f3e637a769656f5f36793b797c"
|
||||
"8ecb12f21d862dfa39f7204b8f5e1c00a2a225b3"
|
||||
],
|
||||
"v14": [
|
||||
"14.17",
|
||||
"c8dab02bfc003ae7bd59096919042d7840f3c194"
|
||||
"d3c9d61fb7a362a165dac7060819dd9d6ad68c28"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user