mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-23 21:30:36 +00:00
Compare commits
4 Commits
release-36
...
neon_basic
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec672723fa | ||
|
|
b9aa38358f | ||
|
|
d4b64b9ef7 | ||
|
|
b6f5c395cb |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -955,7 +955,7 @@ jobs:
|
||||
version: [ v14, v15 ]
|
||||
|
||||
env:
|
||||
EXTENSIONS_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/extensions-${{ matrix.version }}:latest
|
||||
EXTENSIONS_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/extensions-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}
|
||||
AWS_ACCESS_KEY_ID: ${{ github.ref_name == 'release' && secrets.AWS_ACCESS_KEY_PROD || secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ github.ref_name == 'release' && secrets.AWS_SECRET_KEY_PROD || secrets.AWS_SECRET_KEY_DEV }}
|
||||
S3_BUCKETS: ${{ github.ref_name == 'release' && vars.S3_EXTENSIONS_BUCKETS_PROD || vars.S3_EXTENSIONS_BUCKETS_DEV }}
|
||||
|
||||
@@ -551,8 +551,10 @@ FROM build-deps AS pg-embedding-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/neondatabase/pg_embedding/archive/refs/tags/0.3.1.tar.gz -O pg_embedding.tar.gz && \
|
||||
echo "c4ae84eef36fa8ec5868f6e061f39812f19ee5ba3604d428d40935685c7be512 pg_embedding.tar.gz" | sha256sum --check && \
|
||||
# eeb3ba7c3a60c95b2604dd543c64b2f1bb4a3703 made on 15/07/2023
|
||||
# There is no release tag yet
|
||||
RUN wget https://github.com/neondatabase/pg_embedding/archive/eeb3ba7c3a60c95b2604dd543c64b2f1bb4a3703.tar.gz -O pg_embedding.tar.gz && \
|
||||
echo "030846df723652f99a8689ce63b66fa0c23477a7fd723533ab8a6b28ab70730f pg_embedding.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_embedding-src && cd pg_embedding-src && tar xvzf ../pg_embedding.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
|
||||
2
Makefile
2
Makefile
@@ -108,8 +108,6 @@ postgres-%: postgres-configure-% \
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install
|
||||
+@echo "Compiling pageinspect $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
|
||||
+@echo "Compiling amcheck $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install
|
||||
|
||||
.PHONY: postgres-clean-%
|
||||
postgres-clean-%:
|
||||
|
||||
@@ -193,13 +193,6 @@ fn main() -> Result<()> {
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
info!("no compute spec provided, waiting");
|
||||
|
||||
// TODO this can stall startups in the unlikely event that we bind
|
||||
// this compute node while it's busy prewarming. It's not too
|
||||
// bad because it's just 100ms and unlikely, but it's an
|
||||
// avoidable problem.
|
||||
compute.prewarm_postgres()?;
|
||||
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::ConfigurationPending {
|
||||
state = compute.state_changed.wait(state).unwrap();
|
||||
|
||||
@@ -532,50 +532,6 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start and stop a postgres process to warm up the VM for startup.
|
||||
pub fn prewarm_postgres(&self) -> Result<()> {
|
||||
info!("prewarming");
|
||||
|
||||
// Create pgdata
|
||||
let pgdata = &format!("{}.warmup", self.pgdata);
|
||||
create_pgdata(pgdata)?;
|
||||
|
||||
// Run initdb to completion
|
||||
info!("running initdb");
|
||||
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
|
||||
Command::new(initdb_bin)
|
||||
.args(["-D", pgdata])
|
||||
.output()
|
||||
.expect("cannot start initdb process");
|
||||
|
||||
// Write conf
|
||||
use std::io::Write;
|
||||
let conf_path = Path::new(pgdata).join("postgresql.conf");
|
||||
let mut file = std::fs::File::create(conf_path)?;
|
||||
writeln!(file, "shared_buffers=65536")?;
|
||||
writeln!(file, "port=51055")?; // Nobody should be connecting
|
||||
writeln!(file, "shared_preload_libraries = 'neon'")?;
|
||||
|
||||
// Start postgres
|
||||
info!("starting postgres");
|
||||
let mut pg = Command::new(&self.pgbin)
|
||||
.args(["-D", pgdata])
|
||||
.spawn()
|
||||
.expect("cannot start postgres process");
|
||||
|
||||
// Stop it when it's ready
|
||||
info!("waiting for postgres");
|
||||
wait_for_postgres(&mut pg, Path::new(pgdata))?;
|
||||
pg.kill()?;
|
||||
info!("sent kill signal");
|
||||
pg.wait()?;
|
||||
info!("done prewarming");
|
||||
|
||||
// clean up
|
||||
let _ok = fs::remove_dir_all(pgdata);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start Postgres as a child process and manage DBs/roles.
|
||||
/// After that this will hang waiting on the postmaster process to exit.
|
||||
#[instrument(skip_all)]
|
||||
|
||||
@@ -652,8 +652,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
)?;
|
||||
}
|
||||
"start" => {
|
||||
let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
|
||||
let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
|
||||
// let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
|
||||
// let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
|
||||
let endpoint_id = sub_args
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||
@@ -673,7 +673,10 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
env.safekeepers.iter().map(|sk| sk.id).collect()
|
||||
};
|
||||
|
||||
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
.get(endpoint_id.as_str())
|
||||
.ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
|
||||
|
||||
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
|
||||
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
|
||||
@@ -688,63 +691,17 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
if let Some(endpoint) = endpoint {
|
||||
match (&endpoint.mode, hot_standby) {
|
||||
(ComputeMode::Static(_), true) => {
|
||||
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
|
||||
}
|
||||
(ComputeMode::Primary, true) => {
|
||||
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
|
||||
}
|
||||
_ => {}
|
||||
match (&endpoint.mode, hot_standby) {
|
||||
(ComputeMode::Static(_), true) => {
|
||||
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
|
||||
}
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token, safekeepers)?;
|
||||
} else {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
.map(|s| s.as_str())
|
||||
.unwrap_or(DEFAULT_BRANCH_NAME);
|
||||
let timeline_id = env
|
||||
.get_branch_timeline_id(branch_name, tenant_id)
|
||||
.ok_or_else(|| {
|
||||
anyhow!("Found no timeline id for branch name '{branch_name}'")
|
||||
})?;
|
||||
let lsn = sub_args
|
||||
.get_one::<String>("lsn")
|
||||
.map(|lsn_str| Lsn::from_str(lsn_str))
|
||||
.transpose()
|
||||
.context("Failed to parse Lsn from the request")?;
|
||||
let pg_version = sub_args
|
||||
.get_one::<u32>("pg-version")
|
||||
.copied()
|
||||
.context("Failed to `pg-version` from the argument string")?;
|
||||
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
(None, false) => ComputeMode::Primary,
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
};
|
||||
|
||||
// when used with custom port this results in non obvious behaviour
|
||||
// port is remembered from first start command, i e
|
||||
// start --port X
|
||||
// stop
|
||||
// start <-- will also use port X even without explicit port argument
|
||||
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
|
||||
|
||||
let ep = cplane.new_endpoint(
|
||||
endpoint_id,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
pg_port,
|
||||
http_port,
|
||||
pg_version,
|
||||
mode,
|
||||
)?;
|
||||
ep.start(&auth_token, safekeepers)?;
|
||||
(ComputeMode::Primary, true) => {
|
||||
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token, safekeepers)?;
|
||||
}
|
||||
"stop" => {
|
||||
let endpoint_id = sub_args
|
||||
|
||||
@@ -128,6 +128,20 @@ impl ComputeControlPlane {
|
||||
) -> Result<Arc<Endpoint>> {
|
||||
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
|
||||
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
|
||||
|
||||
if matches!(mode, ComputeMode::Primary) {
|
||||
// this check is not complete, as you could have a concurrent attempt at
|
||||
// creating another primary, both reading the state before checking it here,
|
||||
// but it's better than nothing.
|
||||
let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
|
||||
v.tenant_id == tenant_id && v.timeline_id == timeline_id && v.mode == mode
|
||||
});
|
||||
|
||||
if let Some((key, _)) = duplicates.next() {
|
||||
bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
|
||||
}
|
||||
}
|
||||
|
||||
let ep = Arc::new(Endpoint {
|
||||
endpoint_id: endpoint_id.to_owned(),
|
||||
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
|
||||
@@ -564,7 +578,9 @@ impl Endpoint {
|
||||
}
|
||||
Err(e) => {
|
||||
if attempt == MAX_ATTEMPTS {
|
||||
return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
|
||||
return Err(e).context(
|
||||
"timed out waiting to connect to compute_ctl HTTP; last error: {e}",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
|
||||
use rand::Rng;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
|
||||
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum EventType {
|
||||
#[serde(rename = "absolute")]
|
||||
@@ -17,32 +17,6 @@ pub enum EventType {
|
||||
},
|
||||
}
|
||||
|
||||
impl EventType {
|
||||
pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
|
||||
use EventType::*;
|
||||
match self {
|
||||
Absolute { time } => Some(time),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
|
||||
// these can most likely be thought of as Range or RangeFull
|
||||
use EventType::*;
|
||||
match self {
|
||||
Incremental {
|
||||
start_time,
|
||||
stop_time,
|
||||
} => Some(start_time..stop_time),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_incremental(&self) -> bool {
|
||||
matches!(self, EventType::Incremental { .. })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
pub struct Event<Extra> {
|
||||
#[serde(flatten)]
|
||||
|
||||
@@ -200,17 +200,13 @@ impl S3Bucket {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
|
||||
assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
let path_string = path
|
||||
.get_path()
|
||||
.to_string_lossy()
|
||||
.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR)
|
||||
.to_string();
|
||||
match &self.prefix_in_bucket {
|
||||
Some(prefix) => prefix.clone() + "/" + &path_string,
|
||||
None => path_string,
|
||||
fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
|
||||
let mut full_path = self.prefix_in_bucket.clone().unwrap_or_default();
|
||||
for segment in path.0.iter() {
|
||||
full_path.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
full_path.push_str(segment.to_str().unwrap_or_default());
|
||||
}
|
||||
full_path
|
||||
}
|
||||
|
||||
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
|
||||
@@ -431,12 +427,10 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
|
||||
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
|
||||
// if prefix is not none then download file `prefix/from`
|
||||
// if prefix is none then download file `from`
|
||||
self.download_object(GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
range: None,
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
}
|
||||
@@ -529,63 +523,3 @@ impl RemoteStorage for S3Bucket {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::{RemotePath, S3Bucket, S3Config};
|
||||
|
||||
#[test]
|
||||
fn relative_path() {
|
||||
let all_paths = vec!["", "some/path", "some/path/"];
|
||||
let all_paths: Vec<RemotePath> = all_paths
|
||||
.iter()
|
||||
.map(|x| RemotePath::new(Path::new(x)).expect("bad path"))
|
||||
.collect();
|
||||
let prefixes = [
|
||||
None,
|
||||
Some(""),
|
||||
Some("test/prefix"),
|
||||
Some("test/prefix/"),
|
||||
Some("/test/prefix/"),
|
||||
];
|
||||
let expected_outputs = vec![
|
||||
vec!["", "some/path", "some/path"],
|
||||
vec!["/", "/some/path", "/some/path"],
|
||||
vec![
|
||||
"test/prefix/",
|
||||
"test/prefix/some/path",
|
||||
"test/prefix/some/path",
|
||||
],
|
||||
vec![
|
||||
"test/prefix/",
|
||||
"test/prefix/some/path",
|
||||
"test/prefix/some/path",
|
||||
],
|
||||
vec![
|
||||
"test/prefix/",
|
||||
"test/prefix/some/path",
|
||||
"test/prefix/some/path",
|
||||
],
|
||||
];
|
||||
|
||||
for (prefix_idx, prefix) in prefixes.iter().enumerate() {
|
||||
let config = S3Config {
|
||||
bucket_name: "bucket".to_owned(),
|
||||
bucket_region: "region".to_owned(),
|
||||
prefix_in_bucket: prefix.map(str::to_string),
|
||||
endpoint: None,
|
||||
concurrency_limit: NonZeroUsize::new(100).unwrap(),
|
||||
max_keys_per_list_response: Some(5),
|
||||
};
|
||||
let storage = S3Bucket::new(&config).expect("remote storage init");
|
||||
for (test_path_idx, test_path) in all_paths.iter().enumerate() {
|
||||
let result = storage.relative_path_to_s3_object(test_path);
|
||||
let expected = expected_outputs[prefix_idx][test_path_idx];
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ static LOGGING_DONE: OnceCell<()> = OnceCell::new();
|
||||
|
||||
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
|
||||
|
||||
const BASE_PREFIX: &str = "test";
|
||||
const BASE_PREFIX: &str = "test/";
|
||||
|
||||
/// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries.
|
||||
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::{mgr, LogicalSizeCalculationCause};
|
||||
use anyhow;
|
||||
use chrono::{DateTime, Utc};
|
||||
use chrono::Utc;
|
||||
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
|
||||
use pageserver_api::models::TenantState;
|
||||
use reqwest::Url;
|
||||
@@ -18,6 +18,12 @@ use std::time::Duration;
|
||||
use tracing::*;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
const WRITTEN_SIZE: &str = "written_size";
|
||||
const SYNTHETIC_STORAGE_SIZE: &str = "synthetic_storage_size";
|
||||
const RESIDENT_SIZE: &str = "resident_size";
|
||||
const REMOTE_STORAGE_SIZE: &str = "remote_storage_size";
|
||||
const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size";
|
||||
|
||||
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[serde_as]
|
||||
@@ -38,121 +44,6 @@ pub struct PageserverConsumptionMetricsKey {
|
||||
pub metric: &'static str,
|
||||
}
|
||||
|
||||
impl PageserverConsumptionMetricsKey {
|
||||
const fn absolute_values(self) -> AbsoluteValueFactory {
|
||||
AbsoluteValueFactory(self)
|
||||
}
|
||||
const fn incremental_values(self) -> IncrementalValueFactory {
|
||||
IncrementalValueFactory(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper type which each individual metric kind can return to produce only absolute values.
|
||||
struct AbsoluteValueFactory(PageserverConsumptionMetricsKey);
|
||||
|
||||
impl AbsoluteValueFactory {
|
||||
fn now(self, val: u64) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
|
||||
let key = self.0;
|
||||
let time = Utc::now();
|
||||
(key, (EventType::Absolute { time }, val))
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper type which each individual metric kind can return to produce only incremental values.
|
||||
struct IncrementalValueFactory(PageserverConsumptionMetricsKey);
|
||||
|
||||
impl IncrementalValueFactory {
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
fn from_previous_up_to(
|
||||
self,
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
val: u64,
|
||||
) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
(
|
||||
key,
|
||||
(
|
||||
EventType::Incremental {
|
||||
start_time: prev_end,
|
||||
stop_time: up_to,
|
||||
},
|
||||
val,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn key(&self) -> &PageserverConsumptionMetricsKey {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
// the static part of a PageserverConsumptionMetricsKey
|
||||
impl PageserverConsumptionMetricsKey {
|
||||
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
metric: "written_size",
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
/// Values will be the difference of the latest written_size (last_record_lsn) to what we
|
||||
/// previously sent.
|
||||
const fn written_size_delta(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> IncrementalValueFactory {
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
metric: "written_size_bytes_delta",
|
||||
}
|
||||
.incremental_values()
|
||||
}
|
||||
|
||||
const fn timeline_logical_size(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> AbsoluteValueFactory {
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
metric: "timeline_logical_size",
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: "remote_storage_size",
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: "resident_size",
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: "synthetic_storage_size",
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
}
|
||||
|
||||
/// Main thread that serves metrics collection
|
||||
pub async fn collect_metrics(
|
||||
metric_collection_endpoint: &Url,
|
||||
@@ -188,7 +79,7 @@ pub async fn collect_metrics(
|
||||
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
|
||||
.build()
|
||||
.expect("Failed to create http client with timeout");
|
||||
let mut cached_metrics = HashMap::new();
|
||||
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
|
||||
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
|
||||
|
||||
loop {
|
||||
@@ -230,13 +121,13 @@ pub async fn collect_metrics(
|
||||
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
|
||||
pub async fn collect_metrics_iteration(
|
||||
client: &reqwest::Client,
|
||||
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, (EventType, u64)>,
|
||||
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, u64>,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
node_id: NodeId,
|
||||
ctx: &RequestContext,
|
||||
send_cached: bool,
|
||||
) {
|
||||
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, (EventType, u64))> = Vec::new();
|
||||
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
|
||||
trace!(
|
||||
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
|
||||
metric_collection_endpoint
|
||||
@@ -275,80 +166,27 @@ pub async fn collect_metrics_iteration(
|
||||
if timeline.is_active() {
|
||||
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
|
||||
|
||||
let (key, written_size_now) =
|
||||
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline.timeline_id)
|
||||
.now(timeline_written_size);
|
||||
|
||||
// last_record_lsn can only go up, right now at least, TODO: #2592 or related
|
||||
// features might change this.
|
||||
|
||||
let written_size_delta_key = PageserverConsumptionMetricsKey::written_size_delta(
|
||||
tenant_id,
|
||||
timeline.timeline_id,
|
||||
);
|
||||
|
||||
// use this when available, because in a stream of incremental values, it will be
|
||||
// accurate where as when last_record_lsn stops moving, we will only cache the last
|
||||
// one of those.
|
||||
let last_stop_time =
|
||||
cached_metrics
|
||||
.get(written_size_delta_key.key())
|
||||
.map(|(until, _val)| {
|
||||
until
|
||||
.incremental_timerange()
|
||||
.expect("never create EventType::Absolute for written_size_delta")
|
||||
.end
|
||||
});
|
||||
|
||||
// by default, use the last sent written_size as the basis for
|
||||
// calculating the delta. if we don't yet have one, use the load time value.
|
||||
let prev = cached_metrics
|
||||
.get(&key)
|
||||
.map(|(prev_at, prev)| {
|
||||
// use the prev time from our last incremental update, or default to latest
|
||||
// absolute update on the first round.
|
||||
let prev_at = prev_at
|
||||
.absolute_time()
|
||||
.expect("never create EventType::Incremental for written_size");
|
||||
let prev_at = last_stop_time.unwrap_or(prev_at);
|
||||
(*prev_at, *prev)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
// if we don't have a previous point of comparison, compare to the load time
|
||||
// lsn.
|
||||
let (disk_consistent_lsn, loaded_at) = &timeline.loaded_at;
|
||||
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
|
||||
});
|
||||
|
||||
// written_size_delta_bytes
|
||||
current_metrics.extend(
|
||||
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
|
||||
let up_to = written_size_now
|
||||
.0
|
||||
.absolute_time()
|
||||
.expect("never create EventType::Incremental for written_size");
|
||||
let key_value =
|
||||
written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
|
||||
Some(key_value)
|
||||
} else {
|
||||
None
|
||||
current_metrics.push((
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline.timeline_id),
|
||||
metric: WRITTEN_SIZE,
|
||||
},
|
||||
);
|
||||
|
||||
// written_size
|
||||
current_metrics.push((key, written_size_now));
|
||||
timeline_written_size,
|
||||
));
|
||||
|
||||
let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id);
|
||||
match span.in_scope(|| timeline.get_current_logical_size(ctx)) {
|
||||
// Only send timeline logical size when it is fully calculated.
|
||||
Ok((size, is_exact)) if is_exact => {
|
||||
current_metrics.push(
|
||||
PageserverConsumptionMetricsKey::timeline_logical_size(
|
||||
current_metrics.push((
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline.timeline_id,
|
||||
)
|
||||
.now(size),
|
||||
);
|
||||
timeline_id: Some(timeline.timeline_id),
|
||||
metric: TIMELINE_LOGICAL_SIZE,
|
||||
},
|
||||
size,
|
||||
));
|
||||
}
|
||||
Ok((_, _)) => {}
|
||||
Err(err) => {
|
||||
@@ -367,10 +205,14 @@ pub async fn collect_metrics_iteration(
|
||||
|
||||
match tenant.get_remote_size().await {
|
||||
Ok(tenant_remote_size) => {
|
||||
current_metrics.push(
|
||||
PageserverConsumptionMetricsKey::remote_storage_size(tenant_id)
|
||||
.now(tenant_remote_size),
|
||||
);
|
||||
current_metrics.push((
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: REMOTE_STORAGE_SIZE,
|
||||
},
|
||||
tenant_remote_size,
|
||||
));
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
@@ -380,9 +222,14 @@ pub async fn collect_metrics_iteration(
|
||||
}
|
||||
}
|
||||
|
||||
current_metrics.push(
|
||||
PageserverConsumptionMetricsKey::resident_size(tenant_id).now(tenant_resident_size),
|
||||
);
|
||||
current_metrics.push((
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: RESIDENT_SIZE,
|
||||
},
|
||||
tenant_resident_size,
|
||||
));
|
||||
|
||||
// Note that this metric is calculated in a separate bgworker
|
||||
// Here we only use cached value, which may lag behind the real latest one
|
||||
@@ -390,27 +237,23 @@ pub async fn collect_metrics_iteration(
|
||||
|
||||
if tenant_synthetic_size != 0 {
|
||||
// only send non-zeroes because otherwise these show up as errors in logs
|
||||
current_metrics.push(
|
||||
PageserverConsumptionMetricsKey::synthetic_size(tenant_id)
|
||||
.now(tenant_synthetic_size),
|
||||
);
|
||||
current_metrics.push((
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: SYNTHETIC_STORAGE_SIZE,
|
||||
},
|
||||
tenant_synthetic_size,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Filter metrics, unless we want to send all metrics, including cached ones.
|
||||
// See: https://github.com/neondatabase/neon/issues/3485
|
||||
if !send_cached {
|
||||
current_metrics.retain(|(curr_key, (kind, curr_val))| {
|
||||
if kind.is_incremental() {
|
||||
// incremental values (currently only written_size_delta) should not get any cache
|
||||
// deduplication because they will be used by upstream for "is still alive."
|
||||
true
|
||||
} else {
|
||||
match cached_metrics.get(curr_key) {
|
||||
Some((_, val)) => val != curr_val,
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
|
||||
Some(val) => val != curr_val,
|
||||
None => true,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -429,8 +272,8 @@ pub async fn collect_metrics_iteration(
|
||||
chunk_to_send.clear();
|
||||
|
||||
// enrich metrics with type,timestamp and idempotency key before sending
|
||||
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
|
||||
kind: *when,
|
||||
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| Event {
|
||||
kind: EventType::Absolute { time: Utc::now() },
|
||||
metric: curr_key.metric,
|
||||
idempotency_key: idempotency_key(node_id.to_string()),
|
||||
value: *curr_val,
|
||||
|
||||
@@ -46,6 +46,7 @@ use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTimelineFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -69,7 +70,6 @@ use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::InitializationOrder;
|
||||
|
||||
use crate::tenant::timeline::delete::DeleteTimelineFlow;
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
@@ -117,6 +117,7 @@ mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
pub mod config;
|
||||
pub mod delete;
|
||||
pub mod mgr;
|
||||
pub mod tasks;
|
||||
pub mod upload_queue;
|
||||
|
||||
@@ -15,17 +15,15 @@ use utils::{
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{
|
||||
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
|
||||
},
|
||||
CreateTimelineCause, DeleteTimelineError, Tenant,
|
||||
},
|
||||
tenant::{remote_timeline_client, DeleteTimelineError},
|
||||
InitializationOrder,
|
||||
};
|
||||
|
||||
use super::Timeline;
|
||||
use super::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, Tenant, Timeline,
|
||||
};
|
||||
|
||||
/// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
|
||||
@@ -390,42 +390,39 @@ where
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn dump(&self) -> Result<()> {
|
||||
let mut stack = Vec::new();
|
||||
pub fn dump(&self) -> Result<()> {
|
||||
self.dump_recurse(self.root_blk, &[], 0)
|
||||
}
|
||||
|
||||
stack.push((self.root_blk, String::new(), 0, 0, 0));
|
||||
fn dump_recurse(&self, blknum: u32, path: &[u8], depth: usize) -> Result<()> {
|
||||
let blk = self.reader.read_blk(self.start_blk + blknum)?;
|
||||
let buf: &[u8] = blk.as_ref();
|
||||
|
||||
while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
|
||||
let blk = self.reader.read_blk(self.start_blk + blknum)?;
|
||||
let buf: &[u8] = blk.as_ref();
|
||||
let node = OnDiskNode::<L>::deparse(buf)?;
|
||||
let node = OnDiskNode::<L>::deparse(buf)?;
|
||||
|
||||
if child_idx == 0 {
|
||||
print!("{:indent$}", "", indent = depth * 2);
|
||||
let path_prefix = stack
|
||||
.iter()
|
||||
.map(|(_blknum, path, ..)| path.as_str())
|
||||
.collect::<String>();
|
||||
println!(
|
||||
"blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
|
||||
hex::encode(node.prefix),
|
||||
node.suffix_len
|
||||
);
|
||||
}
|
||||
print!("{:indent$}", "", indent = depth * 2);
|
||||
println!(
|
||||
"blk #{}: path {}: prefix {}, suffix_len {}",
|
||||
blknum,
|
||||
hex::encode(path),
|
||||
hex::encode(node.prefix),
|
||||
node.suffix_len
|
||||
);
|
||||
|
||||
if child_idx + 1 < node.num_children {
|
||||
let key_off = key_off + node.suffix_len as usize;
|
||||
stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
|
||||
}
|
||||
let mut idx = 0;
|
||||
let mut key_off = 0;
|
||||
while idx < node.num_children {
|
||||
let key = &node.keys[key_off..key_off + node.suffix_len as usize];
|
||||
let val = node.value(child_idx as usize);
|
||||
|
||||
let val = node.value(idx as usize);
|
||||
print!("{:indent$}", "", indent = depth * 2 + 2);
|
||||
println!("{}: {}", hex::encode(key), hex::encode(val.0));
|
||||
|
||||
if node.level > 0 {
|
||||
stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
|
||||
let child_path = [path, node.prefix].concat();
|
||||
self.dump_recurse(val.to_blknum(), &child_path, depth + 1)?;
|
||||
}
|
||||
idx += 1;
|
||||
key_off += node.suffix_len as usize;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -757,8 +754,8 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn basic() -> Result<()> {
|
||||
#[test]
|
||||
fn basic() -> Result<()> {
|
||||
let mut disk = TestDisk::new();
|
||||
let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
|
||||
|
||||
@@ -778,7 +775,7 @@ mod tests {
|
||||
|
||||
let reader = DiskBtreeReader::new(0, root_offset, disk);
|
||||
|
||||
reader.dump().await?;
|
||||
reader.dump()?;
|
||||
|
||||
// Test the `get` function on all the keys.
|
||||
for (key, val) in all_data.iter() {
|
||||
@@ -838,8 +835,8 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lots_of_keys() -> Result<()> {
|
||||
#[test]
|
||||
fn lots_of_keys() -> Result<()> {
|
||||
let mut disk = TestDisk::new();
|
||||
let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
|
||||
|
||||
@@ -859,7 +856,7 @@ mod tests {
|
||||
|
||||
let reader = DiskBtreeReader::new(0, root_offset, disk);
|
||||
|
||||
reader.dump().await?;
|
||||
reader.dump()?;
|
||||
|
||||
use std::sync::Mutex;
|
||||
|
||||
@@ -997,8 +994,8 @@ mod tests {
|
||||
///
|
||||
/// This test contains a particular data set, see disk_btree_test_data.rs
|
||||
///
|
||||
#[tokio::test]
|
||||
async fn particular_data() -> Result<()> {
|
||||
#[test]
|
||||
fn particular_data() -> Result<()> {
|
||||
// Build a tree from it
|
||||
let mut disk = TestDisk::new();
|
||||
let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
|
||||
@@ -1025,7 +1022,7 @@ mod tests {
|
||||
})?;
|
||||
assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
|
||||
|
||||
reader.dump().await?;
|
||||
reader.dump()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
use super::delete::DeleteTimelineFlow;
|
||||
|
||||
/// The tenants known to the pageserver.
|
||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||
|
||||
@@ -223,45 +223,6 @@ mod tests {
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v2_indexpart_is_parsed_with_deleted_at() {
|
||||
let example = r#"{
|
||||
"version":2,
|
||||
"timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
|
||||
"missing_layers":["This shouldn't fail deserialization"],
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata_bytes":[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
|
||||
"deleted_at": "2023-07-31T09:00:00.123"
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
|
||||
version: 2,
|
||||
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
|
||||
};
|
||||
|
||||
let part = serde_json::from_str::<IndexPart>(example).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_layers_are_parsed() {
|
||||
let empty_layers_json = r#"{
|
||||
|
||||
@@ -256,7 +256,7 @@ impl Layer for DeltaLayer {
|
||||
file,
|
||||
);
|
||||
|
||||
tree_reader.dump().await?;
|
||||
tree_reader.dump()?;
|
||||
|
||||
let mut cursor = file.block_cursor();
|
||||
|
||||
|
||||
@@ -175,7 +175,7 @@ impl Layer for ImageLayer {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
|
||||
tree_reader.dump().await?;
|
||||
tree_reader.dump()?;
|
||||
|
||||
tree_reader.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
|
||||
println!("key: {} offset {}", hex::encode(key), value);
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod delete;
|
||||
mod eviction_task;
|
||||
pub mod layer_manager;
|
||||
mod logical_size;
|
||||
@@ -80,7 +79,6 @@ use crate::METADATA_FILE_NAME;
|
||||
use crate::ZERO_PAGE;
|
||||
use crate::{is_temporary, task_mgr};
|
||||
|
||||
use self::delete::DeleteTimelineFlow;
|
||||
pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::layer_manager::LayerManager;
|
||||
@@ -88,6 +86,7 @@ use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::delete::DeleteTimelineFlow;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{
|
||||
@@ -294,10 +293,6 @@ pub struct Timeline {
|
||||
/// Completion shared between all timelines loaded during startup; used to delay heavier
|
||||
/// background tasks until some logical sizes have been calculated.
|
||||
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
|
||||
|
||||
/// Load or creation time information about the disk_consistent_lsn and when the loading
|
||||
/// happened. Used for consumption metrics.
|
||||
pub(crate) loaded_at: (Lsn, SystemTime),
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -1408,8 +1403,6 @@ impl Timeline {
|
||||
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
|
||||
last_freeze_ts: RwLock::new(Instant::now()),
|
||||
|
||||
loaded_at: (disk_consistent_lsn, SystemTime::now()),
|
||||
|
||||
ancestor_timeline: ancestor,
|
||||
ancestor_lsn: metadata.ancestor_lsn(),
|
||||
|
||||
@@ -1606,7 +1599,7 @@ impl Timeline {
|
||||
if let Some(imgfilename) = ImageFileName::parse_str(&fname) {
|
||||
// create an ImageLayer struct for each image file.
|
||||
if imgfilename.lsn > disk_consistent_lsn {
|
||||
info!(
|
||||
warn!(
|
||||
"found future image layer {} on timeline {} disk_consistent_lsn is {}",
|
||||
imgfilename, self.timeline_id, disk_consistent_lsn
|
||||
);
|
||||
@@ -1638,7 +1631,7 @@ impl Timeline {
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
if deltafilename.lsn_range.end > disk_consistent_lsn + 1 {
|
||||
info!(
|
||||
warn!(
|
||||
"found future delta layer {} on timeline {} disk_consistent_lsn is {}",
|
||||
deltafilename, self.timeline_id, disk_consistent_lsn
|
||||
);
|
||||
@@ -1780,7 +1773,7 @@ impl Timeline {
|
||||
match remote_layer_name {
|
||||
LayerFileName::Image(imgfilename) => {
|
||||
if imgfilename.lsn > up_to_date_disk_consistent_lsn {
|
||||
info!(
|
||||
warn!(
|
||||
"found future image layer {} on timeline {} remote_consistent_lsn is {}",
|
||||
imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn
|
||||
);
|
||||
@@ -1805,7 +1798,7 @@ impl Timeline {
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 {
|
||||
info!(
|
||||
warn!(
|
||||
"found future delta layer {} on timeline {} remote_consistent_lsn is {}",
|
||||
deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn
|
||||
);
|
||||
|
||||
@@ -53,12 +53,6 @@ pub enum BackendType<'a, T> {
|
||||
Postgres(Cow<'a, console::provider::mock::Api>, T),
|
||||
/// Authentication via a web browser.
|
||||
Link(Cow<'a, url::ApiUrl>),
|
||||
/// Test backend.
|
||||
Test(&'a dyn TestBackend),
|
||||
}
|
||||
|
||||
pub trait TestBackend: Send + Sync + 'static {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BackendType<'_, ()> {
|
||||
@@ -68,7 +62,6 @@ impl std::fmt::Display for BackendType<'_, ()> {
|
||||
Console(endpoint, _) => fmt.debug_tuple("Console").field(&endpoint.url()).finish(),
|
||||
Postgres(endpoint, _) => fmt.debug_tuple("Postgres").field(&endpoint.url()).finish(),
|
||||
Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
|
||||
Test(_) => fmt.debug_tuple("Test").finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,7 +75,6 @@ impl<T> BackendType<'_, T> {
|
||||
Console(c, x) => Console(Cow::Borrowed(c), x),
|
||||
Postgres(c, x) => Postgres(Cow::Borrowed(c), x),
|
||||
Link(c) => Link(Cow::Borrowed(c)),
|
||||
Test(x) => Test(*x),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -97,7 +89,6 @@ impl<'a, T> BackendType<'a, T> {
|
||||
Console(c, x) => Console(c, f(x)),
|
||||
Postgres(c, x) => Postgres(c, f(x)),
|
||||
Link(c) => Link(c),
|
||||
Test(x) => Test(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -111,7 +102,6 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
|
||||
Console(c, x) => x.map(|x| Console(c, x)),
|
||||
Postgres(c, x) => x.map(|x| Postgres(c, x)),
|
||||
Link(c) => Ok(Link(c)),
|
||||
Test(x) => Ok(Test(x)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -157,7 +147,6 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
Console(_, creds) => creds.project.clone(),
|
||||
Postgres(_, creds) => creds.project.clone(),
|
||||
Link(_) => Some("link".to_owned()),
|
||||
Test(_) => Some("test".to_owned()),
|
||||
}
|
||||
}
|
||||
/// Authenticate the client via the requested backend, possibly using credentials.
|
||||
@@ -199,9 +188,6 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
.await?
|
||||
.map(CachedNodeInfo::new_uncached)
|
||||
}
|
||||
Test(_) => {
|
||||
unreachable!("this function should never be called in the test backend")
|
||||
}
|
||||
};
|
||||
|
||||
info!("user successfully authenticated");
|
||||
@@ -220,7 +206,6 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
|
||||
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
|
||||
Link(_) => Ok(None),
|
||||
Test(x) => x.wake_compute().map(Some),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
auth::{self, AuthFlow, ClientCredentials},
|
||||
compute,
|
||||
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
|
||||
proxy::handle_try_wake,
|
||||
proxy::{try_wake, NUM_RETRIES_CONNECT},
|
||||
sasl, scram,
|
||||
stream::PqStream,
|
||||
};
|
||||
@@ -51,15 +51,14 @@ pub(super) async fn authenticate(
|
||||
}
|
||||
};
|
||||
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
let mut num_retries = 0;
|
||||
let mut node = loop {
|
||||
let wake_res = api.wake_compute(extra, creds).await;
|
||||
match handle_try_wake(wake_res, num_retries)? {
|
||||
ControlFlow::Continue(_) => num_retries += 1,
|
||||
num_retries += 1;
|
||||
match try_wake(api, extra, creds).await? {
|
||||
ControlFlow::Break(n) => break n,
|
||||
ControlFlow::Continue(_) if num_retries < NUM_RETRIES_CONNECT => continue,
|
||||
ControlFlow::Continue(e) => return Err(e.into()),
|
||||
}
|
||||
info!(num_retries, "retrying wake compute");
|
||||
};
|
||||
if let Some(keys) = scram_keys {
|
||||
use tokio_postgres::config::AuthKeys;
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::bail;
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use hashbrown::HashMap;
|
||||
use hyper::body::HttpBody;
|
||||
use hyper::http::HeaderName;
|
||||
use hyper::http::HeaderValue;
|
||||
@@ -14,7 +12,6 @@ use serde_json::Value;
|
||||
use tokio_postgres::types::Kind;
|
||||
use tokio_postgres::types::Type;
|
||||
use tokio_postgres::GenericClient;
|
||||
use tokio_postgres::IsolationLevel;
|
||||
use tokio_postgres::Row;
|
||||
use url::Url;
|
||||
|
||||
@@ -40,8 +37,6 @@ const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||
static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in");
|
||||
static TXN_ISOLATION_LEVEL: HeaderName = HeaderName::from_static("neon-batch-isolation-level");
|
||||
static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
|
||||
|
||||
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
|
||||
|
||||
@@ -175,7 +170,7 @@ pub async fn handle(
|
||||
request: Request<Body>,
|
||||
sni_hostname: Option<String>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
) -> anyhow::Result<(Value, HashMap<HeaderName, HeaderValue>)> {
|
||||
) -> anyhow::Result<Value> {
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
@@ -190,23 +185,6 @@ pub async fn handle(
|
||||
// Allow connection pooling only if explicitly requested
|
||||
let allow_pool = headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
|
||||
|
||||
// isolation level and read only
|
||||
|
||||
let txn_isolation_level_raw = headers.get(&TXN_ISOLATION_LEVEL).cloned();
|
||||
let txn_isolation_level = match txn_isolation_level_raw {
|
||||
Some(ref x) => Some(match x.as_bytes() {
|
||||
b"Serializable" => IsolationLevel::Serializable,
|
||||
b"ReadUncommitted" => IsolationLevel::ReadUncommitted,
|
||||
b"ReadCommitted" => IsolationLevel::ReadCommitted,
|
||||
b"RepeatableRead" => IsolationLevel::RepeatableRead,
|
||||
_ => bail!("invalid isolation level"),
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let txn_read_only_raw = headers.get(&TXN_READ_ONLY).cloned();
|
||||
let txn_read_only = txn_read_only_raw.as_ref() == Some(&HEADER_VALUE_TRUE);
|
||||
|
||||
let request_content_length = match request.body().size_hint().upper() {
|
||||
Some(v) => v,
|
||||
None => MAX_REQUEST_SIZE + 1,
|
||||
@@ -230,19 +208,10 @@ pub async fn handle(
|
||||
// Now execute the query and return the result
|
||||
//
|
||||
let result = match payload {
|
||||
Payload::Single(query) => query_to_json(&client, query, raw_output, array_mode)
|
||||
.await
|
||||
.map(|x| (x, HashMap::default())),
|
||||
Payload::Single(query) => query_to_json(&client, query, raw_output, array_mode).await,
|
||||
Payload::Batch(queries) => {
|
||||
let mut results = Vec::new();
|
||||
let mut builder = client.build_transaction();
|
||||
if let Some(isolation_level) = txn_isolation_level {
|
||||
builder = builder.isolation_level(isolation_level);
|
||||
}
|
||||
if txn_read_only {
|
||||
builder = builder.read_only(true);
|
||||
}
|
||||
let transaction = builder.start().await?;
|
||||
let transaction = client.transaction().await?;
|
||||
for query in queries {
|
||||
let result = query_to_json(&transaction, query, raw_output, array_mode).await;
|
||||
match result {
|
||||
@@ -254,15 +223,7 @@ pub async fn handle(
|
||||
}
|
||||
}
|
||||
transaction.commit().await?;
|
||||
let mut headers = HashMap::default();
|
||||
headers.insert(
|
||||
TXN_READ_ONLY.clone(),
|
||||
HeaderValue::try_from(txn_read_only.to_string())?,
|
||||
);
|
||||
if let Some(txn_isolation_level_raw) = txn_isolation_level_raw {
|
||||
headers.insert(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level_raw);
|
||||
}
|
||||
Ok((json!({ "results": results }), headers))
|
||||
Ok(json!({ "results": results }))
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ use crate::{
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
use hashbrown::HashMap;
|
||||
use hyper::{
|
||||
server::{
|
||||
accept,
|
||||
@@ -206,7 +205,7 @@ async fn ws_handler(
|
||||
Ok(_) => StatusCode::OK,
|
||||
Err(_) => StatusCode::BAD_REQUEST,
|
||||
};
|
||||
let (json, headers) = match result {
|
||||
let json = match result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
@@ -217,10 +216,7 @@ async fn ws_handler(
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
(
|
||||
json!({ "message": message, "code": code }),
|
||||
HashMap::default(),
|
||||
)
|
||||
json!({ "message": message, "code": code })
|
||||
}
|
||||
};
|
||||
json_response(status_code, json).map(|mut r| {
|
||||
@@ -228,9 +224,6 @@ async fn ws_handler(
|
||||
"Access-Control-Allow-Origin",
|
||||
hyper::http::HeaderValue::from_static("*"),
|
||||
);
|
||||
for (k, v) in headers {
|
||||
r.headers_mut().insert(k, v);
|
||||
}
|
||||
r
|
||||
})
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::{
|
||||
cancellation::{self, CancelMap},
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo},
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
use anyhow::{bail, Context};
|
||||
@@ -347,6 +347,11 @@ async fn connect_to_compute_once(
|
||||
.await
|
||||
}
|
||||
|
||||
enum ConnectionState<E> {
|
||||
Cached(console::CachedNodeInfo),
|
||||
Invalid(compute::ConnCfg, E),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ConnectMechanism {
|
||||
type Connection;
|
||||
@@ -402,67 +407,70 @@ where
|
||||
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
|
||||
// try once
|
||||
let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
(invalidate_cache(node_info), e)
|
||||
}
|
||||
};
|
||||
let mut num_retries = 0;
|
||||
let mut state = ConnectionState::<M::ConnectError>::Cached(node_info);
|
||||
|
||||
let mut num_retries = 1;
|
||||
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
let node_info = loop {
|
||||
let wake_res = match creds {
|
||||
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
|
||||
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => return Err(err.into()),
|
||||
// test backend
|
||||
auth::BackendType::Test(x) => x.wake_compute(),
|
||||
};
|
||||
|
||||
match handle_try_wake(wake_res, num_retries)? {
|
||||
// failed to wake up but we can continue to retry
|
||||
ControlFlow::Continue(_) => {}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
ControlFlow::Break(mut node_info) => {
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
break node_info;
|
||||
}
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
time::sleep(wait_duration).await;
|
||||
info!(num_retries, "retrying wake compute");
|
||||
};
|
||||
|
||||
// now that we have a new node, try connect to it repeatedly.
|
||||
// this can error for a few reasons, for instance:
|
||||
// * DNS connection settings haven't quite propagated yet
|
||||
info!("wake_compute success. attempting to connect");
|
||||
loop {
|
||||
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !e.should_retry(num_retries) {
|
||||
return Err(e.into());
|
||||
match state {
|
||||
ConnectionState::Invalid(config, err) => {
|
||||
let wake_res = match creds {
|
||||
auth::BackendType::Console(api, creds) => {
|
||||
try_wake(api.as_ref(), extra, creds).await
|
||||
}
|
||||
auth::BackendType::Postgres(api, creds) => {
|
||||
try_wake(api.as_ref(), extra, creds).await
|
||||
}
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => return Err(err.into()),
|
||||
};
|
||||
|
||||
match wake_res {
|
||||
// there was an error communicating with the control plane
|
||||
Err(e) => return Err(e.into()),
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(ControlFlow::Continue(_)) => {
|
||||
state = ConnectionState::Invalid(config, err);
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
info!(num_retries, "retrying wake compute");
|
||||
time::sleep(wait_duration).await;
|
||||
continue;
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
Ok(ControlFlow::Break(mut node_info)) => {
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
state = ConnectionState::Cached(node_info)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnectionState::Cached(node_info) => {
|
||||
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !e.should_retry(num_retries) {
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
// after the first connect failure,
|
||||
// we should invalidate the cache and wake up a new compute node
|
||||
if num_retries == 0 {
|
||||
state = ConnectionState::Invalid(invalidate_cache(node_info), e);
|
||||
} else {
|
||||
state = ConnectionState::Cached(node_info);
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
info!(num_retries, "retrying wake compute");
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
time::sleep(wait_duration).await;
|
||||
info!(num_retries, "retrying connect_once");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -470,15 +478,15 @@ where
|
||||
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
|
||||
/// * Returns Ok(Break(node)) if the wakeup succeeded
|
||||
/// * Returns Err(e) if there was an error
|
||||
pub fn handle_try_wake(
|
||||
result: Result<console::CachedNodeInfo, WakeComputeError>,
|
||||
num_retries: u32,
|
||||
pub async fn try_wake(
|
||||
api: &impl console::Api,
|
||||
extra: &console::ConsoleReqExtra<'_>,
|
||||
creds: &auth::ClientCredentials<'_>,
|
||||
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
|
||||
match result {
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
match api.wake_compute(extra, creds).await {
|
||||
Err(err) => match &err {
|
||||
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
|
||||
Ok(ControlFlow::Continue(err))
|
||||
}
|
||||
WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)),
|
||||
_ => Err(err),
|
||||
},
|
||||
// Ready to try again.
|
||||
@@ -490,6 +498,8 @@ pub trait ShouldRetry {
|
||||
fn could_retry(&self) -> bool;
|
||||
fn should_retry(&self, num_retries: u32) -> bool {
|
||||
match self {
|
||||
// retry all errors at least once
|
||||
_ if num_retries == 0 => true,
|
||||
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
||||
err => err.could_retry(),
|
||||
}
|
||||
@@ -541,9 +551,14 @@ impl ShouldRetry for compute::ConnectionError {
|
||||
}
|
||||
}
|
||||
|
||||
fn retry_after(num_retries: u32) -> time::Duration {
|
||||
// 1.5 seems to be an ok growth factor heuristic
|
||||
BASE_RETRY_WAIT_DURATION.mul_f64(1.5_f64.powi(num_retries as i32))
|
||||
pub fn retry_after(num_retries: u32) -> time::Duration {
|
||||
match num_retries {
|
||||
0 => time::Duration::ZERO,
|
||||
_ => {
|
||||
// 3/2 = 1.5 which seems to be an ok growth factor heuristic
|
||||
BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Finish client connection initialization: confirm auth success, send params, etc.
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
//! A group of high-level tests for connection establishing logic and auth.
|
||||
//!
|
||||
use std::borrow::Cow;
|
||||
|
||||
use super::*;
|
||||
use crate::auth::backend::TestBackend;
|
||||
use crate::auth::ClientCredentials;
|
||||
use crate::console::{CachedNodeInfo, NodeInfo};
|
||||
use crate::{auth, http, sasl, scram};
|
||||
use crate::{auth, sasl, scram};
|
||||
use async_trait::async_trait;
|
||||
use rstest::rstest;
|
||||
use tokio_postgres::config::SslMode;
|
||||
@@ -302,18 +302,15 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
|
||||
#[test]
|
||||
fn connect_compute_total_wait() {
|
||||
let mut total_wait = tokio::time::Duration::ZERO;
|
||||
for num_retries in 1..10 {
|
||||
for num_retries in 0..10 {
|
||||
total_wait += retry_after(num_retries);
|
||||
}
|
||||
assert!(total_wait < tokio::time::Duration::from_secs(12));
|
||||
assert!(total_wait > tokio::time::Duration::from_secs(10));
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Clone, Copy)]
|
||||
enum ConnectAction {
|
||||
Wake,
|
||||
WakeFail,
|
||||
WakeRetry,
|
||||
Connect,
|
||||
Retry,
|
||||
Fail,
|
||||
@@ -324,17 +321,6 @@ struct TestConnectMechanism {
|
||||
sequence: Vec<ConnectAction>,
|
||||
}
|
||||
|
||||
impl TestConnectMechanism {
|
||||
fn verify(&self) {
|
||||
let counter = self.counter.lock().unwrap();
|
||||
assert_eq!(
|
||||
*counter,
|
||||
self.sequence.len(),
|
||||
"sequence does not proceed to the end"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl TestConnectMechanism {
|
||||
fn new(sequence: Vec<ConnectAction>) -> Self {
|
||||
Self {
|
||||
@@ -384,63 +370,30 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
ConnectAction::Connect => Ok(TestConnection),
|
||||
ConnectAction::Retry => Err(TestConnectError { retryable: true }),
|
||||
ConnectAction::Fail => Err(TestConnectError { retryable: false }),
|
||||
x => panic!("expecting action {:?}, connect is called instead", x),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
|
||||
}
|
||||
|
||||
impl TestBackend for TestConnectMechanism {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
|
||||
let mut counter = self.counter.lock().unwrap();
|
||||
let action = self.sequence[*counter];
|
||||
*counter += 1;
|
||||
match action {
|
||||
ConnectAction::Wake => Ok(helper_create_cached_node_info()),
|
||||
ConnectAction::WakeFail => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::FORBIDDEN,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
assert!(!err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
ConnectAction::WakeRetry => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
assert!(err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
x => panic!("expecting action {:?}, wake_compute is called instead", x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn helper_create_cached_node_info() -> CachedNodeInfo {
|
||||
fn helper_create_connect_info() -> (
|
||||
CachedNodeInfo,
|
||||
console::ConsoleReqExtra<'static>,
|
||||
auth::BackendType<'static, ClientCredentials<'static>>,
|
||||
) {
|
||||
let node = NodeInfo {
|
||||
config: compute::ConnCfg::new(),
|
||||
aux: Default::default(),
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
CachedNodeInfo::new_uncached(node)
|
||||
}
|
||||
|
||||
fn helper_create_connect_info(
|
||||
mechanism: &TestConnectMechanism,
|
||||
) -> (
|
||||
CachedNodeInfo,
|
||||
console::ConsoleReqExtra<'static>,
|
||||
auth::BackendType<'_, ClientCredentials<'static>>,
|
||||
) {
|
||||
let cache = helper_create_cached_node_info();
|
||||
let cache = CachedNodeInfo::new_uncached(node);
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id: uuid::Uuid::new_v4(),
|
||||
application_name: Some("TEST"),
|
||||
};
|
||||
let creds = auth::BackendType::Test(mechanism);
|
||||
let url = "https://TEST_URL".parse().unwrap();
|
||||
let api = console::provider::mock::Api::new(url);
|
||||
let creds = auth::BackendType::Postgres(Cow::Owned(api), ClientCredentials::new_noop());
|
||||
(cache, extra, creds)
|
||||
}
|
||||
|
||||
@@ -448,46 +401,42 @@ fn helper_create_connect_info(
|
||||
async fn connect_to_compute_success() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Test that we don't retry if the error is not retryable.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_1() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Fail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Fail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Even for non-retryable errors, we should retry at least once.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_2() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Fail, Wake, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
let mechanism = TestConnectMechanism::new(vec![Fail, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Retry for at most `NUM_RETRIES_CONNECT` times.
|
||||
@@ -496,36 +445,11 @@ async fn connect_to_compute_non_retry_3() {
|
||||
assert_eq!(NUM_RETRIES_CONNECT, 10);
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![
|
||||
Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
|
||||
Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
|
||||
/* the 11th time */ Retry,
|
||||
]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Should retry wake compute.
|
||||
#[tokio::test]
|
||||
async fn wake_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, WakeRetry, Wake, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Wake failed with a non-retryable error.
|
||||
#[tokio::test]
|
||||
async fn wake_non_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, WakeFail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
@@ -234,10 +234,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
listen_pg_addr_tenant_only
|
||||
);
|
||||
let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
|
||||
error!(
|
||||
"failed to bind to address {}: {}",
|
||||
listen_pg_addr_tenant_only, e
|
||||
);
|
||||
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
|
||||
e
|
||||
})?;
|
||||
Some(listener)
|
||||
|
||||
@@ -542,7 +542,7 @@ class S3Storage:
|
||||
access_key: str
|
||||
secret_key: str
|
||||
endpoint: Optional[str] = None
|
||||
prefix_in_bucket: Optional[str] = ""
|
||||
prefix_in_bucket: Optional[str] = None
|
||||
|
||||
def access_env_vars(self) -> Dict[str, str]:
|
||||
return {
|
||||
|
||||
@@ -4,7 +4,7 @@ import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
import pytest
|
||||
import toml # TODO: replace with tomllib for Python >= 3.11
|
||||
@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PortDistributor,
|
||||
parse_project_git_version_output,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
@@ -62,6 +63,7 @@ def test_create_snapshot(
|
||||
neon_env_builder.pg_version = pg_version
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
@@ -257,15 +259,36 @@ def prepare_snapshot(
|
||||
shutil.rmtree(repo_dir / "pgdatadirs")
|
||||
os.mkdir(repo_dir / "endpoints")
|
||||
|
||||
# Remove wal-redo temp directory if it exists. Newer pageserver versions don't create
|
||||
# them anymore, but old versions did.
|
||||
for tenant in (repo_dir / "tenants").glob("*"):
|
||||
wal_redo_dir = tenant / "wal-redo-datadir.___temp"
|
||||
if wal_redo_dir.exists() and wal_redo_dir.is_dir():
|
||||
shutil.rmtree(wal_redo_dir)
|
||||
|
||||
# Update paths and ports in config files
|
||||
pageserver_toml = repo_dir / "pageserver.toml"
|
||||
pageserver_config = toml.load(pageserver_toml)
|
||||
pageserver_config["remote_storage"]["local_path"] = str(repo_dir / "local_fs_remote_storage")
|
||||
for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"):
|
||||
pageserver_config[param] = port_distributor.replace_with_new_port(pageserver_config[param])
|
||||
pageserver_config["listen_http_addr"] = port_distributor.replace_with_new_port(
|
||||
pageserver_config["listen_http_addr"]
|
||||
)
|
||||
pageserver_config["listen_pg_addr"] = port_distributor.replace_with_new_port(
|
||||
pageserver_config["listen_pg_addr"]
|
||||
)
|
||||
# since storage_broker these are overridden by neon_local during pageserver
|
||||
# start; remove both to prevent unknown options during etcd ->
|
||||
# storage_broker migration. TODO: remove once broker is released
|
||||
pageserver_config.pop("broker_endpoint", None)
|
||||
pageserver_config.pop("broker_endpoints", None)
|
||||
etcd_broker_endpoints = [f"http://localhost:{port_distributor.get_port()}/"]
|
||||
if get_neon_version(neon_binpath) == "49da498f651b9f3a53b56c7c0697636d880ddfe0":
|
||||
pageserver_config["broker_endpoints"] = etcd_broker_endpoints # old etcd version
|
||||
|
||||
# We don't use authentication in compatibility tests
|
||||
# so just remove authentication related settings.
|
||||
# Older pageserver versions had just one `auth_type` setting. Now there
|
||||
# are separate settings for pg and http ports. We don't use authentication
|
||||
# in compatibility tests so just remove authentication related settings.
|
||||
pageserver_config.pop("auth_type", None)
|
||||
pageserver_config.pop("pg_auth_type", None)
|
||||
pageserver_config.pop("http_auth_type", None)
|
||||
|
||||
@@ -277,16 +300,31 @@ def prepare_snapshot(
|
||||
|
||||
snapshot_config_toml = repo_dir / "config"
|
||||
snapshot_config = toml.load(snapshot_config_toml)
|
||||
for param in ("listen_http_addr", "listen_pg_addr"):
|
||||
snapshot_config["pageserver"][param] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["pageserver"][param]
|
||||
)
|
||||
snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["broker"]["listen_addr"]
|
||||
|
||||
# Provide up/downgrade etcd <-> storage_broker to make forward/backward
|
||||
# compatibility test happy. TODO: leave only the new part once broker is released.
|
||||
if get_neon_version(neon_binpath) == "49da498f651b9f3a53b56c7c0697636d880ddfe0":
|
||||
# old etcd version
|
||||
snapshot_config["etcd_broker"] = {
|
||||
"etcd_binary_path": shutil.which("etcd"),
|
||||
"broker_endpoints": etcd_broker_endpoints,
|
||||
}
|
||||
snapshot_config.pop("broker", None)
|
||||
else:
|
||||
# new storage_broker version
|
||||
broker_listen_addr = f"127.0.0.1:{port_distributor.get_port()}"
|
||||
snapshot_config["broker"] = {"listen_addr": broker_listen_addr}
|
||||
snapshot_config.pop("etcd_broker", None)
|
||||
|
||||
snapshot_config["pageserver"]["listen_http_addr"] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["pageserver"]["listen_http_addr"]
|
||||
)
|
||||
snapshot_config["pageserver"]["listen_pg_addr"] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["pageserver"]["listen_pg_addr"]
|
||||
)
|
||||
for sk in snapshot_config["safekeepers"]:
|
||||
for param in ("http_port", "pg_port", "pg_tenant_only_port"):
|
||||
sk[param] = port_distributor.replace_with_new_port(sk[param])
|
||||
sk["http_port"] = port_distributor.replace_with_new_port(sk["http_port"])
|
||||
sk["pg_port"] = port_distributor.replace_with_new_port(sk["pg_port"])
|
||||
|
||||
if pg_distrib_dir:
|
||||
snapshot_config["pg_distrib_dir"] = str(pg_distrib_dir)
|
||||
@@ -312,6 +350,12 @@ def prepare_snapshot(
|
||||
), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"
|
||||
|
||||
|
||||
# get git SHA of neon binary
|
||||
def get_neon_version(neon_binpath: Path):
|
||||
out = subprocess.check_output([neon_binpath / "neon_local", "--version"]).decode("utf-8")
|
||||
return parse_project_git_version_output(out)
|
||||
|
||||
|
||||
def check_neon_works(
|
||||
repo_dir: Path,
|
||||
neon_target_binpath: Path,
|
||||
@@ -337,6 +381,7 @@ def check_neon_works(
|
||||
config.pg_version = pg_version
|
||||
config.initial_tenant = snapshot_config["default_tenant_id"]
|
||||
config.pg_distrib_dir = pg_distrib_dir
|
||||
config.preserve_database_files = True
|
||||
|
||||
# Use the "target" binaries to launch the storage nodes
|
||||
config_target = config
|
||||
@@ -393,14 +438,6 @@ def check_neon_works(
|
||||
test_output_dir / "dump-from-wal.filediff",
|
||||
)
|
||||
|
||||
# TODO: Run pg_amcheck unconditionally after the next release
|
||||
try:
|
||||
pg_bin.run(["psql", connstr, "--command", "CREATE EXTENSION IF NOT EXISTS amcheck"])
|
||||
except subprocess.CalledProcessError:
|
||||
log.info("Extension amcheck is not available, skipping pg_amcheck")
|
||||
else:
|
||||
pg_bin.run_capture(["pg_amcheck", connstr, "--install-missing", "--verbose"])
|
||||
|
||||
# Check that we can interract with the data
|
||||
pg_bin.run_capture(["pgbench", "--time=10", "--progress=2", connstr])
|
||||
|
||||
@@ -408,15 +445,10 @@ def check_neon_works(
|
||||
assert not initial_dump_differs, "initial dump differs"
|
||||
|
||||
|
||||
def dump_differs(
|
||||
first: Path, second: Path, output: Path, allowed_diffs: Optional[List[str]] = None
|
||||
) -> bool:
|
||||
def dump_differs(first: Path, second: Path, output: Path) -> bool:
|
||||
"""
|
||||
Runs diff(1) command on two SQL dumps and write the output to the given output file.
|
||||
The function supports allowed diffs, if the diff is in the allowed_diffs list, it's not considered as a difference.
|
||||
See the example of it in https://github.com/neondatabase/neon/pull/4425/files#diff-15c5bfdd1d5cc1411b9221091511a60dd13a9edf672bdfbb57dd2ef8bb7815d6
|
||||
|
||||
Returns True if the dumps differ and produced diff is not allowed, False otherwise (in most cases we want it to return False).
|
||||
Returns True if the dumps differ, False otherwise.
|
||||
"""
|
||||
|
||||
with output.open("w") as stdout:
|
||||
@@ -434,30 +466,51 @@ def dump_differs(
|
||||
|
||||
differs = res.returncode != 0
|
||||
|
||||
allowed_diffs = allowed_diffs or []
|
||||
if differs and len(allowed_diffs) > 0:
|
||||
for allowed_diff in allowed_diffs:
|
||||
with tempfile.NamedTemporaryFile(mode="w") as tmp:
|
||||
tmp.write(allowed_diff)
|
||||
tmp.flush()
|
||||
# TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made
|
||||
if differs:
|
||||
with tempfile.NamedTemporaryFile(mode="w") as tmp:
|
||||
tmp.write(PR4425_ALLOWED_DIFF)
|
||||
tmp.flush()
|
||||
|
||||
allowed = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
r"--ignore-matching-lines=^---", # Ignore diff headers
|
||||
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
|
||||
"--ignore-matching-lines=^@@", # Ignore diff blocks location
|
||||
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
|
||||
"--ignore-matching-lines=^ --.*", # Ignore SQL comments in diff
|
||||
"--ignore-blank-lines",
|
||||
str(output),
|
||||
str(tmp.name),
|
||||
],
|
||||
)
|
||||
allowed = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
r"--ignore-matching-lines=^---", # Ignore diff headers
|
||||
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
|
||||
"--ignore-matching-lines=^@@", # Ignore diff blocks location
|
||||
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
|
||||
"--ignore-matching-lines=^ --.*", # Ignore the " --" lines for compatibility with PG14
|
||||
"--ignore-blank-lines",
|
||||
str(output),
|
||||
str(tmp.name),
|
||||
],
|
||||
)
|
||||
|
||||
differs = allowed.returncode != 0
|
||||
if not differs:
|
||||
break
|
||||
differs = allowed.returncode != 0
|
||||
|
||||
return differs
|
||||
|
||||
|
||||
PR4425_ALLOWED_DIFF = """
|
||||
--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000
|
||||
+++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000
|
||||
@@ -13,12 +13,20 @@
|
||||
|
||||
CREATE ROLE cloud_admin;
|
||||
ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS;
|
||||
+CREATE ROLE neon_superuser;
|
||||
+ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS;
|
||||
|
||||
--
|
||||
-- User Configurations
|
||||
--
|
||||
|
||||
|
||||
+--
|
||||
+-- Role memberships
|
||||
+--
|
||||
+
|
||||
+GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
+GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
"""
|
||||
|
||||
@@ -14,6 +14,10 @@ from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
|
||||
|
||||
@@ -690,6 +690,10 @@ def test_ondemand_download_failure_to_replace(
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
lsn = Lsn(pageserver_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
|
||||
|
||||
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
|
||||
|
||||
# remove layers so that they will be redownloaded
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
@@ -700,10 +704,8 @@ def test_ondemand_download_failure_to_replace(
|
||||
# requesting details with non-incremental size should trigger a download of the only layer
|
||||
# this will need to be adjusted if an index for logical sizes is ever implemented
|
||||
with pytest.raises(PageserverApiException):
|
||||
# PageserverApiException is expected because of the failpoint (timeline_detail building does something)
|
||||
# ReadTimeout can happen on our busy CI, but it should not, because there is no more busylooping
|
||||
# but should it be added back, we would wait for 15s here.
|
||||
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=15)
|
||||
# error message is not useful
|
||||
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=2)
|
||||
|
||||
actual_message = ".* ERROR .*layermap-replace-notfound"
|
||||
assert env.pageserver.log_contains(actual_message) is not None
|
||||
|
||||
@@ -72,6 +72,10 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
|
||||
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
|
||||
# Use a tiny checkpoint distance, to create a lot of layers quickly.
|
||||
# That allows us to stress the compaction and layer flushing logic more.
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
|
||||
@@ -265,23 +265,18 @@ def test_sql_over_http_output_options(static_proxy: NeonProxy):
|
||||
def test_sql_over_http_batch(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
def qq(queries: List[Tuple[str, Optional[List[Any]]]], read_only: bool = False) -> Any:
|
||||
def qq(queries: List[Tuple[str, Optional[List[Any]]]]) -> Any:
|
||||
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
|
||||
data=json.dumps(list(map(lambda x: {"query": x[0], "params": x[1] or []}, queries))),
|
||||
headers={
|
||||
"Content-Type": "application/sql",
|
||||
"Neon-Connection-String": connstr,
|
||||
"Neon-Batch-Isolation-Level": "Serializable",
|
||||
"Neon-Batch-Read-Only": "true" if read_only else "false",
|
||||
},
|
||||
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
return response.json()["results"], response.headers
|
||||
return response.json()["results"]
|
||||
|
||||
result, headers = qq(
|
||||
result = qq(
|
||||
[
|
||||
("select 42 as answer", None),
|
||||
("select $1 as answer", [42]),
|
||||
@@ -296,9 +291,6 @@ def test_sql_over_http_batch(static_proxy: NeonProxy):
|
||||
]
|
||||
)
|
||||
|
||||
assert headers["Neon-Batch-Isolation-Level"] == "Serializable"
|
||||
assert headers["Neon-Batch-Read-Only"] == "false"
|
||||
|
||||
assert result[0]["rows"] == [{"answer": 42}]
|
||||
assert result[1]["rows"] == [{"answer": "42"}]
|
||||
assert result[2]["rows"] == [{"answer": 42}]
|
||||
@@ -319,14 +311,3 @@ def test_sql_over_http_batch(static_proxy: NeonProxy):
|
||||
assert res["command"] == "DROP"
|
||||
assert res["rowCount"] is None
|
||||
assert len(result) == 10
|
||||
|
||||
result, headers = qq(
|
||||
[
|
||||
("select 42 as answer", None),
|
||||
],
|
||||
True,
|
||||
)
|
||||
assert headers["Neon-Batch-Isolation-Level"] == "Serializable"
|
||||
assert headers["Neon-Batch-Read-Only"] == "true"
|
||||
|
||||
assert result[0]["rows"] == [{"answer": 42}]
|
||||
|
||||
@@ -15,6 +15,10 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.is_testing_enabled_or_skip()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
|
||||
# Create a branch for us
|
||||
env.neon_cli.create_branch("test_pageserver_recovery", "main")
|
||||
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: ebedb34d01...12c5dc8281
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 1220c8a63f...e3fbfc4d14
4
vendor/revisions.json
vendored
4
vendor/revisions.json
vendored
@@ -1,4 +1,4 @@
|
||||
{
|
||||
"postgres-v15": "1220c8a63f00101829f9222a5821fc084b4384c7",
|
||||
"postgres-v14": "ebedb34d01c8ac9c31e8ea4628b9854103a1dc8f"
|
||||
"postgres-v15": "e3fbfc4d143b2d3c3c1813ce747f8af35aa9405e",
|
||||
"postgres-v14": "12c5dc8281d20b5bd636e1097eea80a7bc609591"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user