mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 14:32:57 +00:00
Create tenants in temporary directory first (#2426)
This commit is contained in:
@@ -21,6 +21,8 @@ use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Write;
|
||||
use std::num::NonZeroU64;
|
||||
use std::ops::Bound::Included;
|
||||
use std::path::Path;
|
||||
@@ -38,6 +40,7 @@ use crate::tenant_config::{TenantConf, TenantConfOpt};
|
||||
use crate::metrics::STORAGE_TIME;
|
||||
use crate::repository::GcResult;
|
||||
use crate::task_mgr;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::CheckpointConfig;
|
||||
|
||||
@@ -663,14 +666,14 @@ impl Repository {
|
||||
}
|
||||
|
||||
pub fn persist_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: ZTenantId,
|
||||
target_config_path: &Path,
|
||||
tenant_conf: TenantConfOpt,
|
||||
first_save: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let _enter = info_span!("saving tenantconf").entered();
|
||||
let target_config_path = TenantConf::path(conf, tenant_id);
|
||||
info!("save tenantconf to {}", target_config_path.display());
|
||||
info!("persisting tenantconf to {}", target_config_path.display());
|
||||
|
||||
// TODO this will prepend comments endlessly
|
||||
let mut conf_content = r#"# This file contains a specific per-tenant's config.
|
||||
# It is read in case of pageserver restart.
|
||||
|
||||
@@ -681,12 +684,48 @@ impl Repository {
|
||||
// Convert the config to a toml file.
|
||||
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
|
||||
|
||||
fs::write(&target_config_path, conf_content).with_context(|| {
|
||||
format!(
|
||||
"Failed to write config file into path '{}'",
|
||||
target_config_path.display()
|
||||
)
|
||||
})
|
||||
let mut target_config_file = VirtualFile::open_with_options(
|
||||
target_config_path,
|
||||
OpenOptions::new().write(true).create_new(first_save),
|
||||
)?;
|
||||
|
||||
target_config_file
|
||||
.write(conf_content.as_bytes())
|
||||
.context("Failed to write toml bytes into file")
|
||||
.and_then(|_| {
|
||||
target_config_file
|
||||
.sync_all()
|
||||
.context("Faile to fsync config file")
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to write config file into path '{}'",
|
||||
target_config_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
// fsync the parent directory to ensure the directory entry is durable
|
||||
if first_save {
|
||||
target_config_path
|
||||
.parent()
|
||||
.context("Config file does not have a parent")
|
||||
.and_then(|target_config_parent| {
|
||||
File::open(target_config_parent).context("Failed to open config parent")
|
||||
})
|
||||
.and_then(|tenant_dir| {
|
||||
tenant_dir
|
||||
.sync_all()
|
||||
.context("Failed to fsync config parent")
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to fsync on firts save for config {}",
|
||||
target_config_path.display()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -9,16 +9,14 @@ use crate::layered_repository::Repository;
|
||||
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
|
||||
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::tenant_config::{TenantConf, TenantConfOpt};
|
||||
use crate::walredo::{PostgresRedoManager, WalRedoManager};
|
||||
use crate::{TenantTimelineValues, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{ensure, Context};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::hash_map::{self, Entry};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use anyhow::Context;
|
||||
use remote_storage::{path_with_suffix_extension, GenericRemoteStorage};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::ffi::OsStr;
|
||||
use std::fmt;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
@@ -58,7 +56,7 @@ struct Tenant {
|
||||
repo: Arc<Repository>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TenantState {
|
||||
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
||||
// The local disk might have some newer files that don't exist in cloud storage yet.
|
||||
@@ -74,8 +72,8 @@ pub enum TenantState {
|
||||
Broken,
|
||||
}
|
||||
|
||||
impl fmt::Display for TenantState {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
impl std::fmt::Display for TenantState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Active => f.write_str("Active"),
|
||||
Self::Idle => f.write_str("Idle"),
|
||||
@@ -252,21 +250,71 @@ fn create_repo(
|
||||
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
remote_index: RemoteIndex,
|
||||
) -> anyhow::Result<Arc<Repository>> {
|
||||
let repo_dir = conf.tenant_path(&tenant_id);
|
||||
ensure!(
|
||||
!repo_dir.exists(),
|
||||
"cannot create new tenant repo: '{}' directory already exists",
|
||||
tenant_id
|
||||
let target_tenant_directory = conf.tenant_path(&tenant_id);
|
||||
anyhow::ensure!(
|
||||
!target_tenant_directory.exists(),
|
||||
"cannot create new tenant repo: '{tenant_id}' directory already exists",
|
||||
);
|
||||
|
||||
// top-level dir may exist if we are creating it through CLI
|
||||
crashsafe_dir::create_dir_all(&repo_dir)
|
||||
.with_context(|| format!("could not create directory {}", repo_dir.display()))?;
|
||||
crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
|
||||
info!("created directory structure in {}", repo_dir.display());
|
||||
let temporary_tenant_dir =
|
||||
path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
|
||||
debug!(
|
||||
"Creating temporary directory structure in {}",
|
||||
temporary_tenant_dir.display()
|
||||
);
|
||||
|
||||
// Save tenant's config
|
||||
Repository::persist_tenant_config(conf, tenant_id, tenant_conf)?;
|
||||
let temporary_tenant_timelines_dir = rebase_directory(
|
||||
&conf.timelines_path(&tenant_id),
|
||||
&target_tenant_directory,
|
||||
&temporary_tenant_dir,
|
||||
)?;
|
||||
let temporary_tenant_config_path = rebase_directory(
|
||||
&TenantConf::path(conf, tenant_id),
|
||||
&target_tenant_directory,
|
||||
&temporary_tenant_dir,
|
||||
)?;
|
||||
|
||||
// top-level dir may exist if we are creating it through CLI
|
||||
crashsafe_dir::create_dir_all(&temporary_tenant_dir).with_context(|| {
|
||||
format!(
|
||||
"could not create temporary tenant directory {}",
|
||||
temporary_tenant_dir.display()
|
||||
)
|
||||
})?;
|
||||
// first, create a config in the top-level temp directory, fsync the file
|
||||
Repository::persist_tenant_config(&temporary_tenant_config_path, tenant_conf, true)?;
|
||||
// then, create a subdirectory in the top-level temp directory, fsynced
|
||||
crashsafe_dir::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
|
||||
format!(
|
||||
"could not create temporary tenant timelines directory {}",
|
||||
temporary_tenant_timelines_dir.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
|
||||
anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
|
||||
});
|
||||
|
||||
// move-rename tmp directory with all files synced into a permanent directory, fsync its parent
|
||||
fs::rename(&temporary_tenant_dir, &target_tenant_directory).with_context(|| {
|
||||
format!(
|
||||
"failed to move temporary tenant directory {} into the permanent one {}",
|
||||
temporary_tenant_dir.display(),
|
||||
target_tenant_directory.display()
|
||||
)
|
||||
})?;
|
||||
let target_dir_parent = target_tenant_directory.parent().with_context(|| {
|
||||
format!(
|
||||
"Failed to get tenant dir parent for {}",
|
||||
target_tenant_directory.display()
|
||||
)
|
||||
})?;
|
||||
fs::File::open(target_dir_parent)?.sync_all()?;
|
||||
|
||||
info!(
|
||||
"created directory structure in {}",
|
||||
target_tenant_directory.display()
|
||||
);
|
||||
|
||||
Ok(Arc::new(Repository::new(
|
||||
conf,
|
||||
@@ -278,6 +326,17 @@ fn create_repo(
|
||||
)))
|
||||
}
|
||||
|
||||
fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
|
||||
let relative_path = original_path.strip_prefix(base).with_context(|| {
|
||||
format!(
|
||||
"Failed to strip base prefix '{}' off path '{}'",
|
||||
base.display(),
|
||||
original_path.display()
|
||||
)
|
||||
})?;
|
||||
Ok(new_base.join(relative_path))
|
||||
}
|
||||
|
||||
pub fn create_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
@@ -285,11 +344,11 @@ pub fn create_tenant(
|
||||
remote_index: RemoteIndex,
|
||||
) -> anyhow::Result<Option<ZTenantId>> {
|
||||
match tenants_state::write_tenants().entry(tenant_id) {
|
||||
Entry::Occupied(_) => {
|
||||
hash_map::Entry::Occupied(_) => {
|
||||
debug!("tenant {tenant_id} already exists");
|
||||
Ok(None)
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
let repo = create_repo(conf, tenant_conf, tenant_id, wal_redo_manager, remote_index)?;
|
||||
v.insert(Tenant {
|
||||
@@ -310,7 +369,7 @@ pub fn update_tenant_config(
|
||||
info!("configuring tenant {tenant_id}");
|
||||
get_repository_for_tenant(tenant_id)?.update_tenant_config(tenant_conf);
|
||||
|
||||
Repository::persist_tenant_config(conf, tenant_id, tenant_conf)?;
|
||||
Repository::persist_tenant_config(&TenantConf::path(conf, tenant_id), tenant_conf, false)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -424,7 +483,7 @@ pub async fn detach_tenant(
|
||||
// we will attempt to remove files which no longer exist. This can be fixed by having shutdown
|
||||
// mechanism for repository that will clean temporary data to avoid any references to ephemeral files
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id);
|
||||
std::fs::remove_dir_all(&local_tenant_directory).with_context(|| {
|
||||
fs::remove_dir_all(&local_tenant_directory).with_context(|| {
|
||||
format!(
|
||||
"Failed to remove local tenant directory '{}'",
|
||||
local_tenant_directory.display()
|
||||
@@ -472,7 +531,7 @@ fn local_tenant_timeline_files(
|
||||
|
||||
let mut local_tenant_timeline_files = TenantTimelineValues::new();
|
||||
let tenants_dir = config.tenants_path();
|
||||
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
|
||||
for tenants_dir_entry in fs::read_dir(&tenants_dir)
|
||||
.with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
|
||||
{
|
||||
match &tenants_dir_entry {
|
||||
@@ -483,7 +542,7 @@ fn local_tenant_timeline_files(
|
||||
"Found temporary tenant directory, removing: {}",
|
||||
tenant_dir_path.display()
|
||||
);
|
||||
if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
|
||||
if let Err(e) = fs::remove_dir_all(&tenant_dir_path) {
|
||||
error!(
|
||||
"Failed to remove temporary directory '{}': {:?}",
|
||||
tenant_dir_path.display(),
|
||||
@@ -545,7 +604,7 @@ fn remove_if_empty(tenant_dir_path: &Path) -> anyhow::Result<bool> {
|
||||
.is_none();
|
||||
|
||||
if directory_is_empty {
|
||||
std::fs::remove_dir_all(&tenant_dir_path).with_context(|| {
|
||||
fs::remove_dir_all(&tenant_dir_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to remove empty directory '{}'",
|
||||
tenant_dir_path.display(),
|
||||
@@ -582,7 +641,7 @@ fn collect_timelines_for_tenant(
|
||||
let timelines_dir = config.timelines_path(&tenant_id);
|
||||
|
||||
let mut tenant_timelines = HashMap::new();
|
||||
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
|
||||
for timelines_dir_entry in fs::read_dir(&timelines_dir)
|
||||
.with_context(|| format!("Failed to list timelines dir entry for tenant {tenant_id}"))?
|
||||
{
|
||||
match timelines_dir_entry {
|
||||
@@ -593,7 +652,7 @@ fn collect_timelines_for_tenant(
|
||||
"Found temporary timeline directory, removing: {}",
|
||||
timeline_dir.display()
|
||||
);
|
||||
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
|
||||
if let Err(e) = fs::remove_dir_all(&timeline_dir) {
|
||||
error!(
|
||||
"Failed to remove temporary directory '{}': {:?}",
|
||||
timeline_dir.display(),
|
||||
@@ -660,7 +719,7 @@ fn collect_timeline_files(
|
||||
.parse::<ZTimelineId>()
|
||||
.context("Could not parse timeline id out of the timeline dir name")?;
|
||||
let timeline_dir_entries =
|
||||
std::fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?;
|
||||
fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?;
|
||||
for entry in timeline_dir_entries {
|
||||
let entry_path = entry.context("Failed to list timeline dir entry")?.path();
|
||||
if entry_path.is_file() {
|
||||
@@ -671,7 +730,7 @@ fn collect_timeline_files(
|
||||
continue;
|
||||
} else if is_temporary(&entry_path) {
|
||||
info!("removing temp timeline file at {}", entry_path.display());
|
||||
std::fs::remove_file(&entry_path).with_context(|| {
|
||||
fs::remove_file(&entry_path).with_context(|| {
|
||||
format!(
|
||||
"failed to remove temp download file at {}",
|
||||
entry_path.display()
|
||||
@@ -695,7 +754,7 @@ fn collect_timeline_files(
|
||||
None => anyhow::bail!("No metadata file found in the timeline directory"),
|
||||
};
|
||||
let metadata = TimelineMetadata::from_bytes(
|
||||
&std::fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?,
|
||||
&fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?,
|
||||
)
|
||||
.context("Failed to parse timeline metadata file bytes")?;
|
||||
|
||||
|
||||
@@ -1,16 +1,55 @@
|
||||
import os
|
||||
from contextlib import closing
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.types import Lsn, ZTenantId
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
def test_tenant_creation_fails(neon_simple_env: NeonEnv):
|
||||
tenants_dir = Path(neon_simple_env.repo_dir) / "tenants"
|
||||
initial_tenants = sorted(
|
||||
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
|
||||
)
|
||||
initial_tenant_dirs = set([d for d in tenants_dir.iterdir()])
|
||||
|
||||
neon_simple_env.pageserver.safe_psql("failpoints tenant-creation-before-tmp-rename=return")
|
||||
with pytest.raises(Exception, match="tenant-creation-before-tmp-rename"):
|
||||
_ = neon_simple_env.neon_cli.create_tenant()
|
||||
|
||||
new_tenants = sorted(
|
||||
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
|
||||
)
|
||||
assert initial_tenants == new_tenants, "should not create new tenants"
|
||||
|
||||
new_tenant_dirs = list(set([d for d in tenants_dir.iterdir()]) - initial_tenant_dirs)
|
||||
assert len(new_tenant_dirs) == 1, "should have new tenant directory created"
|
||||
tmp_tenant_dir = new_tenant_dirs[0]
|
||||
assert str(tmp_tenant_dir).endswith(
|
||||
".___temp"
|
||||
), "new tenant directory created should be a temporary one"
|
||||
|
||||
neon_simple_env.pageserver.stop()
|
||||
neon_simple_env.pageserver.start()
|
||||
|
||||
tenants_after_restart = sorted(
|
||||
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
|
||||
)
|
||||
dirs_after_restart = set([d for d in tenants_dir.iterdir()])
|
||||
assert (
|
||||
tenants_after_restart == initial_tenants
|
||||
), "should load all non-corrupt tenants after restart"
|
||||
assert (
|
||||
dirs_after_restart == initial_tenant_dirs
|
||||
), "pageserver should clean its temp tenant dirs on restart"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("with_safekeepers", [False, True])
|
||||
def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool):
|
||||
if with_safekeepers:
|
||||
|
||||
Reference in New Issue
Block a user