mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 11:10:36 +00:00
Compare commits
10 Commits
release-67
...
skyzh/safe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a6b6597d1b | ||
|
|
c07cea80bd | ||
|
|
a2e2362ee9 | ||
|
|
0a567acdb9 | ||
|
|
69ea2776e9 | ||
|
|
4dc9cb7cf9 | ||
|
|
7424e7269c | ||
|
|
5dc68e4e6a | ||
|
|
7cfd116856 | ||
|
|
d696c41807 |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -773,7 +773,7 @@ jobs:
|
||||
matrix:
|
||||
version: [ v14, v15, v16, v17 ]
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.29.3
|
||||
VM_BUILDER_VERSION: v0.35.0
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
@@ -56,6 +56,7 @@ use utils::http::endpoint::request_span;
|
||||
use utils::http::request::must_parse_query_param;
|
||||
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
@@ -80,7 +81,6 @@ use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
|
||||
@@ -1719,8 +1719,13 @@ async fn timeline_gc_handler(
|
||||
|
||||
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
|
||||
let gc_result = state
|
||||
.tenant_manager
|
||||
.immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, gc_result)
|
||||
}
|
||||
|
||||
@@ -2197,6 +2197,82 @@ impl TenantManager {
|
||||
|
||||
Ok((wanted_bytes, shard_count as u32))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
|
||||
pub(crate) async fn immediate_gc(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
gc_req: TimelineGcRequest,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<GcResult, ApiError> {
|
||||
let tenant = {
|
||||
let guard = self.tenants.read().unwrap();
|
||||
guard
|
||||
.get(&tenant_shard_id)
|
||||
.cloned()
|
||||
.with_context(|| format!("tenant {tenant_shard_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?
|
||||
};
|
||||
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
// Use tenant's pitr setting
|
||||
let pitr = tenant.get_pitr_interval();
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx: RequestContext =
|
||||
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
|
||||
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
|
||||
|
||||
fail::fail_point!("immediate_gc_task_pre");
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut result = tenant
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
|
||||
.await;
|
||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||
// better once the types support it.
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
// we need to synchronize with drop completion for python tests without polling for
|
||||
// log messages
|
||||
if let Ok(result) = result.as_mut() {
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
for layer in std::mem::take(&mut result.doomed_layers) {
|
||||
js.spawn(layer.wait_drop());
|
||||
}
|
||||
tracing::info!(
|
||||
total = js.len(),
|
||||
"starting to wait for the gc'd layers to be dropped"
|
||||
);
|
||||
while let Some(res) = js.join_next().await {
|
||||
res.expect("wait_drop should not panic");
|
||||
}
|
||||
}
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, false).ok();
|
||||
let rtc = timeline.as_ref().map(|x| &x.remote_client);
|
||||
|
||||
if let Some(rtc) = rtc {
|
||||
// layer drops schedule actions on remote timeline client to actually do the
|
||||
// deletions; don't care about the shutdown error, just exit fast
|
||||
drop(rtc.wait_completion().await);
|
||||
}
|
||||
}
|
||||
|
||||
result.map_err(|e| match e {
|
||||
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
|
||||
GcError::TimelineNotFound => {
|
||||
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
|
||||
}
|
||||
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -2341,7 +2417,7 @@ enum TenantSlotDropError {
|
||||
/// Errors that can happen any time we are walking the tenant map to try and acquire
|
||||
/// the TenantSlot for a particular tenant.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TenantMapError {
|
||||
pub(crate) enum TenantMapError {
|
||||
// Tried to read while initializing
|
||||
#[error("tenant map is still initializing")]
|
||||
StillInitializing,
|
||||
@@ -2371,7 +2447,7 @@ pub enum TenantMapError {
|
||||
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
|
||||
/// `drop_old_value`. It is an error to call this without shutting down
|
||||
/// the conents of `old_value`.
|
||||
pub struct SlotGuard {
|
||||
pub(crate) struct SlotGuard {
|
||||
tenant_shard_id: TenantShardId,
|
||||
old_value: Option<TenantSlot>,
|
||||
upserted: bool,
|
||||
@@ -2764,81 +2840,6 @@ use {
|
||||
utils::http::error::ApiError,
|
||||
};
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
|
||||
pub(crate) async fn immediate_gc(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
gc_req: TimelineGcRequest,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<GcResult, ApiError> {
|
||||
let tenant = {
|
||||
let guard = TENANTS.read().unwrap();
|
||||
guard
|
||||
.get(&tenant_shard_id)
|
||||
.cloned()
|
||||
.with_context(|| format!("tenant {tenant_shard_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?
|
||||
};
|
||||
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
// Use tenant's pitr setting
|
||||
let pitr = tenant.get_pitr_interval();
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx: RequestContext =
|
||||
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
|
||||
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
|
||||
|
||||
fail::fail_point!("immediate_gc_task_pre");
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut result = tenant
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
|
||||
.await;
|
||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||
// better once the types support it.
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
// we need to synchronize with drop completion for python tests without polling for
|
||||
// log messages
|
||||
if let Ok(result) = result.as_mut() {
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
for layer in std::mem::take(&mut result.doomed_layers) {
|
||||
js.spawn(layer.wait_drop());
|
||||
}
|
||||
tracing::info!(
|
||||
total = js.len(),
|
||||
"starting to wait for the gc'd layers to be dropped"
|
||||
);
|
||||
while let Some(res) = js.join_next().await {
|
||||
res.expect("wait_drop should not panic");
|
||||
}
|
||||
}
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, false).ok();
|
||||
let rtc = timeline.as_ref().map(|x| &x.remote_client);
|
||||
|
||||
if let Some(rtc) = rtc {
|
||||
// layer drops schedule actions on remote timeline client to actually do the
|
||||
// deletions; don't care about the shutdown error, just exit fast
|
||||
drop(rtc.wait_completion().await);
|
||||
}
|
||||
}
|
||||
|
||||
result.map_err(|e| match e {
|
||||
GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
|
||||
GcError::TimelineNotFound => {
|
||||
ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
|
||||
}
|
||||
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
# neon extension
|
||||
comment = 'cloud storage for PostgreSQL'
|
||||
# TODO: bump default version to 1.5, after we are certain that we don't
|
||||
# need to rollback the compute image
|
||||
default_version = '1.4'
|
||||
default_version = '1.5'
|
||||
module_pathname = '$libdir/neon'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -274,7 +274,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
|
||||
rate_limit_ip_subnet: 64,
|
||||
ip_allowlist_check_enabled: true,
|
||||
},
|
||||
require_client_ip: false,
|
||||
proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
|
||||
handshake_timeout: Duration::from_secs(10),
|
||||
region: "local".into(),
|
||||
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
|
||||
|
||||
@@ -17,6 +17,7 @@ use proxy::config::AuthenticationConfig;
|
||||
use proxy::config::CacheOptions;
|
||||
use proxy::config::HttpConfig;
|
||||
use proxy::config::ProjectInfoCacheOptions;
|
||||
use proxy::config::ProxyProtocolV2;
|
||||
use proxy::console;
|
||||
use proxy::context::parquet::ParquetUploadArgs;
|
||||
use proxy::http;
|
||||
@@ -144,9 +145,6 @@ struct ProxyCliArgs {
|
||||
/// size of the threadpool for password hashing
|
||||
#[clap(long, default_value_t = 4)]
|
||||
scram_thread_pool_size: u8,
|
||||
/// Require that all incoming requests have a Proxy Protocol V2 packet **and** have an IP address associated.
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
require_client_ip: bool,
|
||||
/// Disable dynamic rate limiter and store the metrics to ensure its production behaviour.
|
||||
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
disable_dynamic_rate_limiter: bool,
|
||||
@@ -229,6 +227,11 @@ struct ProxyCliArgs {
|
||||
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
is_private_access_proxy: bool,
|
||||
|
||||
/// Configure whether all incoming requests have a Proxy Protocol V2 packet.
|
||||
// TODO(conradludgate): switch default to rejected or required once we've updated all deployments
|
||||
#[clap(value_enum, long, default_value_t = ProxyProtocolV2::Supported)]
|
||||
proxy_protocol_v2: ProxyProtocolV2,
|
||||
}
|
||||
|
||||
#[derive(clap::Args, Clone, Copy, Debug)]
|
||||
@@ -704,7 +707,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
allow_self_signed_compute: args.allow_self_signed_compute,
|
||||
http_config,
|
||||
authentication_config,
|
||||
require_client_ip: args.require_client_ip,
|
||||
proxy_protocol_v2: args.proxy_protocol_v2,
|
||||
handshake_timeout: args.handshake_timeout,
|
||||
region: args.region.clone(),
|
||||
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::{
|
||||
Host,
|
||||
};
|
||||
use anyhow::{bail, ensure, Context, Ok};
|
||||
use clap::ValueEnum;
|
||||
use itertools::Itertools;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use rustls::{
|
||||
@@ -30,7 +31,7 @@ pub struct ProxyConfig {
|
||||
pub allow_self_signed_compute: bool,
|
||||
pub http_config: HttpConfig,
|
||||
pub authentication_config: AuthenticationConfig,
|
||||
pub require_client_ip: bool,
|
||||
pub proxy_protocol_v2: ProxyProtocolV2,
|
||||
pub region: String,
|
||||
pub handshake_timeout: Duration,
|
||||
pub wake_compute_retry_config: RetryConfig,
|
||||
@@ -38,6 +39,16 @@ pub struct ProxyConfig {
|
||||
pub connect_to_compute_retry_config: RetryConfig,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
|
||||
pub enum ProxyProtocolV2 {
|
||||
/// Connection will error if PROXY protocol v2 header is missing
|
||||
Required,
|
||||
/// Connection will parse PROXY protocol v2 header, but accept the connection if it's missing.
|
||||
Supported,
|
||||
/// Connection will error if PROXY protocol v2 header is provided
|
||||
Rejected,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetricCollectionConfig {
|
||||
pub endpoint: reqwest::Url,
|
||||
|
||||
@@ -10,6 +10,7 @@ pub(crate) mod wake_compute;
|
||||
pub use copy_bidirectional::copy_bidirectional_client_compute;
|
||||
pub use copy_bidirectional::ErrorSource;
|
||||
|
||||
use crate::config::ProxyProtocolV2;
|
||||
use crate::{
|
||||
auth,
|
||||
cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal},
|
||||
@@ -93,15 +94,19 @@ pub async fn task_main(
|
||||
|
||||
connections.spawn(async move {
|
||||
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
|
||||
Ok((socket, Some(addr))) => (socket, addr.ip()),
|
||||
Err(e) => {
|
||||
error!("per-client task finished with an error: {e:#}");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, None)) if config.require_client_ip => {
|
||||
error!("missing required client IP");
|
||||
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
error!("missing required proxy protocol header");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
error!("proxy protocol header not supported");
|
||||
return;
|
||||
}
|
||||
Ok((socket, Some(addr))) => (socket, addr.ip()),
|
||||
Ok((socket, None)) => (socket, peer_addr.ip()),
|
||||
};
|
||||
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
//! This module implements Timeline lifecycle management and has all necessary code
|
||||
//! to glue together SafeKeeper and all other background services.
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use remote_storage::RemotePath;
|
||||
use safekeeper_api::models::TimelineTermBumpResponse;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs::{self};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::crashsafe::{durable_rename, fsync_async_opt};
|
||||
use utils::id::TenantId;
|
||||
|
||||
use std::cmp::max;
|
||||
@@ -27,6 +29,8 @@ use utils::{
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
|
||||
use crate::control_file::{ CONTROL_FILE_NAME};
|
||||
use crate::pull_timeline::create_temp_timeline_dir;
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{
|
||||
@@ -614,23 +618,49 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
// Create timeline directory.
|
||||
fs::create_dir_all(&self.timeline_dir).await?;
|
||||
// Create a temporary timeline directory
|
||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, self.ttid).await?;
|
||||
|
||||
// Init the control file
|
||||
let init_control_file = async {
|
||||
let guard = shared_state.sk.state_mut();
|
||||
let path = tli_dir_path.join(CONTROL_FILE_NAME);
|
||||
let buf = guard.write_to_buf()?;
|
||||
let mut control_file = File::create(&path)
|
||||
.await
|
||||
.with_context(|| format!("failed to create init control file at: {}", path))?;
|
||||
control_file.write_all(&buf).await.with_context(|| {
|
||||
format!(
|
||||
"failed to write safekeeper state into control file at: {}",
|
||||
path
|
||||
)
|
||||
})?;
|
||||
control_file.flush().await.with_context(|| {
|
||||
format!(
|
||||
"failed to flush safekeeper state into control file at: {}",
|
||||
path
|
||||
)
|
||||
})?;
|
||||
drop(control_file);
|
||||
Ok(())
|
||||
};
|
||||
|
||||
// Write timeline to disk and start background tasks.
|
||||
if let Err(e) = shared_state.sk.state_mut().flush().await {
|
||||
if let Err(e) = init_control_file.await {
|
||||
// Bootstrap failed, cancel timeline and remove timeline directory.
|
||||
self.cancel(shared_state);
|
||||
|
||||
if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
|
||||
warn!(
|
||||
"failed to remove timeline {} directory after bootstrap failure: {}",
|
||||
self.ttid, fs_err
|
||||
);
|
||||
}
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
info!(
|
||||
"moving timeline {} from {} to {}",
|
||||
self.ttid, tli_dir_path, self.timeline_dir
|
||||
);
|
||||
tokio::fs::create_dir_all(&self.timeline_dir).await?;
|
||||
// fsync tenant dir creation
|
||||
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
|
||||
durable_rename(&tli_dir_path, &self.timeline_dir, !conf.no_sync).await?;
|
||||
|
||||
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -950,9 +950,6 @@ class NeonEnv:
|
||||
|
||||
safekeepers - An array containing objects representing the safekeepers
|
||||
|
||||
pg_bin - pg_bin.run() can be used to execute Postgres client binaries,
|
||||
like psql or pg_dump
|
||||
|
||||
initial_tenant - tenant ID of the initial tenant created in the repository
|
||||
|
||||
neon_cli - can be used to run the 'neon' CLI tool
|
||||
@@ -3300,6 +3297,8 @@ class PgBin:
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -> PgBin:
|
||||
"""pg_bin.run() can be used to execute Postgres client binaries, like psql or pg_dump"""
|
||||
|
||||
return PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
|
||||
env = neon_simple_env
|
||||
pageserver_http_client = env.pageserver.http_client()
|
||||
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
tenant, timeline_main = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# disable background GC
|
||||
"gc_period": "0s",
|
||||
@@ -70,8 +70,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
|
||||
}
|
||||
)
|
||||
|
||||
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
|
||||
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
|
||||
endpoint_main = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
|
||||
main_cur = endpoint_main.connect().cursor()
|
||||
|
||||
@@ -92,7 +91,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
|
||||
pageserver_http_client.timeline_gc(tenant, timeline_main, lsn2 - lsn1 + 1024)
|
||||
|
||||
env.neon_cli.create_branch(
|
||||
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
|
||||
"test_branch", ancestor_branch_name="main", ancestor_start_lsn=lsn1, tenant_id=tenant
|
||||
)
|
||||
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)
|
||||
|
||||
|
||||
@@ -252,7 +252,7 @@ def test_forward_compatibility(
|
||||
# not using env.pageserver.version because it was initialized before
|
||||
prev_pageserver_version_str = env.get_binary_version("pageserver")
|
||||
prev_pageserver_version_match = re.search(
|
||||
"Neon page server git-env:(.*) failpoints: (.*), features: (.*)",
|
||||
"Neon page server git(?:-env)?:(.*) failpoints: (.*), features: (.*)",
|
||||
prev_pageserver_version_str,
|
||||
)
|
||||
if prev_pageserver_version_match is not None:
|
||||
@@ -263,12 +263,12 @@ def test_forward_compatibility(
|
||||
)
|
||||
|
||||
# does not include logs from previous runs
|
||||
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
|
||||
assert not env.pageserver.log_contains(f"git(-env)?:{prev_pageserver_version}")
|
||||
|
||||
env.start()
|
||||
|
||||
# ensure the specified pageserver is running
|
||||
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)
|
||||
assert env.pageserver.log_contains(f"git(-env)?:{prev_pageserver_version}")
|
||||
|
||||
check_neon_works(
|
||||
env,
|
||||
|
||||
@@ -31,9 +31,7 @@ def helper_compare_timeline_list(
|
||||
)
|
||||
)
|
||||
|
||||
timelines_cli = env.neon_cli.list_timelines()
|
||||
assert timelines_cli == env.neon_cli.list_timelines(initial_tenant)
|
||||
|
||||
timelines_cli = env.neon_cli.list_timelines(initial_tenant)
|
||||
cli_timeline_ids = sorted([timeline_id for (_, timeline_id) in timelines_cli])
|
||||
assert timelines_api == cli_timeline_ids
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
|
||||
# IMPORTANT:
|
||||
# If the version has changed, the test should be updated.
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.4",)
|
||||
assert cur.fetchone() == ("1.5",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
res = cur.fetchall()
|
||||
log.info(res)
|
||||
@@ -48,7 +48,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
|
||||
# IMPORTANT:
|
||||
# If the version has changed, the test should be updated.
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.4",)
|
||||
assert cur.fetchone() == ("1.5",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
all_versions = ["1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
|
||||
current_version = "1.5"
|
||||
|
||||
@@ -174,8 +174,7 @@ def test_pageserver_chaos(
|
||||
"checkpoint_distance": "5000000",
|
||||
}
|
||||
)
|
||||
env.neon_cli.create_timeline("test_pageserver_chaos", tenant_id=tenant)
|
||||
endpoint = env.endpoints.create_start("test_pageserver_chaos", tenant_id=tenant)
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
|
||||
# Create table, and insert some rows. Make it big enough that it doesn't fit in
|
||||
# shared_buffers, otherwise the SELECT after restart will just return answer
|
||||
|
||||
@@ -27,20 +27,15 @@ def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
(tenant_id, _) = env.neon_cli.create_tenant()
|
||||
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
|
||||
http_client = env.pageserver.http_client()
|
||||
initial_size = http_client.tenant_size(tenant_id)
|
||||
|
||||
# we should never have zero, because there should be the initdb "changes"
|
||||
assert initial_size > 0, "initial implementation returns ~initdb tenant_size"
|
||||
|
||||
main_branch_name = "main"
|
||||
|
||||
branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
|
||||
assert branch_name == main_branch_name
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
main_branch_name,
|
||||
"main",
|
||||
tenant_id=tenant_id,
|
||||
config_lines=["autovacuum=off", "checkpoint_timeout=10min"],
|
||||
)
|
||||
@@ -54,7 +49,7 @@ def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
|
||||
# The transaction above will make the compute generate a checkpoint.
|
||||
# In turn, the pageserver persists the checkpoint. This should only be
|
||||
# one key with a size of a couple hundred bytes.
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, main_timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
|
||||
assert size >= initial_size and size - initial_size < 1024
|
||||
@@ -306,7 +301,8 @@ def test_single_branch_get_tenant_size_grows(
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=tenant_config)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
branch_name, timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
|
||||
timeline_id = env.initial_timeline
|
||||
branch_name = "main"
|
||||
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
@@ -516,7 +512,8 @@ def test_get_tenant_size_with_multiple_branches(
|
||||
env.pageserver.allowed_errors.append(".*InternalServerError\\(No such file or directory.*")
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
main_branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
|
||||
main_timeline_id = env.initial_timeline
|
||||
main_branch_name = "main"
|
||||
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
|
||||
@@ -71,10 +71,9 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder):
|
||||
"checkpoint_distance": "5000000",
|
||||
}
|
||||
)
|
||||
env.neon_cli.create_timeline("test_tenants_many", tenant_id=tenant)
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_tenants_many",
|
||||
"main",
|
||||
tenant_id=tenant,
|
||||
)
|
||||
tenants_endpoints.append((tenant, endpoint))
|
||||
|
||||
@@ -638,7 +638,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(50, 0.1, first_request_finished)
|
||||
|
||||
# check that the timeline is gone
|
||||
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
|
||||
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=10)
|
||||
|
||||
|
||||
def test_timeline_delete_works_for_remote_smoke(
|
||||
|
||||
@@ -26,8 +26,7 @@ def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
}
|
||||
)
|
||||
|
||||
env.neon_cli.create_timeline("test_truncate", tenant_id=tenant)
|
||||
endpoint = env.endpoints.create_start("test_truncate", tenant_id=tenant)
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute("create table t1(x integer)")
|
||||
cur.execute(f"insert into t1 values (generate_series(1,{n_records}))")
|
||||
|
||||
Reference in New Issue
Block a user