Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-15 00:20:36 +00:00

Compare commits: alek/remot... → revert-pre... (19 commits)
| SHA1 |
|---|
| 0abfc72f1c |
| ddbe170454 |
| 39e458f049 |
| e1424647a0 |
| 705ae2dce9 |
| eb78603121 |
| f0ad603693 |
| e5183f85dc |
| 89ee8f2028 |
| a8f3540f3d |
| 4338eed8c4 |
| 2fbdf26094 |
| 7374634845 |
| 9fdd3a4a1e |
| 3681fc39fd |
| 67d2fa6dec |
| cafbe8237e |
| 3e425c40c0 |
| 395bd9174e |
@@ -551,10 +551,8 @@ FROM build-deps AS pg-embedding-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/

ENV PATH "/usr/local/pgsql/bin/:$PATH"
# eeb3ba7c3a60c95b2604dd543c64b2f1bb4a3703 made on 15/07/2023
# There is no release tag yet
RUN wget https://github.com/neondatabase/pg_embedding/archive/eeb3ba7c3a60c95b2604dd543c64b2f1bb4a3703.tar.gz -O pg_embedding.tar.gz && \
    echo "030846df723652f99a8689ce63b66fa0c23477a7fd723533ab8a6b28ab70730f pg_embedding.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_embedding/archive/refs/tags/0.3.1.tar.gz -O pg_embedding.tar.gz && \
    echo "c4ae84eef36fa8ec5868f6e061f39812f19ee5ba3604d428d40935685c7be512 pg_embedding.tar.gz" | sha256sum --check && \
    mkdir pg_embedding-src && cd pg_embedding-src && tar xvzf ../pg_embedding.tar.gz --strip-components=1 -C . && \
    make -j $(getconf _NPROCESSORS_ONLN) && \
    make -j $(getconf _NPROCESSORS_ONLN) install && \

@@ -776,13 +774,6 @@ ARG PG_VERSION
ARG BUILD_TAG
RUN apt update && apt install -y zstd

# Define extension build numbers
# NOTE: it is *not* necessary for the build_tag to be BUILD_TAG. In particular,
# you should update the build_tag for each extension only when you build it
RUN echo "kq_imcx 5670669815" >> build_tags.txt && \
    echo "anon 5670669815" >> build_tags.txt && \
    echo "postgis 5670669815" >> build_tags.txt

# copy the control files here
COPY --from=kq-imcx-pg-build /extensions/ /extensions/
COPY --from=pg-anon-pg-build /extensions/ /extensions/
Makefile (2 changed lines)

@@ -108,6 +108,8 @@ postgres-%: postgres-configure-% \
	$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install
+	@echo "Compiling pageinspect $*"
	$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
+	@echo "Compiling amcheck $*"
	$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install

.PHONY: postgres-clean-%
postgres-clean-%:
@@ -193,6 +193,13 @@ fn main() -> Result<()> {
    if !spec_set {
        // No spec provided, hang waiting for it.
        info!("no compute spec provided, waiting");

        // TODO this can stall startups in the unlikely event that we bind
        // this compute node while it's busy prewarming. It's not too
        // bad because it's just 100ms and unlikely, but it's an
        // avoidable problem.
        // compute.prewarm_postgres()?;

        let mut state = compute.state.lock().unwrap();
        while state.status != ComputeStatus::ConfigurationPending {
            state = compute.state_changed.wait(state).unwrap();
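The wait above pairs a Mutex-guarded status with a Condvar (compute.state_changed) and re-checks the status after every wakeup, so spurious wakeups are harmless. A minimal, self-contained sketch of the same pattern; Status and Shared are illustrative stand-ins, not compute_ctl's real types:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Illustrative stand-ins for compute_ctl's state types.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Status {
    Empty,
    ConfigurationPending,
}

struct Shared {
    state: Mutex<Status>,
    state_changed: Condvar,
}

fn main() {
    let shared = Arc::new(Shared {
        state: Mutex::new(Status::Empty),
        state_changed: Condvar::new(),
    });

    // Another thread eventually flips the status and notifies waiters,
    // playing the role of the handler that receives the compute spec.
    let s = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        *s.state.lock().unwrap() = Status::ConfigurationPending;
        s.state_changed.notify_all();
    });

    // Same shape as the loop in the diff: hold the lock and wait until the
    // desired status shows up, re-checking the predicate on every wakeup.
    let mut state = shared.state.lock().unwrap();
    while *state != Status::ConfigurationPending {
        state = shared.state_changed.wait(state).unwrap();
    }
    println!("status is now {:?}", *state);
}
```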
@@ -532,6 +532,50 @@ impl ComputeNode {
        Ok(())
    }

    /// Start and stop a postgres process to warm up the VM for startup.
    pub fn prewarm_postgres(&self) -> Result<()> {
        info!("prewarming");

        // Create pgdata
        let pgdata = &format!("{}.warmup", self.pgdata);
        create_pgdata(pgdata)?;

        // Run initdb to completion
        info!("running initdb");
        let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
        Command::new(initdb_bin)
            .args(["-D", pgdata])
            .output()
            .expect("cannot start initdb process");

        // Write conf
        use std::io::Write;
        let conf_path = Path::new(pgdata).join("postgresql.conf");
        let mut file = std::fs::File::create(conf_path)?;
        writeln!(file, "shared_buffers=65536")?;
        writeln!(file, "port=51055")?; // Nobody should be connecting
        writeln!(file, "shared_preload_libraries = 'neon'")?;

        // Start postgres
        info!("starting postgres");
        let mut pg = Command::new(&self.pgbin)
            .args(["-D", pgdata])
            .spawn()
            .expect("cannot start postgres process");

        // Stop it when it's ready
        info!("waiting for postgres");
        wait_for_postgres(&mut pg, Path::new(pgdata))?;
        pg.kill()?;
        info!("sent kill signal");
        pg.wait()?;
        info!("done prewarming");

        // clean up
        let _ok = fs::remove_dir_all(pgdata);
        Ok(())
    }

    /// Start Postgres as a child process and manage DBs/roles.
    /// After that this will hang waiting on the postmaster process to exit.
    #[instrument(skip_all)]
@@ -564,9 +564,7 @@ impl Endpoint {
                }
                Err(e) => {
                    if attempt == MAX_ATTEMPTS {
                        return Err(e).context(
                            "timed out waiting to connect to compute_ctl HTTP; last error: {e}",
                        );
                        return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
                    }
                }
            }
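The dropped three-line call passed a literal "{e}" inside the .context message, and anyhow's .context does not interpolate placeholders, so the braces would have been printed verbatim; the kept one-liner simply relies on the source error staying attached to the chain. If a message really needs interpolation, .with_context plus format! is the usual form. A small sketch under that assumption; probe and wait_for_compute_ctl are made-up names, not the Endpoint API:

```rust
use anyhow::{anyhow, Context, Result};

// Made-up stand-in for the compute_ctl HTTP readiness probe.
fn probe() -> Result<()> {
    Err(anyhow!("connection refused"))
}

fn wait_for_compute_ctl(max_attempts: u32) -> Result<()> {
    // `.context("...: {e}")` would print the braces literally, because
    // `context` takes a display value, not a format string. When the message
    // should embed data, build it with `with_context` + `format!`; the source
    // error stays attached either way and is printed as part of the chain.
    probe().with_context(|| {
        format!("timed out waiting to connect to compute_ctl HTTP after {max_attempts} attempts")
    })
}

fn main() {
    if let Err(e) = wait_for_compute_ctl(10) {
        // Alternate formatting prints the context and then the source error:
        // "timed out ... after 10 attempts: connection refused"
        eprintln!("{e:#}");
    }
}
```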
@@ -200,13 +200,17 @@ impl S3Bucket {
        )
    }

    fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
        let mut full_path = self.prefix_in_bucket.clone().unwrap_or_default();
        for segment in path.0.iter() {
            full_path.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
            full_path.push_str(segment.to_str().unwrap_or_default());
    pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
        assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
        let path_string = path
            .get_path()
            .to_string_lossy()
            .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR)
            .to_string();
        match &self.prefix_in_bucket {
            Some(prefix) => prefix.clone() + "/" + &path_string,
            None => path_string,
        }
        full_path
    }

    async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {

@@ -427,10 +431,12 @@ impl RemoteStorage for S3Bucket {
    }

    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        // if prefix is not none then download file `prefix/from`
        // if prefix is none then download file `from`
        self.download_object(GetObjectRequest {
            bucket: self.bucket_name.clone(),
            key: self.relative_path_to_s3_object(from),
            ..GetObjectRequest::default()
            range: None,
        })
        .await
    }
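The new key construction boils down to: stringify the path, trim a trailing separator, then prepend the bucket prefix (if any) joined with a single '/'. A standalone sketch of that rule, assuming the prefix has already been normalized (no leading or trailing '/'), which in the real code happens when the S3Bucket is constructed:

```rust
/// Sketch of the joining rule used by relative_path_to_s3_object; the real
/// method works on RemotePath and the bucket's configured prefix.
fn join_prefix_and_path(prefix: Option<&str>, path: &str) -> String {
    let path = path.trim_end_matches('/');
    match prefix {
        Some(prefix) => format!("{prefix}/{path}"),
        None => path.to_string(),
    }
}

fn main() {
    // Mirrors the expected outputs in the test module added below.
    assert_eq!(
        join_prefix_and_path(Some("test/prefix"), "some/path/"),
        "test/prefix/some/path"
    );
    assert_eq!(
        join_prefix_and_path(Some("test/prefix"), "some/path"),
        "test/prefix/some/path"
    );
    assert_eq!(join_prefix_and_path(None, "some/path"), "some/path");
    println!("ok");
}
```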
@@ -523,3 +529,63 @@ impl RemoteStorage for S3Bucket {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::{RemotePath, S3Bucket, S3Config};
|
||||
|
||||
#[test]
|
||||
fn relative_path() {
|
||||
let all_paths = vec!["", "some/path", "some/path/"];
|
||||
let all_paths: Vec<RemotePath> = all_paths
|
||||
.iter()
|
||||
.map(|x| RemotePath::new(Path::new(x)).expect("bad path"))
|
||||
.collect();
|
||||
let prefixes = [
|
||||
None,
|
||||
Some(""),
|
||||
Some("test/prefix"),
|
||||
Some("test/prefix/"),
|
||||
Some("/test/prefix/"),
|
||||
];
|
||||
let expected_outputs = vec![
|
||||
vec!["", "some/path", "some/path"],
|
||||
vec!["/", "/some/path", "/some/path"],
|
||||
vec![
|
||||
"test/prefix/",
|
||||
"test/prefix/some/path",
|
||||
"test/prefix/some/path",
|
||||
],
|
||||
vec![
|
||||
"test/prefix/",
|
||||
"test/prefix/some/path",
|
||||
"test/prefix/some/path",
|
||||
],
|
||||
vec![
|
||||
"test/prefix/",
|
||||
"test/prefix/some/path",
|
||||
"test/prefix/some/path",
|
||||
],
|
||||
];
|
||||
|
||||
for (prefix_idx, prefix) in prefixes.iter().enumerate() {
|
||||
let config = S3Config {
|
||||
bucket_name: "bucket".to_owned(),
|
||||
bucket_region: "region".to_owned(),
|
||||
prefix_in_bucket: prefix.map(str::to_string),
|
||||
endpoint: None,
|
||||
concurrency_limit: NonZeroUsize::new(100).unwrap(),
|
||||
max_keys_per_list_response: Some(5),
|
||||
};
|
||||
let storage = S3Bucket::new(&config).expect("remote storage init");
|
||||
for (test_path_idx, test_path) in all_paths.iter().enumerate() {
|
||||
let result = storage.relative_path_to_s3_object(test_path);
|
||||
let expected = expected_outputs[prefix_idx][test_path_idx];
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ static LOGGING_DONE: OnceCell<()> = OnceCell::new();

const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";

const BASE_PREFIX: &str = "test/";
const BASE_PREFIX: &str = "test";

/// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
@@ -46,7 +46,6 @@ use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTimelineFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -70,6 +69,7 @@ use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::InitializationOrder;
|
||||
|
||||
use crate::tenant::timeline::delete::DeleteTimelineFlow;
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
@@ -117,7 +117,6 @@ mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
pub mod config;
|
||||
pub mod delete;
|
||||
pub mod mgr;
|
||||
pub mod tasks;
|
||||
pub mod upload_queue;
|
||||
|
||||
@@ -390,39 +390,42 @@ where
    }

    #[allow(dead_code)]
    pub fn dump(&self) -> Result<()> {
        self.dump_recurse(self.root_blk, &[], 0)
    }
    pub async fn dump(&self) -> Result<()> {
        let mut stack = Vec::new();

    fn dump_recurse(&self, blknum: u32, path: &[u8], depth: usize) -> Result<()> {
        let blk = self.reader.read_blk(self.start_blk + blknum)?;
        let buf: &[u8] = blk.as_ref();
        stack.push((self.root_blk, String::new(), 0, 0, 0));

        let node = OnDiskNode::<L>::deparse(buf)?;
        while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
            let blk = self.reader.read_blk(self.start_blk + blknum)?;
            let buf: &[u8] = blk.as_ref();
            let node = OnDiskNode::<L>::deparse(buf)?;

            print!("{:indent$}", "", indent = depth * 2);
            println!(
                "blk #{}: path {}: prefix {}, suffix_len {}",
                blknum,
                hex::encode(path),
                hex::encode(node.prefix),
                node.suffix_len
            );
            if child_idx == 0 {
                print!("{:indent$}", "", indent = depth * 2);
                let path_prefix = stack
                    .iter()
                    .map(|(_blknum, path, ..)| path.as_str())
                    .collect::<String>();
                println!(
                    "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
                    hex::encode(node.prefix),
                    node.suffix_len
                );
            }

            let mut idx = 0;
            let mut key_off = 0;
            while idx < node.num_children {
                if child_idx + 1 < node.num_children {
                    let key_off = key_off + node.suffix_len as usize;
                    stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
                }
                let key = &node.keys[key_off..key_off + node.suffix_len as usize];
                let val = node.value(idx as usize);
                let val = node.value(child_idx as usize);

                print!("{:indent$}", "", indent = depth * 2 + 2);
                println!("{}: {}", hex::encode(key), hex::encode(val.0));

                if node.level > 0 {
                    let child_path = [path, node.prefix].concat();
                    self.dump_recurse(val.to_blknum(), &child_path, depth + 1)?;
                    stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
                }
                idx += 1;
                key_off += node.suffix_len as usize;
            }
            Ok(())
        }
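The rewrite above replaces the recursive dump_recurse with one loop over an explicit stack of (blknum, path, depth, child_idx, key_off) frames; that is what allows dump to become an async fn, since recursive async functions would otherwise need boxed futures. A minimal sketch of the same recursion-to-stack transformation on a toy tree; Node here is illustrative, not the pageserver's OnDiskNode:

```rust
// Toy tree node standing in for the on-disk B-tree node.
struct Node {
    label: String,
    children: Vec<Node>,
}

// Depth-first dump without recursion: push frames onto a Vec and pop them in
// a loop, the same shape as the async dump() above.
fn dump(root: &Node) {
    let mut stack = vec![(root, 0usize)];
    while let Some((node, depth)) = stack.pop() {
        println!("{:indent$}{}", "", node.label, indent = depth * 2);
        // Push children in reverse so they are printed left to right.
        for child in node.children.iter().rev() {
            stack.push((child, depth + 1));
        }
    }
}

fn main() {
    let tree = Node {
        label: "root".into(),
        children: vec![
            Node { label: "left".into(), children: vec![] },
            Node { label: "right".into(), children: vec![] },
        ],
    };
    dump(&tree);
}
```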
@@ -754,8 +757,8 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic() -> Result<()> {
|
||||
#[tokio::test]
|
||||
async fn basic() -> Result<()> {
|
||||
let mut disk = TestDisk::new();
|
||||
let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
|
||||
|
||||
@@ -775,7 +778,7 @@ mod tests {
|
||||
|
||||
let reader = DiskBtreeReader::new(0, root_offset, disk);
|
||||
|
||||
reader.dump()?;
|
||||
reader.dump().await?;
|
||||
|
||||
// Test the `get` function on all the keys.
|
||||
for (key, val) in all_data.iter() {
|
||||
@@ -835,8 +838,8 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lots_of_keys() -> Result<()> {
|
||||
#[tokio::test]
|
||||
async fn lots_of_keys() -> Result<()> {
|
||||
let mut disk = TestDisk::new();
|
||||
let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
|
||||
|
||||
@@ -856,7 +859,7 @@ mod tests {
|
||||
|
||||
let reader = DiskBtreeReader::new(0, root_offset, disk);
|
||||
|
||||
reader.dump()?;
|
||||
reader.dump().await?;
|
||||
|
||||
use std::sync::Mutex;
|
||||
|
||||
@@ -994,8 +997,8 @@ mod tests {
|
||||
///
|
||||
/// This test contains a particular data set, see disk_btree_test_data.rs
|
||||
///
|
||||
#[test]
|
||||
fn particular_data() -> Result<()> {
|
||||
#[tokio::test]
|
||||
async fn particular_data() -> Result<()> {
|
||||
// Build a tree from it
|
||||
let mut disk = TestDisk::new();
|
||||
let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
|
||||
@@ -1022,7 +1025,7 @@ mod tests {
|
||||
})?;
|
||||
assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
|
||||
|
||||
reader.dump()?;
|
||||
reader.dump().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTimelineFlow;
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
|
||||
/// The tenants known to the pageserver.
|
||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||
|
||||
@@ -223,6 +223,45 @@ mod tests {
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v2_indexpart_is_parsed_with_deleted_at() {
|
||||
let example = r#"{
|
||||
"version":2,
|
||||
"timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
|
||||
"missing_layers":["This shouldn't fail deserialization"],
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata_bytes":[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
|
||||
"deleted_at": "2023-07-31T09:00:00.123"
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
|
||||
version: 2,
|
||||
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
|
||||
};
|
||||
|
||||
let part = serde_json::from_str::<IndexPart>(example).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_layers_are_parsed() {
|
||||
let empty_layers_json = r#"{
|
||||
|
||||
@@ -256,7 +256,7 @@ impl Layer for DeltaLayer {
            file,
        );

        tree_reader.dump()?;
        tree_reader.dump().await?;

        let mut cursor = file.block_cursor();

@@ -175,7 +175,7 @@ impl Layer for ImageLayer {
        let tree_reader =
            DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);

        tree_reader.dump()?;
        tree_reader.dump().await?;

        tree_reader.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
            println!("key: {} offset {}", hex::encode(key), value);
@@ -1,3 +1,4 @@
|
||||
pub mod delete;
|
||||
mod eviction_task;
|
||||
pub mod layer_manager;
|
||||
mod logical_size;
|
||||
@@ -79,6 +80,7 @@ use crate::METADATA_FILE_NAME;
|
||||
use crate::ZERO_PAGE;
|
||||
use crate::{is_temporary, task_mgr};
|
||||
|
||||
use self::delete::DeleteTimelineFlow;
|
||||
pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::layer_manager::LayerManager;
|
||||
@@ -86,7 +88,6 @@ use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::delete::DeleteTimelineFlow;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{
|
||||
@@ -1599,7 +1600,7 @@ impl Timeline {
|
||||
if let Some(imgfilename) = ImageFileName::parse_str(&fname) {
|
||||
// create an ImageLayer struct for each image file.
|
||||
if imgfilename.lsn > disk_consistent_lsn {
|
||||
warn!(
|
||||
info!(
|
||||
"found future image layer {} on timeline {} disk_consistent_lsn is {}",
|
||||
imgfilename, self.timeline_id, disk_consistent_lsn
|
||||
);
|
||||
@@ -1631,7 +1632,7 @@ impl Timeline {
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
if deltafilename.lsn_range.end > disk_consistent_lsn + 1 {
|
||||
warn!(
|
||||
info!(
|
||||
"found future delta layer {} on timeline {} disk_consistent_lsn is {}",
|
||||
deltafilename, self.timeline_id, disk_consistent_lsn
|
||||
);
|
||||
@@ -1773,7 +1774,7 @@ impl Timeline {
|
||||
match remote_layer_name {
|
||||
LayerFileName::Image(imgfilename) => {
|
||||
if imgfilename.lsn > up_to_date_disk_consistent_lsn {
|
||||
warn!(
|
||||
info!(
|
||||
"found future image layer {} on timeline {} remote_consistent_lsn is {}",
|
||||
imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn
|
||||
);
|
||||
@@ -1798,7 +1799,7 @@ impl Timeline {
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 {
|
||||
warn!(
|
||||
info!(
|
||||
"found future delta layer {} on timeline {} remote_consistent_lsn is {}",
|
||||
deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn
|
||||
);
|
||||
|
||||
@@ -15,15 +15,17 @@ use utils::{
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{remote_timeline_client, DeleteTimelineError},
|
||||
tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{
|
||||
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
|
||||
},
|
||||
CreateTimelineCause, DeleteTimelineError, Tenant,
|
||||
},
|
||||
InitializationOrder,
|
||||
};
|
||||
|
||||
use super::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, Tenant, Timeline,
|
||||
};
|
||||
use super::Timeline;
|
||||
|
||||
/// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
|
||||
@@ -53,6 +53,12 @@ pub enum BackendType<'a, T> {
|
||||
Postgres(Cow<'a, console::provider::mock::Api>, T),
|
||||
/// Authentication via a web browser.
|
||||
Link(Cow<'a, url::ApiUrl>),
|
||||
/// Test backend.
|
||||
Test(&'a dyn TestBackend),
|
||||
}
|
||||
|
||||
pub trait TestBackend: Send + Sync + 'static {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BackendType<'_, ()> {
|
||||
@@ -62,6 +68,7 @@ impl std::fmt::Display for BackendType<'_, ()> {
|
||||
Console(endpoint, _) => fmt.debug_tuple("Console").field(&endpoint.url()).finish(),
|
||||
Postgres(endpoint, _) => fmt.debug_tuple("Postgres").field(&endpoint.url()).finish(),
|
||||
Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
|
||||
Test(_) => fmt.debug_tuple("Test").finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -75,6 +82,7 @@ impl<T> BackendType<'_, T> {
|
||||
Console(c, x) => Console(Cow::Borrowed(c), x),
|
||||
Postgres(c, x) => Postgres(Cow::Borrowed(c), x),
|
||||
Link(c) => Link(Cow::Borrowed(c)),
|
||||
Test(x) => Test(*x),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -89,6 +97,7 @@ impl<'a, T> BackendType<'a, T> {
|
||||
Console(c, x) => Console(c, f(x)),
|
||||
Postgres(c, x) => Postgres(c, f(x)),
|
||||
Link(c) => Link(c),
|
||||
Test(x) => Test(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -102,6 +111,7 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
|
||||
Console(c, x) => x.map(|x| Console(c, x)),
|
||||
Postgres(c, x) => x.map(|x| Postgres(c, x)),
|
||||
Link(c) => Ok(Link(c)),
|
||||
Test(x) => Ok(Test(x)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -147,6 +157,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
Console(_, creds) => creds.project.clone(),
|
||||
Postgres(_, creds) => creds.project.clone(),
|
||||
Link(_) => Some("link".to_owned()),
|
||||
Test(_) => Some("test".to_owned()),
|
||||
}
|
||||
}
|
||||
/// Authenticate the client via the requested backend, possibly using credentials.
|
||||
@@ -188,6 +199,9 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
.await?
|
||||
.map(CachedNodeInfo::new_uncached)
|
||||
}
|
||||
Test(_) => {
|
||||
unreachable!("this function should never be called in the test backend")
|
||||
}
|
||||
};
|
||||
|
||||
info!("user successfully authenticated");
|
||||
@@ -206,6 +220,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
|
||||
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
|
||||
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
|
||||
Link(_) => Ok(None),
|
||||
Test(x) => x.wake_compute().map(Some),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
auth::{self, AuthFlow, ClientCredentials},
|
||||
compute,
|
||||
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
|
||||
proxy::{try_wake, NUM_RETRIES_CONNECT},
|
||||
proxy::handle_try_wake,
|
||||
sasl, scram,
|
||||
stream::PqStream,
|
||||
};
|
||||
@@ -51,14 +51,15 @@ pub(super) async fn authenticate(
        }
    };

    info!("compute node's state has likely changed; requesting a wake-up");
    let mut num_retries = 0;
    let mut node = loop {
        num_retries += 1;
        match try_wake(api, extra, creds).await? {
        let wake_res = api.wake_compute(extra, creds).await;
        match handle_try_wake(wake_res, num_retries)? {
            ControlFlow::Continue(_) => num_retries += 1,
            ControlFlow::Break(n) => break n,
            ControlFlow::Continue(_) if num_retries < NUM_RETRIES_CONNECT => continue,
            ControlFlow::Continue(e) => return Err(e.into()),
        }
        info!(num_retries, "retrying wake compute");
    };
    if let Some(keys) = scram_keys {
        use tokio_postgres::config::AuthKeys;
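handle_try_wake reports its outcome as a ControlFlow: Break(node) when the wake-up succeeded and Continue(err) when the failure is retryable, leaving the looping policy to the caller. A minimal sketch of that calling pattern with a stubbed wake step; try_step and MAX_RETRIES are illustrative, not the proxy's actual names:

```rust
use std::ops::ControlFlow;

const MAX_RETRIES: u32 = 10;

// Stand-in for handle_try_wake: Break(value) on success, Continue(err) when
// the error is considered retryable, Err(..) when it is fatal.
fn try_step(attempt: u32) -> Result<ControlFlow<&'static str, &'static str>, &'static str> {
    if attempt < 3 {
        Ok(ControlFlow::Continue("compute not ready yet"))
    } else {
        Ok(ControlFlow::Break("node-info"))
    }
}

fn main() -> Result<(), &'static str> {
    let mut num_retries = 0;
    let node = loop {
        num_retries += 1;
        match try_step(num_retries)? {
            ControlFlow::Break(node) => break node,
            ControlFlow::Continue(_) if num_retries < MAX_RETRIES => continue,
            ControlFlow::Continue(err) => return Err(err),
        }
    };
    println!("woke up: {node} after {num_retries} attempts");
    Ok(())
}
```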
@@ -6,7 +6,7 @@ use crate::{
|
||||
cancellation::{self, CancelMap},
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
use anyhow::{bail, Context};
|
||||
@@ -347,11 +347,6 @@ async fn connect_to_compute_once(
|
||||
.await
|
||||
}
|
||||
|
||||
enum ConnectionState<E> {
|
||||
Cached(console::CachedNodeInfo),
|
||||
Invalid(compute::ConnCfg, E),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ConnectMechanism {
|
||||
type Connection;
|
||||
@@ -407,70 +402,67 @@ where
|
||||
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
|
||||
let mut num_retries = 0;
|
||||
let mut state = ConnectionState::<M::ConnectError>::Cached(node_info);
|
||||
// try once
|
||||
let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
(invalidate_cache(node_info), e)
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
match state {
|
||||
ConnectionState::Invalid(config, err) => {
|
||||
let wake_res = match creds {
|
||||
auth::BackendType::Console(api, creds) => {
|
||||
try_wake(api.as_ref(), extra, creds).await
|
||||
}
|
||||
auth::BackendType::Postgres(api, creds) => {
|
||||
try_wake(api.as_ref(), extra, creds).await
|
||||
}
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => return Err(err.into()),
|
||||
};
|
||||
let mut num_retries = 1;
|
||||
|
||||
match wake_res {
|
||||
// there was an error communicating with the control plane
|
||||
Err(e) => return Err(e.into()),
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(ControlFlow::Continue(_)) => {
|
||||
state = ConnectionState::Invalid(config, err);
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
let node_info = loop {
|
||||
let wake_res = match creds {
|
||||
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
|
||||
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => return Err(err.into()),
|
||||
// test backend
|
||||
auth::BackendType::Test(x) => x.wake_compute(),
|
||||
};
|
||||
|
||||
info!(num_retries, "retrying wake compute");
|
||||
time::sleep(wait_duration).await;
|
||||
continue;
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
Ok(ControlFlow::Break(mut node_info)) => {
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
state = ConnectionState::Cached(node_info)
|
||||
}
|
||||
}
|
||||
match handle_try_wake(wake_res, num_retries)? {
|
||||
// failed to wake up but we can continue to retry
|
||||
ControlFlow::Continue(_) => {}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
ControlFlow::Break(mut node_info) => {
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
break node_info;
|
||||
}
|
||||
ConnectionState::Cached(node_info) => {
|
||||
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !e.should_retry(num_retries) {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
// after the first connect failure,
|
||||
// we should invalidate the cache and wake up a new compute node
|
||||
if num_retries == 0 {
|
||||
state = ConnectionState::Invalid(invalidate_cache(node_info), e);
|
||||
} else {
|
||||
state = ConnectionState::Cached(node_info);
|
||||
}
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
time::sleep(wait_duration).await;
|
||||
info!(num_retries, "retrying wake compute");
|
||||
};
|
||||
|
||||
info!(num_retries, "retrying wake compute");
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
// now that we have a new node, try connect to it repeatedly.
|
||||
// this can error for a few reasons, for instance:
|
||||
// * DNS connection settings haven't quite propagated yet
|
||||
info!("wake_compute success. attempting to connect");
|
||||
loop {
|
||||
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if !e.should_retry(num_retries) {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
time::sleep(wait_duration).await;
|
||||
info!(num_retries, "retrying connect_once");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,15 +470,15 @@ where
|
||||
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
|
||||
/// * Returns Ok(Break(node)) if the wakeup succeeded
|
||||
/// * Returns Err(e) if there was an error
|
||||
pub async fn try_wake(
|
||||
api: &impl console::Api,
|
||||
extra: &console::ConsoleReqExtra<'_>,
|
||||
creds: &auth::ClientCredentials<'_>,
|
||||
pub fn handle_try_wake(
|
||||
result: Result<console::CachedNodeInfo, WakeComputeError>,
|
||||
num_retries: u32,
|
||||
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
match api.wake_compute(extra, creds).await {
|
||||
match result {
|
||||
Err(err) => match &err {
|
||||
WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)),
|
||||
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
|
||||
Ok(ControlFlow::Continue(err))
|
||||
}
|
||||
_ => Err(err),
|
||||
},
|
||||
// Ready to try again.
|
||||
@@ -498,8 +490,6 @@ pub trait ShouldRetry {
|
||||
fn could_retry(&self) -> bool;
|
||||
fn should_retry(&self, num_retries: u32) -> bool {
|
||||
match self {
|
||||
// retry all errors at least once
|
||||
_ if num_retries == 0 => true,
|
||||
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
||||
err => err.could_retry(),
|
||||
}
|
||||
@@ -551,14 +541,9 @@ impl ShouldRetry for compute::ConnectionError {
    }
}

pub fn retry_after(num_retries: u32) -> time::Duration {
    match num_retries {
        0 => time::Duration::ZERO,
        _ => {
            // 3/2 = 1.5 which seems to be an ok growth factor heuristic
            BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
        }
    }
fn retry_after(num_retries: u32) -> time::Duration {
    // 1.5 seems to be an ok growth factor heuristic
    BASE_RETRY_WAIT_DURATION.mul_f64(1.5_f64.powi(num_retries as i32))
}
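The rewritten retry_after keeps the 1.5 growth factor but computes it with mul_f64 instead of the integer 3^n / 2^n ratio, and it drops the special case for num_retries == 0 (retry counting now starts at 1, which is also why the connect_compute_total_wait test below changes its loop from 0..10 to 1..10). A side-by-side sketch of the two forms, using an assumed 100 ms base since BASE_RETRY_WAIT_DURATION itself is not shown in this diff:

```rust
use std::time::Duration;

// Assumed base, purely for illustration.
const BASE: Duration = Duration::from_millis(100);

// Old shape: integer ratio 3^n / 2^n with an explicit zero case.
fn retry_after_old(num_retries: u32) -> Duration {
    match num_retries {
        0 => Duration::ZERO,
        n => BASE * 3_u32.pow(n) / 2_u32.pow(n),
    }
}

// New shape: the same 1.5 growth factor, computed in floating point.
fn retry_after_new(num_retries: u32) -> Duration {
    BASE.mul_f64(1.5_f64.powi(num_retries as i32))
}

fn main() {
    // The two agree for small n: 150 ms, 225 ms, 337.5 ms, ...
    for n in 1..=5 {
        println!("{n}: old={:?} new={:?}", retry_after_old(n), retry_after_new(n));
    }
    // 3_u32.pow(n) overflows u32 once n exceeds 20, while mul_f64 keeps
    // working; with NUM_RETRIES_CONNECT at 10 neither limit is reached,
    // so the change is mostly about simplicity.
}
```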

/// Finish client connection initialization: confirm auth success, send params, etc.
@@ -1,10 +1,10 @@
|
||||
//! A group of high-level tests for connection establishing logic and auth.
|
||||
use std::borrow::Cow;
|
||||
|
||||
//!
|
||||
use super::*;
|
||||
use crate::auth::backend::TestBackend;
|
||||
use crate::auth::ClientCredentials;
|
||||
use crate::console::{CachedNodeInfo, NodeInfo};
|
||||
use crate::{auth, sasl, scram};
|
||||
use crate::{auth, http, sasl, scram};
|
||||
use async_trait::async_trait;
|
||||
use rstest::rstest;
|
||||
use tokio_postgres::config::SslMode;
|
||||
@@ -302,15 +302,18 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
#[test]
fn connect_compute_total_wait() {
    let mut total_wait = tokio::time::Duration::ZERO;
    for num_retries in 0..10 {
    for num_retries in 1..10 {
        total_wait += retry_after(num_retries);
    }
    assert!(total_wait < tokio::time::Duration::from_secs(12));
    assert!(total_wait > tokio::time::Duration::from_secs(10));
}
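A quick check of the asserted bounds: with growth factor 1.5 the waits for retries 1 through 9 sum to about 112 times the base duration, so the 10 to 12 second window corresponds to a base of roughly 100 ms (assumed here; the real BASE_RETRY_WAIT_DURATION lives in proxy.rs):

```rust
use std::time::Duration;

fn main() {
    // Assumed base for the arithmetic; see the note above.
    let base = Duration::from_millis(100);
    let total: Duration = (1..10).map(|n: i32| base.mul_f64(1.5_f64.powi(n))).sum();
    // 100 ms * (1.5^1 + ... + 1.5^9) ≈ 11.2 s, inside the asserted window.
    assert!(total > Duration::from_secs(10) && total < Duration::from_secs(12));
    println!("total backoff ≈ {total:?}");
}
```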
#[derive(Clone, Copy)]
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
enum ConnectAction {
|
||||
Wake,
|
||||
WakeFail,
|
||||
WakeRetry,
|
||||
Connect,
|
||||
Retry,
|
||||
Fail,
|
||||
@@ -321,6 +324,17 @@ struct TestConnectMechanism {
|
||||
sequence: Vec<ConnectAction>,
|
||||
}
|
||||
|
||||
impl TestConnectMechanism {
|
||||
fn verify(&self) {
|
||||
let counter = self.counter.lock().unwrap();
|
||||
assert_eq!(
|
||||
*counter,
|
||||
self.sequence.len(),
|
||||
"sequence does not proceed to the end"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl TestConnectMechanism {
|
||||
fn new(sequence: Vec<ConnectAction>) -> Self {
|
||||
Self {
|
||||
@@ -370,30 +384,63 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
ConnectAction::Connect => Ok(TestConnection),
|
||||
ConnectAction::Retry => Err(TestConnectError { retryable: true }),
|
||||
ConnectAction::Fail => Err(TestConnectError { retryable: false }),
|
||||
x => panic!("expecting action {:?}, connect is called instead", x),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
|
||||
}
|
||||
|
||||
fn helper_create_connect_info() -> (
|
||||
CachedNodeInfo,
|
||||
console::ConsoleReqExtra<'static>,
|
||||
auth::BackendType<'static, ClientCredentials<'static>>,
|
||||
) {
|
||||
impl TestBackend for TestConnectMechanism {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
|
||||
let mut counter = self.counter.lock().unwrap();
|
||||
let action = self.sequence[*counter];
|
||||
*counter += 1;
|
||||
match action {
|
||||
ConnectAction::Wake => Ok(helper_create_cached_node_info()),
|
||||
ConnectAction::WakeFail => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::FORBIDDEN,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
assert!(!err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
ConnectAction::WakeRetry => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
assert!(err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
x => panic!("expecting action {:?}, wake_compute is called instead", x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn helper_create_cached_node_info() -> CachedNodeInfo {
|
||||
let node = NodeInfo {
|
||||
config: compute::ConnCfg::new(),
|
||||
aux: Default::default(),
|
||||
allow_self_signed_compute: false,
|
||||
};
|
||||
let cache = CachedNodeInfo::new_uncached(node);
|
||||
CachedNodeInfo::new_uncached(node)
|
||||
}
|
||||
|
||||
fn helper_create_connect_info(
|
||||
mechanism: &TestConnectMechanism,
|
||||
) -> (
|
||||
CachedNodeInfo,
|
||||
console::ConsoleReqExtra<'static>,
|
||||
auth::BackendType<'_, ClientCredentials<'static>>,
|
||||
) {
|
||||
let cache = helper_create_cached_node_info();
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id: uuid::Uuid::new_v4(),
|
||||
application_name: Some("TEST"),
|
||||
};
|
||||
let url = "https://TEST_URL".parse().unwrap();
|
||||
let api = console::provider::mock::Api::new(url);
|
||||
let creds = auth::BackendType::Postgres(Cow::Owned(api), ClientCredentials::new_noop());
|
||||
let creds = auth::BackendType::Test(mechanism);
|
||||
(cache, extra, creds)
|
||||
}
|
||||
|
||||
@@ -401,42 +448,46 @@ fn helper_create_connect_info() -> (
|
||||
async fn connect_to_compute_success() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Test that we don't retry if the error is not retryable.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_1() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Fail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Fail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Even for non-retryable errors, we should retry at least once.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_2() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Fail, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
let mechanism = TestConnectMechanism::new(vec![Fail, Wake, Retry, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Retry for at most `NUM_RETRIES_CONNECT` times.
|
||||
@@ -445,11 +496,36 @@ async fn connect_to_compute_non_retry_3() {
|
||||
assert_eq!(NUM_RETRIES_CONNECT, 10);
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![
|
||||
Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
|
||||
Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
|
||||
/* the 11th time */ Retry,
|
||||
]);
|
||||
let (cache, extra, creds) = helper_create_connect_info();
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Should retry wake compute.
|
||||
#[tokio::test]
|
||||
async fn wake_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, WakeRetry, Wake, Connect]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Wake failed with a non-retryable error.
|
||||
#[tokio::test]
|
||||
async fn wake_non_retry() {
|
||||
use ConnectAction::*;
|
||||
let mechanism = TestConnectMechanism::new(vec![Retry, WakeFail]);
|
||||
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mechanism, cache, &extra, &creds)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
@@ -234,7 +234,10 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
        listen_pg_addr_tenant_only
    );
    let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
        error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
        error!(
            "failed to bind to address {}: {}",
            listen_pg_addr_tenant_only, e
        );
        e
    })?;
    Some(listener)
@@ -4,7 +4,6 @@
|
||||
# for all extensions in extensions subdir.
|
||||
import argparse
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
@@ -46,18 +45,11 @@ if __name__ == "__main__":
|
||||
BUILD_TAG = args.BUILD_TAG
|
||||
public_ext_list = args.public_extensions.split(",")
|
||||
|
||||
build_tags = {}
|
||||
with open("build_tags.txt", "r") as f:
|
||||
for line in f:
|
||||
ext, build_tag = line.strip().split(" ")
|
||||
build_tags[ext] = build_tag
|
||||
|
||||
ext_index = {}
|
||||
library_index = {}
|
||||
EXT_PATH = Path("extensions")
|
||||
for extension in EXT_PATH.iterdir():
|
||||
if extension.is_dir():
|
||||
build_tag = build_tags[extension.name]
|
||||
control_data = {}
|
||||
for control_file in extension.glob("*.control"):
|
||||
if control_file.suffix != ".control":
|
||||
@@ -66,13 +58,8 @@ if __name__ == "__main__":
|
||||
control_data[control_file.name] = f.read()
|
||||
ext_index[extension.name] = {
|
||||
"control_data": control_data,
|
||||
"archive_path": f"{build_tag}/{pg_version}/extensions/{extension.name}.tar.zst",
|
||||
"archive_path": f"{BUILD_TAG}/{pg_version}/extensions/{extension.name}.tar.zst",
|
||||
}
|
||||
# if we didn't build the extension for this build tag
|
||||
# then we don't need to re-upload it. so we delete it
|
||||
if build_tag != BUILD_TAG:
|
||||
shutil.rmtree(extension)
|
||||
|
||||
elif extension.suffix == ".zst":
|
||||
file_list = (
|
||||
str(subprocess.check_output(["tar", "tf", str(extension)]), "utf-8")
|
||||
|
||||
@@ -542,7 +542,7 @@ class S3Storage:
|
||||
access_key: str
|
||||
secret_key: str
|
||||
endpoint: Optional[str] = None
|
||||
prefix_in_bucket: Optional[str] = None
|
||||
prefix_in_bucket: Optional[str] = ""
|
||||
|
||||
def access_env_vars(self) -> Dict[str, str]:
|
||||
return {
|
||||
@@ -1504,6 +1504,7 @@ class NeonCli(AbstractNeonCli):
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
branch_name: Optional[str] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1517,8 +1518,11 @@ class NeonCli(AbstractNeonCli):
|
||||
args.append(f"--lsn={lsn}")
|
||||
args.extend(["--pg-port", str(pg_port)])
|
||||
args.extend(["--http-port", str(http_port)])
|
||||
|
||||
if safekeepers is not None:
|
||||
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
|
||||
if branch_name is not None:
|
||||
args.extend(["--branch-name", branch_name])
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import pytest
|
||||
import toml # TODO: replace with tomllib for Python >= 3.11
|
||||
@@ -14,7 +14,6 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PortDistributor,
|
||||
parse_project_git_version_output,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
@@ -63,7 +62,6 @@ def test_create_snapshot(
|
||||
neon_env_builder.pg_version = pg_version
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
@@ -259,36 +257,15 @@ def prepare_snapshot(
|
||||
shutil.rmtree(repo_dir / "pgdatadirs")
|
||||
os.mkdir(repo_dir / "endpoints")
|
||||
|
||||
# Remove wal-redo temp directory if it exists. Newer pageserver versions don't create
|
||||
# them anymore, but old versions did.
|
||||
for tenant in (repo_dir / "tenants").glob("*"):
|
||||
wal_redo_dir = tenant / "wal-redo-datadir.___temp"
|
||||
if wal_redo_dir.exists() and wal_redo_dir.is_dir():
|
||||
shutil.rmtree(wal_redo_dir)
|
||||
|
||||
# Update paths and ports in config files
|
||||
pageserver_toml = repo_dir / "pageserver.toml"
|
||||
pageserver_config = toml.load(pageserver_toml)
|
||||
pageserver_config["remote_storage"]["local_path"] = str(repo_dir / "local_fs_remote_storage")
|
||||
pageserver_config["listen_http_addr"] = port_distributor.replace_with_new_port(
|
||||
pageserver_config["listen_http_addr"]
|
||||
)
|
||||
pageserver_config["listen_pg_addr"] = port_distributor.replace_with_new_port(
|
||||
pageserver_config["listen_pg_addr"]
|
||||
)
|
||||
# since storage_broker these are overridden by neon_local during pageserver
|
||||
# start; remove both to prevent unknown options during etcd ->
|
||||
# storage_broker migration. TODO: remove once broker is released
|
||||
pageserver_config.pop("broker_endpoint", None)
|
||||
pageserver_config.pop("broker_endpoints", None)
|
||||
etcd_broker_endpoints = [f"http://localhost:{port_distributor.get_port()}/"]
|
||||
if get_neon_version(neon_binpath) == "49da498f651b9f3a53b56c7c0697636d880ddfe0":
|
||||
pageserver_config["broker_endpoints"] = etcd_broker_endpoints # old etcd version
|
||||
for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"):
|
||||
pageserver_config[param] = port_distributor.replace_with_new_port(pageserver_config[param])
|
||||
|
||||
# Older pageserver versions had just one `auth_type` setting. Now there
|
||||
# are separate settings for pg and http ports. We don't use authentication
|
||||
# in compatibility tests so just remove authentication related settings.
|
||||
pageserver_config.pop("auth_type", None)
|
||||
# We don't use authentication in compatibility tests
|
||||
# so just remove authentication related settings.
|
||||
pageserver_config.pop("pg_auth_type", None)
|
||||
pageserver_config.pop("http_auth_type", None)
|
||||
|
||||
@@ -300,31 +277,16 @@ def prepare_snapshot(
|
||||
|
||||
snapshot_config_toml = repo_dir / "config"
|
||||
snapshot_config = toml.load(snapshot_config_toml)
|
||||
|
||||
# Provide up/downgrade etcd <-> storage_broker to make forward/backward
|
||||
# compatibility test happy. TODO: leave only the new part once broker is released.
|
||||
if get_neon_version(neon_binpath) == "49da498f651b9f3a53b56c7c0697636d880ddfe0":
|
||||
# old etcd version
|
||||
snapshot_config["etcd_broker"] = {
|
||||
"etcd_binary_path": shutil.which("etcd"),
|
||||
"broker_endpoints": etcd_broker_endpoints,
|
||||
}
|
||||
snapshot_config.pop("broker", None)
|
||||
else:
|
||||
# new storage_broker version
|
||||
broker_listen_addr = f"127.0.0.1:{port_distributor.get_port()}"
|
||||
snapshot_config["broker"] = {"listen_addr": broker_listen_addr}
|
||||
snapshot_config.pop("etcd_broker", None)
|
||||
|
||||
snapshot_config["pageserver"]["listen_http_addr"] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["pageserver"]["listen_http_addr"]
|
||||
)
|
||||
snapshot_config["pageserver"]["listen_pg_addr"] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["pageserver"]["listen_pg_addr"]
|
||||
for param in ("listen_http_addr", "listen_pg_addr"):
|
||||
snapshot_config["pageserver"][param] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["pageserver"][param]
|
||||
)
|
||||
snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port(
|
||||
snapshot_config["broker"]["listen_addr"]
|
||||
)
|
||||
for sk in snapshot_config["safekeepers"]:
|
||||
sk["http_port"] = port_distributor.replace_with_new_port(sk["http_port"])
|
||||
sk["pg_port"] = port_distributor.replace_with_new_port(sk["pg_port"])
|
||||
for param in ("http_port", "pg_port", "pg_tenant_only_port"):
|
||||
sk[param] = port_distributor.replace_with_new_port(sk[param])
|
||||
|
||||
if pg_distrib_dir:
|
||||
snapshot_config["pg_distrib_dir"] = str(pg_distrib_dir)
|
||||
@@ -350,12 +312,6 @@ def prepare_snapshot(
|
||||
), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"
|
||||
|
||||
|
||||
# get git SHA of neon binary
|
||||
def get_neon_version(neon_binpath: Path):
|
||||
out = subprocess.check_output([neon_binpath / "neon_local", "--version"]).decode("utf-8")
|
||||
return parse_project_git_version_output(out)
|
||||
|
||||
|
||||
def check_neon_works(
|
||||
repo_dir: Path,
|
||||
neon_target_binpath: Path,
|
||||
@@ -381,7 +337,6 @@ def check_neon_works(
|
||||
config.pg_version = pg_version
|
||||
config.initial_tenant = snapshot_config["default_tenant_id"]
|
||||
config.pg_distrib_dir = pg_distrib_dir
|
||||
config.preserve_database_files = True
|
||||
|
||||
# Use the "target" binaries to launch the storage nodes
|
||||
config_target = config
|
||||
@@ -438,6 +393,14 @@ def check_neon_works(
|
||||
test_output_dir / "dump-from-wal.filediff",
|
||||
)
|
||||
|
||||
# TODO: Run pg_amcheck unconditionally after the next release
|
||||
try:
|
||||
pg_bin.run(["psql", connstr, "--command", "CREATE EXTENSION IF NOT EXISTS amcheck"])
|
||||
except subprocess.CalledProcessError:
|
||||
log.info("Extension amcheck is not available, skipping pg_amcheck")
|
||||
else:
|
||||
pg_bin.run_capture(["pg_amcheck", connstr, "--install-missing", "--verbose"])
|
||||
|
||||
# Check that we can interract with the data
|
||||
pg_bin.run_capture(["pgbench", "--time=10", "--progress=2", connstr])
|
||||
|
||||
@@ -445,10 +408,15 @@ def check_neon_works(
|
||||
assert not initial_dump_differs, "initial dump differs"
|
||||
|
||||
|
||||
def dump_differs(first: Path, second: Path, output: Path) -> bool:
|
||||
def dump_differs(
|
||||
first: Path, second: Path, output: Path, allowed_diffs: Optional[List[str]] = None
|
||||
) -> bool:
|
||||
"""
|
||||
Runs diff(1) command on two SQL dumps and write the output to the given output file.
|
||||
Returns True if the dumps differ, False otherwise.
|
||||
The function supports allowed diffs, if the diff is in the allowed_diffs list, it's not considered as a difference.
|
||||
See the example of it in https://github.com/neondatabase/neon/pull/4425/files#diff-15c5bfdd1d5cc1411b9221091511a60dd13a9edf672bdfbb57dd2ef8bb7815d6
|
||||
|
||||
Returns True if the dumps differ and produced diff is not allowed, False otherwise (in most cases we want it to return False).
|
||||
"""
|
||||
|
||||
with output.open("w") as stdout:
|
||||
@@ -466,51 +434,30 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
|
||||
|
||||
differs = res.returncode != 0
|
||||
|
||||
# TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made
|
||||
if differs:
|
||||
with tempfile.NamedTemporaryFile(mode="w") as tmp:
|
||||
tmp.write(PR4425_ALLOWED_DIFF)
|
||||
tmp.flush()
|
||||
allowed_diffs = allowed_diffs or []
|
||||
if differs and len(allowed_diffs) > 0:
|
||||
for allowed_diff in allowed_diffs:
|
||||
with tempfile.NamedTemporaryFile(mode="w") as tmp:
|
||||
tmp.write(allowed_diff)
|
||||
tmp.flush()
|
||||
|
||||
allowed = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
r"--ignore-matching-lines=^---", # Ignore diff headers
|
||||
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
|
||||
"--ignore-matching-lines=^@@", # Ignore diff blocks location
|
||||
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
|
||||
"--ignore-matching-lines=^ --.*", # Ignore the " --" lines for compatibility with PG14
|
||||
"--ignore-blank-lines",
|
||||
str(output),
|
||||
str(tmp.name),
|
||||
],
|
||||
)
|
||||
allowed = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
r"--ignore-matching-lines=^---", # Ignore diff headers
|
||||
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
|
||||
"--ignore-matching-lines=^@@", # Ignore diff blocks location
|
||||
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
|
||||
"--ignore-matching-lines=^ --.*", # Ignore SQL comments in diff
|
||||
"--ignore-blank-lines",
|
||||
str(output),
|
||||
str(tmp.name),
|
||||
],
|
||||
)
|
||||
|
||||
differs = allowed.returncode != 0
|
||||
differs = allowed.returncode != 0
|
||||
if not differs:
|
||||
break
|
||||
|
||||
return differs
|
||||
|
||||
|
||||
PR4425_ALLOWED_DIFF = """
|
||||
--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000
|
||||
+++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000
|
||||
@@ -13,12 +13,20 @@
|
||||
|
||||
CREATE ROLE cloud_admin;
|
||||
ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS;
|
||||
+CREATE ROLE neon_superuser;
|
||||
+ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS;
|
||||
|
||||
--
|
||||
-- User Configurations
|
||||
--
|
||||
|
||||
|
||||
+--
|
||||
+-- Role memberships
|
||||
+--
|
||||
+
|
||||
+GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
+GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
"""
|
||||
|
||||
@@ -14,10 +14,6 @@ from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
|
||||
|
||||
@@ -16,11 +16,13 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
|
||||
endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port
|
||||
)
|
||||
|
||||
env.neon_cli.create_branch(new_branch_name="migration_check")
|
||||
branch_name = "migration-check"
|
||||
|
||||
env.neon_cli.create_branch(new_branch_name=branch_name)
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
env.neon_cli.endpoint_start(
|
||||
endpoint_id="ep-migration_check", pg_port=pg_port, http_port=http_port
|
||||
f"ep-{branch_name}", pg_port, http_port, branch_name=branch_name
|
||||
)
|
||||
finally:
|
||||
env.neon_cli.stop()
|
||||
|
||||
@@ -690,10 +690,6 @@ def test_ondemand_download_failure_to_replace(
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
lsn = Lsn(pageserver_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
|
||||
|
||||
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
|
||||
|
||||
# remove layers so that they will be redownloaded
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
@@ -704,8 +700,10 @@ def test_ondemand_download_failure_to_replace(
|
||||
# requesting details with non-incremental size should trigger a download of the only layer
|
||||
# this will need to be adjusted if an index for logical sizes is ever implemented
|
||||
with pytest.raises(PageserverApiException):
|
||||
# error message is not useful
|
||||
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=2)
|
||||
# PageserverApiException is expected because of the failpoint (timeline_detail building does something)
|
||||
# ReadTimeout can happen on our busy CI, but it should not, because there is no more busylooping
|
||||
# but should it be added back, we would wait for 15s here.
|
||||
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=15)
|
||||
|
||||
actual_message = ".* ERROR .*layermap-replace-notfound"
|
||||
assert env.pageserver.log_contains(actual_message) is not None
|
||||
|
||||
@@ -72,10 +72,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
|
||||
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
|
||||
# Use a tiny checkpoint distance, to create a lot of layers quickly.
|
||||
# That allows us to stress the compaction and layer flushing logic more.
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
|
||||
@@ -15,10 +15,6 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.is_testing_enabled_or_skip()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
|
||||
# Create a branch for us
|
||||
env.neon_cli.create_branch("test_pageserver_recovery", "main")
|
||||
|
||||
|
||||
vendor/postgres-v14 (vendored, 2 changes)
Submodule vendor/postgres-v14 updated: 12c5dc8281...ebedb34d01

vendor/postgres-v15 (vendored, 2 changes)
Submodule vendor/postgres-v15 updated: e3fbfc4d14...1220c8a63f

vendor/revisions.json (vendored, 4 changed lines)
@@ -1,4 +1,4 @@
{
    "postgres-v15": "e3fbfc4d143b2d3c3c1813ce747f8af35aa9405e",
    "postgres-v14": "12c5dc8281d20b5bd636e1097eea80a7bc609591"
    "postgres-v15": "1220c8a63f00101829f9222a5821fc084b4384c7",
    "postgres-v14": "ebedb34d01c8ac9c31e8ea4628b9854103a1dc8f"
}