Merge remote-tracking branch 'origin/main' into HEAD

Conflicts:
	libs/pageserver_api/src/models.rs
	pageserver/src/lib.rs
	pageserver/src/tenant_mgr.rs

There was a merge conflict following attach_tenant() where
I didn't understand why Git called out a conflict.
I went through the changes in `origin/main` since the last
merge done by Heikki, couldn't find anything that would
conflict there.

Original git diff right after after `git merge` follows:

   diff --cc libs/pageserver_api/src/models.rs
   index 750585b58,aefd79336..000000000
   --- a/libs/pageserver_api/src/models.rs
   +++ b/libs/pageserver_api/src/models.rs
   @@@ -15,17 -15,13 +15,27 @@@ use bytes::{BufMut, Bytes, BytesMut}
     /// A state of a tenant in pageserver's memory.
     #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
     pub enum TenantState {
   ++<<<<<<< HEAD
    +    // This tenant is being loaded from local disk
    +    Loading,
    +    // This tenant is being downloaded from cloud storage.
    +    Attaching,
    +    /// Tenant is fully operational
    +    Active,
    +    /// A tenant is recognized by pageserver, but it is being detached or the system is being
    +    /// shut down.
    +    Paused,
    +    /// A tenant is recognized by the pageserver, but can no longer used for any operations,
    +    /// because it failed to get activated.
   ++=======
   +     /// Tenant is fully operational, its background jobs might be running or not.
   +     Active { background_jobs_running: bool },
   +     /// A tenant is recognized by pageserver, but it is being detached or the
   +     /// system is being shut down.
   +     Paused,
   +     /// A tenant is recognized by the pageserver, but can no longer be used for
   +     /// any operations, because it failed to be activated.
   ++>>>>>>> origin/main
         Broken,
     }

   diff --cc pageserver/src/lib.rs
   index 2d5b66f57,e3112223e..000000000
   --- a/pageserver/src/lib.rs
   +++ b/pageserver/src/lib.rs
   @@@ -22,7 -23,11 +23,13 @@@ pub mod walreceiver
     pub mod walrecord;
     pub mod walredo;

   ++<<<<<<< HEAD
   ++=======
   + use std::collections::HashMap;
   + use std::path::Path;
   +
   ++>>>>>>> origin/main
     use tracing::info;
    -use utils:🆔:{TenantId, TimelineId};

     use crate::task_mgr::TaskKind;

   @@@ -103,14 -108,51 +110,64 @@@ fn exponential_backoff_duration_seconds
         }
     }

   ++<<<<<<< HEAD
    +/// A suffix to be used during file sync from the remote storage,
    +/// to ensure that we do not leave corrupted files that pretend to be layers.
    +const TEMP_FILE_SUFFIX: &str = "___temp";
   ++=======
   + /// A newtype to store arbitrary data grouped by tenant and timeline ids.
   + /// One could use [`utils:🆔:TenantTimelineId`] for grouping, but that would
   + /// not include the cases where a certain tenant has zero timelines.
   + /// This is sometimes important: a tenant could be registered during initial load from FS,
   + /// even if he has no timelines on disk.
   + #[derive(Debug)]
   + pub struct TenantTimelineValues<T>(HashMap<TenantId, HashMap<TimelineId, T>>);
   +
   + impl<T> TenantTimelineValues<T> {
   +     fn new() -> Self {
   +         Self(HashMap::new())
   +     }
   + }
   +
   + /// The name of the metadata file pageserver creates per timeline.
   + /// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
   + pub const METADATA_FILE_NAME: &str = "metadata";
   +
   + /// Per-tenant configuration file.
   + /// Full path: `tenants/<tenant_id>/config`.
   + pub const TENANT_CONFIG_NAME: &str = "config";
   +
   + /// A suffix used for various temporary files. Any temporary files found in the
   + /// data directory at pageserver startup can be automatically removed.
   + pub const TEMP_FILE_SUFFIX: &str = "___temp";
   +
   + /// A marker file to mark that a timeline directory was not fully initialized.
   + /// If a timeline directory with this marker is encountered at pageserver startup,
   + /// the timeline directory and the marker file are both removed.
   + /// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
   + pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";
   +
   + pub fn is_temporary(path: &Path) -> bool {
   +     match path.file_name() {
   +         Some(name) => name.to_string_lossy().ends_with(TEMP_FILE_SUFFIX),
   +         None => false,
   +     }
   + }
   +
   + pub fn is_uninit_mark(path: &Path) -> bool {
   +     match path.file_name() {
   +         Some(name) => name
   +             .to_string_lossy()
   +             .ends_with(TIMELINE_UNINIT_MARK_SUFFIX),
   +         None => false,
   ++    }
   ++}
   ++>>>>>>> origin/main
    +
    +pub fn is_temporary(path: &std::path::Path) -> bool {
    +    match path.file_name() {
    +        Some(name) => name.to_string_lossy().ends_with(TEMP_FILE_SUFFIX),
    +        None => false,
         }
     }

   diff --cc pageserver/src/tenant_mgr.rs
   index 73593bc48,061d7fa19..000000000
   --- a/pageserver/src/tenant_mgr.rs
   +++ b/pageserver/src/tenant_mgr.rs
   @@@ -13,11 -13,18 +13,22 @@@ use tracing::*
     use remote_storage::GenericRemoteStorage;

     use crate::config::PageServerConf;
   ++<<<<<<< HEAD
   ++=======
   + use crate::http::models::TenantInfo;
   + use crate::storage_sync::index::{LayerFileMetadata, RemoteIndex, RemoteTimelineIndex};
   + use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData, TimelineLocalFiles};
   ++>>>>>>> origin/main
     use crate::task_mgr::{self, TaskKind};
    -use crate::tenant::{
    -    ephemeral_file::is_ephemeral_file, metadata::TimelineMetadata, Tenant, TenantState,
    -};
    +use crate::tenant::{Tenant, TenantState};
     use crate::tenant_config::TenantConfOpt;
   ++<<<<<<< HEAD
   ++=======
   + use crate::walredo::PostgresRedoManager;
   + use crate::{is_temporary, is_uninit_mark, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
   ++>>>>>>> origin/main

    -use utils::crashsafe::{self, path_with_suffix_extension};
    +use utils::fs_ext::PathExt;
     use utils:🆔:{TenantId, TimelineId};

     mod tenants_state {
   @@@ -341,87 -521,334 +352,247 @@@ pub fn list_tenants() -> Vec<(TenantId
             .collect()
     }

    -#[derive(Debug)]
    -pub enum TenantAttachData {
    -    Ready(HashMap<TimelineId, TimelineLocalFiles>),
    -    Broken(anyhow::Error),
    -}
    -/// Attempts to collect information about all tenant and timelines, existing on the local FS.
    -/// If finds any, deletes all temporary files and directories, created before. Also removes empty directories,
    -/// that may appear due to such removals.
    -/// Does not fail on particular timeline or tenant collection errors, rather logging them and ignoring the entities.
    -fn local_tenant_timeline_files(
    -    config: &'static PageServerConf,
    -) -> anyhow::Result<HashMap<TenantId, TenantAttachData>> {
    -    let _entered = info_span!("local_tenant_timeline_files").entered();
    -
    -    let mut local_tenant_timeline_files = HashMap::new();
    -    let tenants_dir = config.tenants_path();
    -    for tenants_dir_entry in fs::read_dir(&tenants_dir)
    -        .with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
    -    {
    -        match &tenants_dir_entry {
    -            Ok(tenants_dir_entry) => {
    -                let tenant_dir_path = tenants_dir_entry.path();
    -                if is_temporary(&tenant_dir_path) {
    -                    info!(
    -                        "Found temporary tenant directory, removing: {}",
    -                        tenant_dir_path.display()
    -                    );
    -                    if let Err(e) = fs::remove_dir_all(&tenant_dir_path) {
    -                        error!(
    -                            "Failed to remove temporary directory '{}': {:?}",
    -                            tenant_dir_path.display(),
    -                            e
    -                        );
    -                    }
    -                } else {
    -                    match collect_timelines_for_tenant(config, &tenant_dir_path) {
    -                        Ok((tenant_id, TenantAttachData::Broken(e))) => {
    -                            local_tenant_timeline_files.entry(tenant_id).or_insert(TenantAttachData::Broken(e));
    -                        },
    -                        Ok((tenant_id, TenantAttachData::Ready(collected_files))) => {
    -                            if collected_files.is_empty() {
    -                                match remove_if_empty(&tenant_dir_path) {
    -                                    Ok(true) => info!("Removed empty tenant directory {}", tenant_dir_path.display()),
    -                                    Ok(false) => {
    -                                        // insert empty timeline entry: it has some non-temporary files inside that we cannot remove
    -                                        // so make obvious for HTTP API callers, that something exists there and try to load the tenant
    -                                        let _ = local_tenant_timeline_files.entry(tenant_id).or_insert_with(|| TenantAttachData::Ready(HashMap::new()));
    -                                    },
    -                                    Err(e) => error!("Failed to remove empty tenant directory: {e:?}"),
    -                                }
    -                            } else {
    -                                match local_tenant_timeline_files.entry(tenant_id) {
    -                                    hash_map::Entry::Vacant(entry) => {
    -                                        entry.insert(TenantAttachData::Ready(collected_files));
    -                                    }
    -                                    hash_map::Entry::Occupied(entry) =>{
    -                                        if let TenantAttachData::Ready(old_timelines) = entry.into_mut() {
    -                                            old_timelines.extend(collected_files);
    -                                        }
    -                                    },
    -                                }
    -                            }
    -                        },
    -                        Err(e) => error!(
    -                            "Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}",
    -                            tenants_dir.display(),
    -                            tenants_dir_entry,
    -                            e
    -                        ),
    -                    }
    +/// Execute Attach mgmt API command.
    +///
    +/// Downloading all the tenant data is performed in the background, this merely
    +/// spawns the background task and returns quickly.
    +pub async fn attach_tenant(
    +    conf: &'static PageServerConf,
    +    tenant_id: TenantId,
    +    remote_storage: &GenericRemoteStorage,
    +) -> anyhow::Result<()> {
    +    match tenants_state::write_tenants().entry(tenant_id) {
    +        hash_map::Entry::Occupied(e) => {
    +            // Cannot attach a tenant that already exists. The error message depends on
    +            // the state it's in.
    +            match e.get().current_state() {
    +                TenantState::Attaching => {
    +                    anyhow::bail!("tenant {tenant_id} attach is already in progress")
                     }
   ++<<<<<<< HEAD
    +                current_state => {
    +                    anyhow::bail!("tenant already exists, current state: {current_state:?}")
   ++=======
   +             }
   +             Err(e) => error!(
   +                 "Failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
   +                 tenants_dir_entry,
   +                 tenants_dir.display(),
   +                 e
   +             ),
   +         }
   +     }
   +
   +     info!(
   +         "Collected files for {} tenants",
   +         local_tenant_timeline_files.len(),
   +     );
   +     Ok(local_tenant_timeline_files)
   + }
   +
   + fn remove_if_empty(tenant_dir_path: &Path) -> anyhow::Result<bool> {
   +     let directory_is_empty = tenant_dir_path
   +         .read_dir()
   +         .with_context(|| {
   +             format!(
   +                 "Failed to read directory '{}' contents",
   +                 tenant_dir_path.display()
   +             )
   +         })?
   +         .next()
   +         .is_none();
   +
   +     if directory_is_empty {
   +         fs::remove_dir_all(&tenant_dir_path).with_context(|| {
   +             format!(
   +                 "Failed to remove empty directory '{}'",
   +                 tenant_dir_path.display(),
   +             )
   +         })?;
   +
   +         Ok(true)
   +     } else {
   +         Ok(false)
   +     }
   + }
   +
   + fn collect_timelines_for_tenant(
   +     config: &'static PageServerConf,
   +     tenant_path: &Path,
   + ) -> anyhow::Result<(TenantId, TenantAttachData)> {
   +     let tenant_id = tenant_path
   +         .file_name()
   +         .and_then(OsStr::to_str)
   +         .unwrap_or_default()
   +         .parse::<TenantId>()
   +         .context("Could not parse tenant id out of the tenant dir name")?;
   +     let timelines_dir = config.timelines_path(&tenant_id);
   +
   +     if !timelines_dir.as_path().is_dir() {
   +         return Ok((
   +             tenant_id,
   +             TenantAttachData::Broken(anyhow::anyhow!(
   +                 "Tenant {} has no timelines directory at {}",
   +                 tenant_id,
   +                 timelines_dir.display()
   +             )),
   +         ));
   +     }
   +
   +     let mut tenant_timelines = HashMap::new();
   +     for timelines_dir_entry in fs::read_dir(&timelines_dir)
   +         .with_context(|| format!("Failed to list timelines dir entry for tenant {tenant_id}"))?
   +     {
   +         match timelines_dir_entry {
   +             Ok(timelines_dir_entry) => {
   +                 let timeline_dir = timelines_dir_entry.path();
   +                 if is_temporary(&timeline_dir) {
   +                     info!(
   +                         "Found temporary timeline directory, removing: {}",
   +                         timeline_dir.display()
   +                     );
   +                     if let Err(e) = fs::remove_dir_all(&timeline_dir) {
   +                         error!(
   +                             "Failed to remove temporary directory '{}': {:?}",
   +                             timeline_dir.display(),
   +                             e
   +                         );
   +                     }
   +                 } else if is_uninit_mark(&timeline_dir) {
   +                     let timeline_uninit_mark_file = &timeline_dir;
   +                     info!(
   +                         "Found an uninit mark file {}, removing the timeline and its uninit mark",
   +                         timeline_uninit_mark_file.display()
   +                     );
   +                     let timeline_id = timeline_uninit_mark_file
   +                         .file_stem()
   +                         .and_then(OsStr::to_str)
   +                         .unwrap_or_default()
   +                         .parse::<TimelineId>()
   +                         .with_context(|| {
   +                             format!(
   +                                 "Could not parse timeline id out of the timeline uninit mark name {}",
   +                                 timeline_uninit_mark_file.display()
   +                             )
   +                         })?;
   +                     let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
   +                     if let Err(e) =
   +                         remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
   +                     {
   +                         error!("Failed to clean up uninit marked timeline: {e:?}");
   +                     }
   +                 } else {
   +                     let timeline_id = timeline_dir
   +                         .file_name()
   +                         .and_then(OsStr::to_str)
   +                         .unwrap_or_default()
   +                         .parse::<TimelineId>()
   +                         .with_context(|| {
   +                             format!(
   +                                 "Could not parse timeline id out of the timeline dir name {}",
   +                                 timeline_dir.display()
   +                             )
   +                         })?;
   +                     let timeline_uninit_mark_file =
   +                         config.timeline_uninit_mark_file_path(tenant_id, timeline_id);
   +                     if timeline_uninit_mark_file.exists() {
   +                         info!("Found an uninit mark file for timeline {tenant_id}/{timeline_id}, removing the timeline and its uninit mark");
   +                         if let Err(e) = remove_timeline_and_uninit_mark(
   +                             &timeline_dir,
   +                             &timeline_uninit_mark_file,
   +                         ) {
   +                             error!("Failed to clean up uninit marked timeline: {e:?}");
   +                         }
   +                     } else {
   +                         match collect_timeline_files(&timeline_dir) {
   +                             Ok((metadata, timeline_files)) => {
   +                                 tenant_timelines.insert(
   +                                     timeline_id,
   +                                     TimelineLocalFiles::collected(metadata, timeline_files),
   +                                 );
   +                             }
   +                             Err(e) => {
   +                                 error!(
   +                                     "Failed to process timeline dir contents at '{}', reason: {:?}",
   +                                     timeline_dir.display(),
   +                                     e
   +                                 );
   +                                 match remove_if_empty(&timeline_dir) {
   +                                     Ok(true) => info!(
   +                                         "Removed empty timeline directory {}",
   +                                         timeline_dir.display()
   +                                     ),
   +                                     Ok(false) => (),
   +                                     Err(e) => {
   +                                         error!("Failed to remove empty timeline directory: {e:?}")
   +                                     }
   +                                 }
   +                             }
   +                         }
   +                     }
   ++>>>>>>> origin/main
                     }
                 }
    -            Err(e) => {
    -                error!("Failed to list timelines for entry tenant {tenant_id}, reason: {e:?}")
    -            }
    +        }
    +        hash_map::Entry::Vacant(v) => {
    +            let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage)?;
    +            v.insert(tenant);
    +            Ok(())
             }
         }
    -
    -    if tenant_timelines.is_empty() {
    -        // this is normal, we've removed all broken, empty and temporary timeline dirs
    -        // but should allow the tenant to stay functional and allow creating new timelines
    -        // on a restart, we require tenants to have the timelines dir, so leave it on disk
    -        debug!("Tenant {tenant_id} has no timelines loaded");
    -    }
    -
    -    Ok((tenant_id, TenantAttachData::Ready(tenant_timelines)))
     }

    -fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {
    -    fs::remove_dir_all(&timeline_dir)
    -        .or_else(|e| {
    -            if e.kind() == std::io::ErrorKind::NotFound {
    -                // we can leave the uninit mark without a timeline dir,
    -                // just remove the mark then
    -                Ok(())
    -            } else {
    -                Err(e)
    -            }
    -        })
    -        .with_context(|| {
    -            format!(
    -                "Failed to remove unit marked timeline directory {}",
    -                timeline_dir.display()
    -            )
    -        })?;
    -    fs::remove_file(&uninit_mark).with_context(|| {
    -        format!(
    -            "Failed to remove timeline uninit mark file {}",
    -            uninit_mark.display()
    -        )
    -    })?;
    +#[cfg(feature = "testing")]
    +use {
    +    crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
    +    utils::http::error::ApiError,
    +};

    -    Ok(())
    -}
    +#[cfg(feature = "testing")]
    +pub fn immediate_gc(
    +    tenant_id: TenantId,
    +    timeline_id: TimelineId,
    +    gc_req: TimelineGcRequest,
    +) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
    +    let guard = tenants_state::read_tenants();

    -// discover timeline files and extract timeline metadata
    -//  NOTE: ephemeral files are excluded from the list
    -fn collect_timeline_files(
    -    timeline_dir: &Path,
    -) -> anyhow::Result<(TimelineMetadata, HashMap<PathBuf, LayerFileMetadata>)> {
    -    let mut timeline_files = HashMap::new();
    -    let mut timeline_metadata_path = None;
    -
    -    let timeline_dir_entries =
    -        fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?;
    -    for entry in timeline_dir_entries {
    -        let entry_path = entry.context("Failed to list timeline dir entry")?.path();
    -        let metadata = entry_path.metadata()?;
    -
    -        if metadata.is_file() {
    -            if entry_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME) {
    -                timeline_metadata_path = Some(entry_path);
    -            } else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) {
    -                debug!("skipping ephemeral file {}", entry_path.display());
    -                continue;
    -            } else if is_temporary(&entry_path) {
    -                info!("removing temp timeline file at {}", entry_path.display());
    -                fs::remove_file(&entry_path).with_context(|| {
    -                    format!(
    -                        "failed to remove temp download file at {}",
    -                        entry_path.display()
    -                    )
    -                })?;
    -            } else {
    -                let layer_metadata = LayerFileMetadata::new(metadata.len());
    -                timeline_files.insert(entry_path, layer_metadata);
    +    let tenant = guard
    +        .get(&tenant_id)
    +        .map(Arc::clone)
    +        .with_context(|| format!("Tenant {tenant_id} not found"))
    +        .map_err(ApiError::NotFound)?;
    +
    +    let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
    +    // Use tenant's pitr setting
    +    let pitr = tenant.get_pitr_interval();
    +
    +    // Run in task_mgr to avoid race with detach operation
    +    let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
    +    task_mgr::spawn(
    +        &tokio::runtime::Handle::current(),
    +        TaskKind::GarbageCollector,
    +        Some(tenant_id),
    +        Some(timeline_id),
    +        &format!("timeline_gc_handler garbage collection run for tenant {tenant_id} timeline {timeline_id}"),
    +        false,
    +        async move {
    +            fail::fail_point!("immediate_gc_task_pre");
    +            let result = tenant
    +                .gc_iteration(Some(timeline_id), gc_horizon, pitr, true)
    +                .instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
    +                .await;
    +                // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
    +                // better once the types support it.
    +            match task_done.send(result) {
    +                Ok(_) => (),
    +                Err(result) => error!("failed to send gc result: {result:?}"),
                 }
    +            Ok(())
             }
    -    }
    -
    -    // FIXME (rodionov) if attach call succeeded, and then pageserver is restarted before download is completed
    -    //   then attach is lost. There would be no retries for that,
    -    //   initial collect will fail because there is no metadata.
    -    //   We either need to start download if we see empty dir after restart or attach caller should
    -    //   be aware of that and retry attach if awaits_download for timeline switched from true to false
    -    //   but timelinne didn't appear locally.
    -    //   Check what happens with remote index in that case.
    -    let timeline_metadata_path = match timeline_metadata_path {
    -        Some(path) => path,
    -        None => anyhow::bail!("No metadata file found in the timeline directory"),
    -    };
    -    let metadata = TimelineMetadata::from_bytes(
    -        &fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?,
    -    )
    -    .context("Failed to parse timeline metadata file bytes")?;
    -
    -    anyhow::ensure!(
    -        metadata.ancestor_timeline().is_some() || !timeline_files.is_empty(),
    -        "Timeline has no ancestor and no layer files"
         );

    -    Ok((metadata, timeline_files))
    +    // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
    +    drop(guard);
    +
    +    Ok(wait_task_done)
     }
   diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
   index da50d99db..360ff1c63 160000
   --- a/vendor/postgres-v14
   +++ b/vendor/postgres-v14
   @@ -1 +1 @@
   -Subproject commit da50d99db54848f7a3e910f920aaad7dc6915d36
   +Subproject commit 360ff1c637a57d351a7a5a391d8e8afd8fde8c3a
   diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
   index 780c3f8e3..d31b3f7c6 160000
   --- a/vendor/postgres-v15
   +++ b/vendor/postgres-v15
   @@ -1 +1 @@
   -Subproject commit 780c3f8e3524c2e32a2e28884c7b647fcebf71d7
   +Subproject commit d31b3f7c6d108e52c8bb11e812ce4e266501ea3d
This commit is contained in:
Christian Schwarz
2022-11-25 12:19:53 -05:00
94 changed files with 3728 additions and 1572 deletions

View File

@@ -420,8 +420,9 @@ class AuthKeys:
pub: str
priv: str
def generate_management_token(self) -> str:
token = jwt.encode({"scope": "pageserverapi"}, self.priv, algorithm="RS256")
def generate_token(self, *, scope: str, **token_data: str) -> str:
token = jwt.encode({"scope": scope, **token_data}, self.priv, algorithm="RS256")
# cast(Any, self.priv)
# jwt.encode can return 'bytes' or 'str', depending on Python version or type
# hinting or something (not sure what). If it returned 'bytes', convert it to 'str'
@@ -431,17 +432,14 @@ class AuthKeys:
return token
def generate_pageserver_token(self) -> str:
return self.generate_token(scope="pageserverapi")
def generate_safekeeper_token(self) -> str:
return self.generate_token(scope="safekeeperdata")
def generate_tenant_token(self, tenant_id: TenantId) -> str:
token = jwt.encode(
{"scope": "tenant", "tenant_id": str(tenant_id)},
self.priv,
algorithm="RS256",
)
if isinstance(token, bytes):
token = token.decode()
return token
return self.generate_token(scope="tenant", tenant_id=str(tenant_id))
class MockS3Server:
@@ -1761,6 +1759,8 @@ class NeonPageserver(PgProtocol):
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
# FIXME: known race condition in TaskHandle: https://github.com/neondatabase/neon/issues/2885
".*sender is dropped while join handle is still alive.*",
]
def start(
@@ -2094,7 +2094,8 @@ class NeonProxy(PgProtocol):
def start(self):
"""
Starts a proxy with option '--auth-backend postgres' and a postgres instance already provided though '--auth-endpoint <postgress-instance>'."
Starts a proxy with option '--auth-backend postgres' and a postgres instance
already provided though '--auth-endpoint <postgress-instance>'."
"""
assert self._popen is None
assert self.auth_endpoint is not None
@@ -2499,7 +2500,8 @@ class Safekeeper:
# "replication=0" hacks psycopg not to send additional queries
# on startup, see https://github.com/psycopg/psycopg2/pull/482
connstr = f"host=localhost port={self.port.pg} replication=0 options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
token = self.env.auth_keys.generate_tenant_token(tenant_id)
connstr = f"host=localhost port={self.port.pg} password={token} replication=0 options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
with closing(psycopg2.connect(connstr)) as conn:
# server doesn't support transactions

View File

@@ -0,0 +1,62 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
#
# Benchmark effect of prefetch on bulk update operations
#
# A sequential scan that's part of a bulk update is the same as any other sequential scan,
# but dirtying the pages as you go affects the last-written LSN tracking. We used to have
# an issue with the last-written LSN cache where rapidly evicting dirty pages always
# invalidated the prefetched responses, which showed up in bad performance in this test.
#
@pytest.mark.timeout(10000)
@pytest.mark.parametrize("fillfactor", [10, 50, 100])
def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor):
env = neon_env_builder.init_start()
n_records = 1000000
timeline_id = env.neon_cli.create_branch("test_bulk_update")
tenant_id = env.initial_tenant
pg = env.postgres.create_start("test_bulk_update")
cur = pg.connect().cursor()
cur.execute("set statement_timeout=0")
cur.execute(f"create table t(x integer) WITH (fillfactor={fillfactor})")
with zenbenchmark.record_duration("insert-1"):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-no-prefetch"):
cur.execute("update t set x=x+1")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-no-prefetch"):
cur.execute("delete from t")
cur.execute("drop table t")
cur.execute("set enable_seqscan_prefetch=on")
cur.execute("set seqscan_prefetch_buffers=100")
cur.execute(f"create table t2(x integer) WITH (fillfactor={fillfactor})")
with zenbenchmark.record_duration("insert-2"):
cur.execute(f"insert into t2 values (generate_series(1,{n_records}))")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-with-prefetch"):
cur.execute("update t2 set x=x+1")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-with-prefetch"):
cur.execute("delete from t2")

View File

@@ -88,7 +88,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
env.zenbenchmark.record("scale", scale, "", MetricReport.TEST_PARAM)
password = env.pg.default_options.get("password", None)
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
options = "-cstatement_timeout=0 " + env.pg.default_options.get("options", "")
# drop password from the connection string by passing password=None and set password separately
connstr = env.pg.connstr(password=None, options=options)

View File

@@ -22,16 +22,21 @@ from pytest_lazyfixture import lazy_fixture # type: ignore
],
)
@pytest.mark.parametrize(
"env",
"env, scale",
[
# Run on all envs
pytest.param(lazy_fixture("neon_compare"), id="neon"),
pytest.param(lazy_fixture("vanilla_compare"), id="vanilla"),
pytest.param(lazy_fixture("remote_compare"), id="remote", marks=pytest.mark.remote_cluster),
# Run on all envs. Use 50x larger table on remote cluster to make sure
# it doesn't fit in shared buffers, which are larger on remote than local.
pytest.param(lazy_fixture("neon_compare"), 1, id="neon"),
pytest.param(lazy_fixture("vanilla_compare"), 1, id="vanilla"),
pytest.param(
lazy_fixture("remote_compare"), 50, id="remote", marks=pytest.mark.remote_cluster
),
],
)
def test_seqscans(env: PgCompare, rows: int, iters: int, workers: int):
with closing(env.pg.connect()) as conn:
def test_seqscans(env: PgCompare, scale: int, rows: int, iters: int, workers: int):
rows = scale * rows
with closing(env.pg.connect(options="-cstatement_timeout=0")) as conn:
with conn.cursor() as cur:
cur.execute("drop table if exists t;")
cur.execute("create table t (i integer);")

View File

@@ -154,7 +154,7 @@ def test_pgbench_simple_update_workload(pg_compare: PgCompare, scale: int, durat
def start_pgbench_intensive_initialization(env: PgCompare, scale: int, done_event: threading.Event):
with env.record_duration("run_duration"):
# Needs to increase the statement timeout (default: 120s) because the
# Disable statement timeout (default: 120s) because the
# initialization step can be slow with a large scale.
env.pg_bin.run_capture(
[
@@ -162,7 +162,7 @@ def start_pgbench_intensive_initialization(env: PgCompare, scale: int, done_even
f"-s{scale}",
"-i",
"-Idtg",
env.pg.connstr(options="-cstatement_timeout=600s"),
env.pg.connstr(options="-cstatement_timeout=0"),
]
)

View File

@@ -1,7 +1,7 @@
from contextlib import closing
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException, PgProtocol
from fixtures.types import TenantId
@@ -16,13 +16,13 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())
invalid_tenant_http_client = env.pageserver.http_client(invalid_tenant_token)
management_token = env.auth_keys.generate_management_token()
management_http_client = env.pageserver.http_client(management_token)
pageserver_token = env.auth_keys.generate_pageserver_token()
pageserver_http_client = env.pageserver.http_client(pageserver_token)
# this does not invoke auth check and only decodes jwt and checks it for validity
# check both tokens
ps.safe_psql("set FOO", password=tenant_token)
ps.safe_psql("set FOO", password=management_token)
ps.safe_psql("set FOO", password=pageserver_token)
new_timeline_id = env.neon_cli.create_branch(
"test_pageserver_auth", tenant_id=env.initial_tenant
@@ -33,7 +33,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
tenant_id=env.initial_tenant, ancestor_timeline_id=new_timeline_id
)
# console can create branches for tenant
management_http_client.timeline_create(
pageserver_http_client.timeline_create(
tenant_id=env.initial_tenant, ancestor_timeline_id=new_timeline_id
)
@@ -46,7 +46,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
)
# create tenant using management token
management_http_client.tenant_create()
pageserver_http_client.tenant_create()
# fail to create tenant using tenant token
with pytest.raises(
@@ -73,3 +73,73 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (5000050000,)
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
branch = f"test_auth_failures_auth_enabled_{auth_enabled}"
timeline_id = env.neon_cli.create_branch(branch)
env.postgres.create_start(branch)
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())
pageserver_token = env.auth_keys.generate_pageserver_token()
safekeeper_token = env.auth_keys.generate_safekeeper_token()
def check_connection(
pg_protocol: PgProtocol, command: str, expect_success: bool, **conn_kwargs
):
def op():
with closing(pg_protocol.connect(**conn_kwargs)) as conn:
with conn.cursor() as cur:
cur.execute(command)
if expect_success:
op()
else:
with pytest.raises(Exception):
op()
def check_pageserver(expect_success: bool, **conn_kwargs):
check_connection(
env.pageserver,
f"get_last_record_rlsn {env.initial_tenant} {timeline_id}",
expect_success,
**conn_kwargs,
)
check_pageserver(not auth_enabled)
if auth_enabled:
check_pageserver(True, password=tenant_token)
env.pageserver.allowed_errors.append(".*Tenant id mismatch. Permission denied.*")
check_pageserver(False, password=invalid_tenant_token)
check_pageserver(True, password=pageserver_token)
env.pageserver.allowed_errors.append(
".*SafekeeperData scope makes no sense for Pageserver.*"
)
check_pageserver(False, password=safekeeper_token)
def check_safekeeper(expect_success: bool, **conn_kwargs):
check_connection(
PgProtocol(
host="localhost",
port=env.safekeepers[0].port.pg,
options=f"ztenantid={env.initial_tenant} ztimelineid={timeline_id}",
),
"IDENTIFY_SYSTEM",
expect_success,
**conn_kwargs,
)
check_safekeeper(not auth_enabled)
if auth_enabled:
check_safekeeper(True, password=tenant_token)
check_safekeeper(False, password=invalid_tenant_token)
check_safekeeper(False, password=pageserver_token)
check_safekeeper(True, password=safekeeper_token)

View File

@@ -181,7 +181,7 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilde
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
management_token = env.auth_keys.generate_management_token()
pageserver_token = env.auth_keys.generate_pageserver_token()
with env.pageserver.http_client(auth_token=management_token) as client:
with env.pageserver.http_client(auth_token=pageserver_token) as client:
check_client(client, env.initial_tenant)

View File

@@ -1,5 +1,4 @@
import json
import subprocess
from urllib.parse import urlparse
import psycopg2
@@ -29,108 +28,65 @@ def test_password_hack(static_proxy: NeonProxy):
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
def get_session_id_from_uri_line(uri_prefix, uri_line):
def get_session_id(uri_prefix, uri_line):
assert uri_prefix in uri_line
url_parts = urlparse(uri_line)
psql_session_id = url_parts.path[1:]
assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars."
link_auth_uri_prefix = uri_line[: -len(url_parts.path)]
# invariant: the prefix must match the uri_prefix.
assert (
link_auth_uri_prefix == uri_prefix
), f"Line='{uri_line}' should contain a http auth link of form '{uri_prefix}/<psql_session_id>'."
# invariant: the entire link_auth_uri should be on its own line, module spaces.
assert " ".join(uri_line.split(" ")) == f"{uri_prefix}/{psql_session_id}"
assert psql_session_id.isalnum(), "session_id should only contain alphanumeric chars"
return psql_session_id
def create_and_send_db_info(local_vanilla_pg, psql_session_id, mgmt_port):
pg_user = "proxy"
pg_password = "password"
local_vanilla_pg.start()
query = f"create user {pg_user} with login superuser password '{pg_password}'"
local_vanilla_pg.safe_psql(query)
port = local_vanilla_pg.default_options["port"]
host = local_vanilla_pg.default_options["host"]
dbname = local_vanilla_pg.default_options["dbname"]
db_info_dict = {
"session_id": psql_session_id,
"result": {
"Success": {
"host": host,
"port": port,
"dbname": dbname,
"user": pg_user,
"password": pg_password,
}
},
}
db_info_str = json.dumps(db_info_dict)
cmd_args = [
"psql",
"-h",
"127.0.0.1", # localhost
"-p",
f"{mgmt_port}",
"-c",
db_info_str,
]
log.info(f"Sending to proxy the user and db info: {' '.join(cmd_args)}")
p = subprocess.Popen(cmd_args, stdout=subprocess.PIPE)
out, err = p.communicate()
assert "ok" in str(out)
async def get_uri_line_from_process_welcome_notice(link_auth_uri_prefix, proc):
"""
Returns the line from the welcome notice from proc containing link_auth_uri_prefix.
:param link_auth_uri_prefix: the uri prefix used to indicate the line of interest
:param proc: the process to read the welcome message from.
:return: a line containing the full link authentication uri.
"""
max_num_lines_of_welcome_message = 15
for attempt in range(max_num_lines_of_welcome_message):
raw_line = await proc.stderr.readline()
line = raw_line.decode("utf-8").strip()
async def find_auth_link(link_auth_uri_prefix, proc):
for _ in range(100):
line = (await proc.stderr.readline()).decode("utf-8").strip()
log.info(f"psql line: {line}")
if link_auth_uri_prefix in line:
log.info(f"SUCCESS, found auth url: {line}")
return line
assert False, f"did not find line containing '{link_auth_uri_prefix}'"
async def activate_link_auth(local_vanilla_pg, link_proxy, psql_session_id):
pg_user = "proxy"
log.info("creating a new user for link auth test")
local_vanilla_pg.start()
local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser")
db_info = json.dumps(
{
"session_id": psql_session_id,
"result": {
"Success": {
"host": local_vanilla_pg.default_options["host"],
"port": local_vanilla_pg.default_options["port"],
"dbname": local_vanilla_pg.default_options["dbname"],
"user": pg_user,
"project": "irrelevant",
}
},
}
)
log.info("sending session activation message")
psql = await PSQL(host=link_proxy.host, port=link_proxy.mgmt_port).run(db_info)
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "ok"
@pytest.mark.asyncio
async def test_psql_session_id(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
"""
Test copied and modified from: test_project_psql_link_auth test from cloud/tests_e2e/tests/test_project.py
Step 1. establish connection to the proxy
Step 2. retrieve session_id:
Step 2.1: read welcome message
Step 2.2: parse session_id
Step 3. create a vanilla_pg and send user and db info via command line (using Popen) a psql query via mgmt port to proxy.
Step 4. assert that select 1 has been executed correctly.
"""
psql = PSQL(
host=link_proxy.host,
port=link_proxy.proxy_port,
)
proc = await psql.run("select 42")
psql = await PSQL(host=link_proxy.host, port=link_proxy.proxy_port).run("select 42")
uri_prefix = link_proxy.link_auth_uri_prefix
line_str = await get_uri_line_from_process_welcome_notice(uri_prefix, proc)
link = await find_auth_link(uri_prefix, psql)
psql_session_id = get_session_id_from_uri_line(uri_prefix, line_str)
log.info(f"Parsed psql_session_id='{psql_session_id}' from Neon welcome message.")
psql_session_id = get_session_id(uri_prefix, link)
await activate_link_auth(vanilla_pg, link_proxy, psql_session_id)
create_and_send_db_info(vanilla_pg, psql_session_id, link_proxy.mgmt_port)
assert proc.stdout is not None
out = (await proc.stdout.read()).decode("utf-8").strip()
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"

View File

@@ -192,10 +192,8 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder
"first-branch", main_branch_name, tenant_id
)
# unsure why this happens, the size difference is more than a page alignment
size_after_first_branch = http_client.tenant_size(tenant_id)
assert size_after_first_branch > size_at_branch
assert size_after_first_branch - size_at_branch == gc_horizon
assert size_after_first_branch == size_at_branch
first_branch_pg = env.postgres.create_start("first-branch", tenant_id=tenant_id)
@@ -221,7 +219,7 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder
"second-branch", main_branch_name, tenant_id
)
size_after_second_branch = http_client.tenant_size(tenant_id)
assert size_after_second_branch > size_after_continuing_on_main
assert size_after_second_branch == size_after_continuing_on_main
second_branch_pg = env.postgres.create_start("second-branch", tenant_id=tenant_id)

View File

@@ -0,0 +1,46 @@
import time
from fixtures.neon_fixtures import NeonEnvBuilder
#
# Test truncation of FSM and VM forks of a relation
#
def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
env = neon_env_builder.init_start()
n_records = 10000
n_iter = 10
# Problems with FSM/VM forks truncation are most frequently detected during page reconstruction triggered
# by image layer generation. So adjust default parameters to make it happen more frequently.
tenant, _ = env.neon_cli.create_tenant(
conf={
"gc_period": "100 m",
"gc_horizon": "1048576",
"checkpoint_distance": "1000000",
"compaction_period": "1 s",
"compaction_threshold": "3",
"image_creation_threshold": "1",
"compaction_target_size": "1000000",
}
)
env.neon_cli.create_timeline("test_truncate", tenant_id=tenant)
pg = env.postgres.create_start("test_truncate", tenant_id=tenant)
cur = pg.connect().cursor()
cur.execute("create table t1(x integer)")
cur.execute(f"insert into t1 values (generate_series(1,{n_records}))")
cur.execute("vacuum t1")
for i in range(n_iter):
cur.execute(f"delete from t1 where x>{n_records//2}")
cur.execute("vacuum t1")
time.sleep(1) # let pageserver a chance to create image layers
cur.execute(f"insert into t1 values (generate_series({n_records//2+1}, {n_records}))")
cur.execute("vacuum t1")
time.sleep(1) # let pageserver a chance to create image layers
cur.execute("select count(*) from t1")
res = cur.fetchone()
assert res is not None
assert res[0] == n_records