Compare commits


1 Commit

Author: Bojan Serafimov
SHA1: 9d6b78861d
Message: WIP
Date: 2022-01-11 12:06:32 -05:00
48 changed files with 354 additions and 1106 deletions

Cargo.lock (generated)
View File

@@ -1424,19 +1424,16 @@ dependencies = [
"bytes",
"clap",
"hex",
"hyper",
"lazy_static",
"md5",
"parking_lot",
"rand",
"reqwest",
"routerify",
"rustls 0.19.1",
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"zenith_metrics",
"zenith_utils",
]

View File

@@ -294,7 +294,6 @@ impl PostgresNode {
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
conf.append("zenith.file_cache_size", "4096");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "replica");

View File

@@ -9,7 +9,6 @@
use anyhow::{anyhow, bail, Context, Result};
use std::fs;
use std::path::Path;
use std::process::Command;
pub mod compute;
pub mod local_env;
@@ -32,19 +31,3 @@ pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
}
Ok(pid)
}
fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
const RUST_LOG_KEY: &str = "RUST_LOG";
if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
cmd.env(RUST_LOG_KEY, rust_log_value)
} else {
cmd
}
}

View File

@@ -17,8 +17,8 @@ use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::read_pidfile;
use crate::storage::PageServerNode;
use crate::{fill_rust_env_vars, read_pidfile};
use zenith_utils::connstring::connection_address;
#[derive(Error, Debug)]
@@ -118,17 +118,22 @@ impl SafekeeperNode {
let listen_http = format!("localhost:{}", self.conf.http_port);
let mut cmd = Command::new(self.env.safekeeper_bin()?);
fill_rust_env_vars(
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize"),
);
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");
if !self.conf.sync {
cmd.arg("--no-sync");
}
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!(
"Safekeeper failed to start. See '{}' for details.",

View File

@@ -19,7 +19,7 @@ use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
use crate::local_env::LocalEnv;
use crate::{fill_rust_env_vars, read_pidfile};
use crate::read_pidfile;
use pageserver::branches::BranchInfo;
use pageserver::tenant_mgr::TenantInfo;
use zenith_utils::connstring::connection_address;
@@ -96,49 +96,46 @@ impl PageServerNode {
.unwrap()
}
pub fn init(
&self,
create_tenant: Option<&str>,
config_overrides: &[&str],
) -> anyhow::Result<()> {
pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotes etc.
let base_data_dir_param = self.env.base_data_dir.display().to_string();
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
let mut args = Vec::with_capacity(20);
args.push("--init");
args.extend(["-D", &base_data_dir_param]);
args.extend(["-c", &pg_distrib_dir_param]);
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);
for config_override in config_overrides {
args.extend(["-c", config_override]);
}
let mut args = vec![
"--init".to_string(),
"-D".to_string(),
self.env.base_data_dir.display().to_string(),
"-c".to_string(),
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display()),
"-c".to_string(),
format!("auth_type='{}'", self.env.pageserver.auth_type),
"-c".to_string(),
format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
),
"-c".to_string(),
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr),
];
if self.env.pageserver.auth_type != AuthType::Trust {
args.extend([
"-c",
"auth_validation_public_key_path='auth_public_key.pem'",
"-c".to_string(),
"auth_validation_public_key_path='auth_public_key.pem'".to_string(),
]);
}
if let Some(tenantid) = create_tenant {
args.extend(["--create-tenant", tenantid])
args.extend(["--create-tenant".to_string(), tenantid.to_string()])
}
let status = fill_rust_env_vars(cmd.args(args))
let status = cmd
.args(args)
.env_clear()
.env("RUST_BACKTRACE", "1")
.status()
.expect("pageserver init failed");
@@ -157,7 +154,7 @@ impl PageServerNode {
self.repo_path().join("pageserver.pid")
}
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
pub fn start(&self) -> anyhow::Result<()> {
print!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
@@ -166,16 +163,16 @@ impl PageServerNode {
io::stdout().flush().unwrap();
let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(&["-D", self.repo_path().to_str().unwrap()])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");
let repo_path = self.repo_path();
let mut args = vec!["-D", repo_path.to_str().unwrap()];
for config_override in config_overrides {
args.extend(["-c", config_override]);
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
fill_rust_env_vars(cmd.args(&args).arg("--daemonize"));
if !cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",

View File

@@ -147,10 +147,6 @@ bucket_name = 'some-sample-bucket'
# Name of the region where the bucket is located at
bucket_region = 'eu-north-1'
# A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once.
# Optional, pageserver uses entire bucket if the prefix is not specified.
prefix_in_bucket = '/some/prefix/'
# Access key to connect to the bucket ("login" part of the credentials)
access_key_id = 'SOMEKEYAAAAASADSAH*#'

View File

@@ -129,13 +129,13 @@ There are the following implementations present:
* local filesystem — to use in tests mainly
* AWS S3 - to use in production
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs.
The backup service is disabled by default and can be enabled to interact with a single remote storage.
CLI examples:
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`
For Amazon AWS S3, the key id and secret access key can be found in `~/.aws/credentials` (if awscli was ever configured to work with the desired bucket) or on the AWS Settings page for the corresponding user. Also note that bucket names do not contain any protocols when used on AWS.
For local S3 installations, refer to their documentation for the name format and credentials.
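For reference, a minimal `~/.aws/credentials` file for such a setup looks roughly like this (a sketch only; the profile name is the awscli default and the key values are the placeholders reused from the examples above):

[default]
aws_access_key_id = SOMEKEYAAAAASADSAH*#
aws_secret_access_key = SOMEsEcReTsd292v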
@@ -154,7 +154,6 @@ or
[remote_storage]
bucket_name = 'some-sample-bucket'
bucket_region = 'eu-north-1'
prefix_in_bucket = '/test_prefix/'
access_key_id = 'SOMEKEYAAAAASADSAH*#'
secret_access_key = 'SOMEsEcReTsd292v'
```
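For comparison, the local-filesystem variant of the same section would look roughly like this (a sketch derived from the CLI example above; the path is a placeholder):

[remote_storage]
local_path = '/some/local/path/'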

View File

@@ -53,12 +53,12 @@ fn main() -> Result<()> {
)
// See `settings.md` for more details on the extra configuration parameters pageserver can process
.arg(
Arg::with_name("config-override")
Arg::with_name("config-option")
.short("c")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
.help("Additional configuration options or overrides of the ones from the toml config file.
Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
)
.get_matches();
@@ -105,7 +105,7 @@ fn main() -> Result<()> {
};
// Process any extra options given with -c
if let Some(values) = arg_matches.values_of("config-override") {
if let Some(values) = arg_matches.values_of("config-option") {
for option_line in values {
let doc = toml_edit::Document::from_str(option_line).with_context(|| {
format!(
@@ -195,10 +195,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
}
let signals = signals::install_shutdown_handlers()?;
let (async_shutdown_tx, async_shutdown_rx) = tokio::sync::watch::channel(());
let mut threads = Vec::new();
let sync_startup = remote_storage::start_local_timeline_sync(conf, async_shutdown_rx)
let sync_startup = remote_storage::start_local_timeline_sync(conf)
.context("Failed to set up local files sync with external storage")?;
if let Some(handle) = sync_startup.sync_loop_handle {
@@ -256,7 +255,6 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
signal.name()
);
async_shutdown_tx.send(())?;
postgres_backend::set_pgbackend_shutdown_requested();
tenant_mgr::shutdown_all_tenants()?;
endpoint::shutdown();

View File

@@ -45,16 +45,14 @@ impl BranchInfo {
repo: &Arc<dyn Repository>,
include_non_incremental_logical_size: bool,
) -> Result<Self> {
let path = path.as_ref();
let name = path.file_name().unwrap().to_string_lossy().to_string();
let timeline_id = std::fs::read_to_string(path)
.with_context(|| {
format!(
"Failed to read branch file contents at path '{}'",
path.display()
)
})?
.parse::<ZTimelineId>()?;
let name = path
.as_ref()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_string();
let timeline_id = std::fs::read_to_string(path)?.parse::<ZTimelineId>()?;
let timeline = match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(local_entry) => local_entry,

View File

@@ -43,9 +43,6 @@ pub mod defaults {
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
pub const DEFAULT_MAX_DELTA_LAYERS: usize = 10;
pub const DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD: usize = 50;
///
/// Default built-in configuration file.
///
@@ -93,21 +90,6 @@ pub struct PageServerConf {
pub page_cache_size: usize,
pub max_file_descriptors: usize,
//
// Minimal total size of delta layers which triggers generation of an image layer by the checkpointer.
// It is specified as a percent of the maximal segment size (RELISH_SEG_SIZE).
// I.e. the checkpoint will create an image layer in addition to the delta layer only when the total size
// of delta layers since the last image layer exceeds the specified percent of the segment size.
//
pub image_layer_generation_threshold: usize,
//
// Maximal number of delta layers which can be stored before an image layer should be generated.
// The garbage collector needs image layers in order to delete files.
// If this number is too large it can result in too many small files on disk.
//
pub max_delta_layers: usize,
// Repository directory, relative to current working directory.
// Normally, the page server changes the current working directory
// to the repository, and 'workdir' is always '.'. But we don't do
@@ -153,8 +135,6 @@ pub struct S3Config {
pub bucket_name: String,
/// The region where the bucket is located.
pub bucket_region: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once.
pub prefix_in_bucket: Option<String>,
/// "Login" to use when connecting to bucket.
/// Can be empty for cases like AWS k8s IAM
/// where we can allow certain pods to connect
@@ -169,7 +149,6 @@ impl std::fmt::Debug for S3Config {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.finish()
}
}
@@ -246,9 +225,6 @@ impl PageServerConf {
page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
pg_distrib_dir: PathBuf::new(),
auth_validation_public_key_path: None,
auth_type: AuthType::Trust,
@@ -271,10 +247,6 @@ impl PageServerConf {
"max_file_descriptors" => {
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
}
"max_delta_layers" => conf.max_delta_layers = parse_toml_u64(key, item)? as usize,
"image_layer_generation_threshold" => {
conf.image_layer_generation_threshold = parse_toml_u64(key, item)? as usize
}
"pg_distrib_dir" => {
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
}
@@ -360,26 +332,18 @@ impl PageServerConf {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
bucket_name: bucket_name.as_str().unwrap().to_string(),
bucket_region: bucket_region.as_str().unwrap().to_string(),
access_key_id: toml
.get("access_key_id")
.map(|access_key_id| parse_toml_string("access_key_id", access_key_id))
.transpose()?,
.map(|x| x.as_str().unwrap().to_string()),
secret_access_key: toml
.get("secret_access_key")
.map(|secret_access_key| {
parse_toml_string("secret_access_key", secret_access_key)
})
.transpose()?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
.transpose()?,
.map(|x| x.as_str().unwrap().to_string()),
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(local_path), None, None) => {
RemoteStorageKind::LocalFs(PathBuf::from(local_path.as_str().unwrap()))
}
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};
@@ -404,8 +368,6 @@ impl PageServerConf {
gc_period: Duration::from_secs(10),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold: defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
superuser: "zenith_admin".to_string(),
@@ -477,9 +439,6 @@ gc_horizon = 222
page_cache_size = 444
max_file_descriptors = 333
max_delta_layers = 10
image_layer_generation_threshold = 50
# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'
@@ -510,9 +469,6 @@ initial_superuser_name = 'zzzz'
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
max_delta_layers: defaults::DEFAULT_MAX_DELTA_LAYERS,
image_layer_generation_threshold:
defaults::DEFAULT_IMAGE_LAYER_GENERATION_THRESHOLD,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,
@@ -554,8 +510,6 @@ initial_superuser_name = 'zzzz'
superuser: "zzzz".to_string(),
page_cache_size: 444,
max_file_descriptors: 333,
max_delta_layers: 10,
image_layer_generation_threshold: 50,
workdir,
pg_distrib_dir,
auth_type: AuthType::Trust,
@@ -631,7 +585,6 @@ pg_distrib_dir='{}'
let bucket_name = "some-sample-bucket".to_string();
let bucket_region = "eu-north-1".to_string();
let prefix_in_bucket = "test_prefix".to_string();
let access_key_id = "SOMEKEYAAAAASADSAH*#".to_string();
let secret_access_key = "SOMEsEcReTsd292v".to_string();
let max_concurrent_sync = NonZeroUsize::new(111).unwrap();
@@ -644,14 +597,13 @@ max_concurrent_sync = {}
max_sync_errors = {}
bucket_name = '{}'
bucket_region = '{}'
prefix_in_bucket = '{}'
access_key_id = '{}'
secret_access_key = '{}'"#,
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
),
format!(
"remote_storage={{max_concurrent_sync={}, max_sync_errors={}, bucket_name='{}', bucket_region='{}', prefix_in_bucket='{}', access_key_id='{}', secret_access_key='{}'}}",
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
"remote_storage={{max_concurrent_sync = {}, max_sync_errors = {}, bucket_name='{}', bucket_region='{}', access_key_id='{}', secret_access_key='{}'}}",
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
),
];
@@ -685,7 +637,6 @@ pg_distrib_dir='{}'
bucket_region: bucket_region.clone(),
access_key_id: Some(access_key_id.clone()),
secret_access_key: Some(secret_access_key.clone()),
prefix_in_bucket: Some(prefix_in_bucket.clone())
}),
},
"Remote storage config should correctly parse the S3 config"

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
@@ -190,27 +190,18 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
}
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
enum TimelineInfo {
Local {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
},
Remote {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
},
struct TimelineInfo {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -224,12 +215,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
Ok::<_, anyhow::Error>(match repo.get_timeline(timeline_id)?.local_timeline() {
None => TimelineInfo::Remote {
timeline_id,
tenant_id,
},
Some(timeline) => TimelineInfo::Local {
match repo.get_timeline(timeline_id)?.local_timeline() {
None => bail!("Timeline with id {} is not present locally", timeline_id),
Some(timeline) => Ok::<_, anyhow::Error>(TimelineInfo {
timeline_id,
tenant_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
@@ -238,8 +226,8 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
prev_record_lsn: timeline.get_prev_record_lsn(),
start_lsn: timeline.get_start_lsn(),
timeline_state: repo.get_timeline_state(timeline_id),
},
})
}),
}
})
.await
.map_err(ApiError::from_err)??;

View File

@@ -41,7 +41,6 @@ use crate::repository::{
TimelineWriter, ZenithWalRecord,
};
use crate::tenant_mgr;
use crate::virtual_file::VirtualFile;
use crate::walreceiver;
use crate::walreceiver::IS_WAL_RECEIVER;
use crate::walredo::WalRedoManager;
@@ -128,13 +127,7 @@ pub struct LayeredRepository {
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding the
// `timelines` mutex for the whole GC iteration (especially with an enforced checkpoint)
// may block `get_timeline`, `get_timelines_state`, ... and other operations on timelines
// for a long time, which in turn may cause replication connections to be dropped, wait_for_lsn
// timeouts to expire, and so on.
gc_cs: Mutex<()>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
/// Makes every timeline back up its files to remote storage.
upload_relishes: bool,
@@ -193,8 +186,6 @@ impl Repository for LayeredRepository {
// We need to hold this lock to prevent GC from starting at the same time. GC scans the directory to learn
// about timelines, so otherwise a race condition is possible, where we create new timeline and GC
// concurrently removes data that is needed by the new timeline.
let _gc_cs = self.gc_cs.lock().unwrap();
let mut timelines = self.timelines.lock().unwrap();
let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
@@ -368,7 +359,7 @@ fn shutdown_timeline(
timeline
.upload_relishes
.store(false, atomic::Ordering::Relaxed);
walreceiver::stop_wal_receiver(tenant_id, timeline_id);
walreceiver::stop_wal_receiver(timeline_id);
trace!("repo shutdown. checkpoint timeline {}", timeline_id);
// Do not reconstruct pages to reduce shutdown time
timeline.checkpoint(CheckpointConfig::Flush)?;
@@ -498,7 +489,6 @@ impl LayeredRepository {
tenantid,
conf,
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
upload_relishes,
}
@@ -515,10 +505,10 @@ impl LayeredRepository {
let _enter = info_span!("saving metadata").entered();
let path = metadata_path(conf, timelineid, tenantid);
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = VirtualFile::open_with_options(
&path,
OpenOptions::new().write(true).create_new(first_save),
)?;
let mut file = OpenOptions::new()
.write(true)
.create_new(first_save)
.open(&path)?;
let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;
@@ -585,8 +575,7 @@ impl LayeredRepository {
let now = Instant::now();
// grab mutex to prevent new timelines from being created here.
let _gc_cs = self.gc_cs.lock().unwrap();
// TODO: We will hold it for a long time
let mut timelines = self.timelines.lock().unwrap();
// Scan all timelines. For each timeline, remember the timeline ID and
@@ -674,7 +663,6 @@ impl LayeredRepository {
}
if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
drop(timelines);
let branchpoints: Vec<Lsn> = all_branchpoints
.range((
Included((timelineid, Lsn(0))),
@@ -694,7 +682,6 @@ impl LayeredRepository {
let result = timeline.gc_timeline(branchpoints, cutoff)?;
totals += result;
timelines = self.timelines.lock().unwrap();
}
}
@@ -772,12 +759,6 @@ pub struct LayeredTimeline {
/// to avoid deadlock.
write_lock: Mutex<()>,
// Prevent concurrent checkpoints.
// Checkpoints are normally performed by one thread. But checkpoint can also be manually requested by admin
// (that's used in tests), and shutdown also forces a checkpoint. These forced checkpoints run in a different thread
// and could be triggered at the same time as a normal checkpoint.
checkpoint_cs: Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
latest_gc_cutoff_lsn: AtomicLsn,
@@ -1137,7 +1118,6 @@ impl LayeredTimeline {
upload_relishes: AtomicBool::new(upload_relishes),
write_lock: Mutex::new(()),
checkpoint_cs: Mutex::new(()),
latest_gc_cutoff_lsn: AtomicLsn::from(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
@@ -1455,9 +1435,6 @@ impl LayeredTimeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self, checkpoint_distance: u64, reconstruct_pages: bool) -> Result<()> {
// Prevent concurrent checkpoints
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
@@ -1598,7 +1575,7 @@ impl LayeredTimeline {
Ok(())
}
fn evict_layer(&self, layer_id: LayerId, mut reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
@@ -1611,27 +1588,8 @@ impl LayeredTimeline {
let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
let last_lsn = self.get_last_record_lsn();
// Avoid creation of image layers if there are not too many deltas
if reconstruct_pages
&& oldest_layer.get_seg_tag().rel.is_blocky()
&& self.conf.image_layer_generation_threshold != 0
{
let (n_delta_layers, total_delta_size) =
layers.count_delta_layers(oldest_layer.get_seg_tag(), last_lsn)?;
let logical_segment_size =
oldest_layer.get_seg_size(last_lsn)? as u64 * BLCKSZ as u64;
let physical_deltas_size = total_delta_size + oldest_layer.get_physical_size()?;
if logical_segment_size * self.conf.image_layer_generation_threshold as u64
> physical_deltas_size * 100
&& n_delta_layers < self.conf.max_delta_layers
{
reconstruct_pages = false;
}
}
drop(global_layer_map);
oldest_layer.freeze(last_lsn);
oldest_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.

View File

@@ -161,14 +161,6 @@ pub struct DeltaLayerInner {
}
impl DeltaLayerInner {
fn get_physical_size(&self) -> Result<u64> {
Ok(if let Some(book) = &self.book {
book.chapter_reader(PAGE_VERSIONS_CHAPTER)?.len()
} else {
0
})
}
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
// Scan the VecMap backwards, starting from the given entry.
let slice = self
@@ -297,12 +289,6 @@ impl Layer for DeltaLayer {
}
}
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
// TODO: is it actually necessary to load the layer to get its size?
self.load()?.get_physical_size()
}
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);

View File

@@ -41,7 +41,7 @@ pub struct EphemeralFile {
_timelineid: ZTimelineId,
file: Arc<VirtualFile>,
pub pos: u64,
pos: u64,
}
impl EphemeralFile {

View File

@@ -201,11 +201,6 @@ impl Layer for ImageLayer {
}
}
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.get_seg_size(Lsn(0))? as u64 * BLOCK_SIZE as u64)
}
/// Does this segment exist at given LSN?
fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
Ok(true)

View File

@@ -80,10 +80,6 @@ impl InMemoryLayerInner {
assert!(self.end_lsn.is_none());
}
fn get_physical_size(&self) -> u64 {
self.page_versions.size()
}
fn get_seg_size(&self, lsn: Lsn) -> SegmentBlk {
// Scan the BTreeMap backwards, starting from the given entry.
let slice = self.seg_sizes.slice_range(..=lsn);
@@ -225,12 +221,7 @@ impl Layer for InMemoryLayer {
}
}
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64> {
Ok(self.inner.read().unwrap().get_physical_size() as u64)
}
/// Get logical size of the relation at given LSN
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk> {
assert!(lsn >= self.start_lsn);
ensure!(
@@ -625,7 +616,7 @@ impl InMemoryLayer {
let image_lsn: Option<Lsn>;
let delta_end_lsn: Option<Lsn>;
if self.is_dropped() || !reconstruct_pages {
// Create just a delta layer containing all the
// The segment was dropped. Create just a delta layer containing all the
// changes up to and including the drop.
delta_end_lsn = Some(end_lsn_exclusive);
image_lsn = None;

View File

@@ -111,14 +111,6 @@ where
}
}
/// Iterate over all items with start bound <= 'key'
pub fn iter_older(&self, key: I::Key) -> IntervalIter<I> {
IntervalIter {
point_iter: self.points.range(..key),
elem_iter: None,
}
}
/// Iterate over all items
pub fn iter(&self) -> IntervalIter<I> {
IntervalIter {
@@ -238,35 +230,6 @@ where
}
}
impl<'a, I> DoubleEndedIterator for IntervalIter<'a, I>
where
I: IntervalItem + ?Sized,
{
fn next_back(&mut self) -> Option<Self::Item> {
// Iterate over all elements in all the points in 'point_iter'. To avoid
// returning the same element twice, we only return each element at its
// starting point.
loop {
// Return next remaining element from the current point
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
while let Some(elem) = elem_iter.next_back() {
if elem.start_key() == *point_key {
return Some(Arc::clone(elem));
}
}
}
// No more elements at this point. Move to next point.
if let Some((point_key, point)) = self.point_iter.next_back() {
self.elem_iter = Some((*point_key, point.elements.iter()));
continue;
} else {
// No more points, all done
return None;
}
}
}
}
impl<I: ?Sized> Default for IntervalTree<I>
where
I: IntervalItem,

View File

@@ -199,14 +199,6 @@ impl LayerMap {
}
}
pub fn count_delta_layers(&self, seg: SegmentTag, lsn: Lsn) -> Result<(usize, u64)> {
if let Some(segentry) = self.segs.get(&seg) {
segentry.count_delta_layers(lsn)
} else {
Ok((0, 0))
}
}
/// Is there any layer for given segment that is alive at the lsn?
///
/// This is a public wrapper for the SegEntry function,
@@ -328,22 +320,6 @@ impl SegEntry {
.any(|layer| !layer.is_incremental())
}
// Count the number of delta layers preceding the specified `lsn`.
// Perform backward iteration from exclusive upper bound until image layer is reached.
pub fn count_delta_layers(&self, lsn: Lsn) -> Result<(usize, u64)> {
let mut count: usize = 0;
let mut total_size: u64 = 0;
let mut iter = self.historic.iter_older(lsn);
while let Some(layer) = iter.next_back() {
if !layer.is_incremental() {
break;
}
count += 1;
total_size += layer.get_physical_size()?;
}
Ok((count, total_size))
}
// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.

View File

@@ -39,10 +39,6 @@ impl PageVersions {
}
}
pub fn size(&self) -> u64 {
self.file.pos
}
pub fn append_or_update_last(
&mut self,
blknum: u32,

View File

@@ -154,15 +154,12 @@ pub trait Layer: Send + Sync {
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult>;
/// Return logical size of the segment at given LSN. (Only for blocky relations.)
/// Return size of the segment at given LSN. (Only for blocky relations.)
fn get_seg_size(&self, lsn: Lsn) -> Result<SegmentBlk>;
/// Does the segment exist at given LSN? Or was it dropped before it.
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
// Get physical size of the layer
fn get_physical_size(&self) -> Result<u64>;
/// Does this layer only contain some data for the segment (incremental),
/// or does it contain a version of every page? This is important to know
/// for garbage collecting old layers: an incremental layer depends on

View File

@@ -13,7 +13,6 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use postgres_ffi::pg_constants::BLCKSZ;
use regex::Regex;
use std::net::TcpListener;
use std::str;
@@ -43,8 +42,6 @@ use crate::tenant_mgr;
use crate::walreceiver;
use crate::CheckpointConfig;
const CHUNK_SIZE: u32 = 128; // 1Mb
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
@@ -94,8 +91,7 @@ struct PagestreamNblocksResponse {
#[derive(Debug)]
struct PagestreamGetPageResponse {
n_blocks: u32,
data: Bytes,
page: Bytes,
}
#[derive(Debug)]
@@ -166,8 +162,7 @@ impl PagestreamBeMessage {
Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put_u32(resp.n_blocks);
bytes.put(&resp.data[..]);
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
@@ -443,18 +438,11 @@ impl PageServerHandler {
.entered();
let tag = RelishTag::Relation(req.rel);
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
let rel_size = timeline.get_relish_size(tag, lsn)?.unwrap_or(0);
let blkno = req.blkno;
let n_blocks = u32::min(blkno + CHUNK_SIZE, rel_size) - blkno;
let mut data = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
for i in 0..n_blocks {
let page = timeline.get_page_at_lsn(tag, blkno + i, lsn)?;
data.extend_from_slice(&page);
}
assert!(data.len() == n_blocks as usize * BLCKSZ as usize);
let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
n_blocks,
data: data.freeze(),
page,
}))
}
@@ -606,7 +594,7 @@ impl postgres_backend::Handler for PageServerHandler {
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Failed to fetch local timeline for callmemaybe requests")?;
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr);
walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned());
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("branch_create ") {

View File

@@ -5,7 +5,7 @@
//! There are a few components the storage machinery consists of:
//! * [`RemoteStorage`] trait, a CRUD-like generic abstraction to use for adapting external storages, with a few implementations:
//! * [`local_fs`] allows using the local file system as an external storage
//! * [`rust_s3`] uses AWS S3 bucket as an external storage
//! * [`rust_s3`] uses AWS S3 bucket entirely as an external storage
//!
//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync.
//! Synchronization internals are split into submodules
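As a rough orientation, here is a partial sketch of the `RemoteStorage` abstraction described above, reconstructed only from the signatures visible in later hunks of this diff (`storage_path`, `local_path`, `list`). The real trait has further methods (e.g. for uploads and downloads) that these hunks do not show, and the async method is assumed to rely on the `async_trait` crate:

use std::path::{Path, PathBuf};

#[async_trait::async_trait]
pub trait RemoteStorage {
    /// Storage-specific key type (e.g. an S3 object key or a local path).
    type StoragePath;

    /// Map a file under the pageserver workdir to its remote key.
    fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath>;

    /// Map a remote key back to the local path it should be downloaded to.
    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf>;

    /// List all keys currently present in the remote storage.
    async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>>;
}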
@@ -93,7 +93,7 @@ use std::{
};
use anyhow::{bail, Context};
use tokio::{io, sync};
use tokio::io;
use tracing::{error, info};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
@@ -135,7 +135,6 @@ pub struct SyncStartupData {
/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
pub fn start_local_timeline_sync(
config: &'static PageServerConf,
shutdown_hook: sync::watch::Receiver<()>,
) -> anyhow::Result<SyncStartupData> {
let local_timeline_files = local_tenant_timeline_files(config)
.context("Failed to collect local tenant timeline files")?;
@@ -143,7 +142,6 @@ pub fn start_local_timeline_sync(
match &config.remote_storage_config {
Some(storage_config) => match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
shutdown_hook,
config,
local_timeline_files,
LocalFs::new(root.clone(), &config.workdir)?,
@@ -151,7 +149,6 @@ pub fn start_local_timeline_sync(
storage_config.max_sync_errors,
),
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
shutdown_hook,
config,
local_timeline_files,
S3::new(s3_config, &config.workdir)?,

View File

@@ -1,8 +1,6 @@
//! AWS S3 storage wrapper around `rust_s3` library.
//!
//! Respects `prefix_in_bucket` property from [`S3Config`],
//! allowing multiple pageservers to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
//! Currently does not allow multiple pageservers to use the same bucket concurrently: objects are
//! placed in the root of the bucket.
use std::path::{Path, PathBuf};
@@ -25,26 +23,8 @@ impl S3ObjectKey {
&self.0
}
fn download_destination(
&self,
pageserver_workdir: &Path,
prefix_to_strip: Option<&str>,
) -> PathBuf {
let path_without_prefix = match prefix_to_strip {
Some(prefix) => self.0.strip_prefix(prefix).unwrap_or_else(|| {
panic!(
"Could not strip prefix '{}' from S3 object key '{}'",
prefix, self.0
)
}),
None => &self.0,
};
pageserver_workdir.join(
path_without_prefix
.split(S3_FILE_SEPARATOR)
.collect::<PathBuf>(),
)
fn download_destination(&self, pageserver_workdir: &Path) -> PathBuf {
pageserver_workdir.join(self.0.split(S3_FILE_SEPARATOR).collect::<PathBuf>())
}
}
@@ -52,7 +32,6 @@ impl S3ObjectKey {
pub struct S3 {
pageserver_workdir: &'static Path,
bucket: Bucket,
prefix_in_bucket: Option<String>,
}
impl S3 {
@@ -70,20 +49,6 @@ impl S3 {
None,
)
.context("Failed to create the s3 credentials")?;
let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
let mut prefix = prefix;
while prefix.starts_with(S3_FILE_SEPARATOR) {
prefix = &prefix[1..]
}
let mut prefix = prefix.to_string();
while prefix.ends_with(S3_FILE_SEPARATOR) {
prefix.pop();
}
prefix
});
Ok(Self {
bucket: Bucket::new_with_path_style(
aws_config.bucket_name.as_str(),
@@ -92,7 +57,6 @@ impl S3 {
)
.context("Failed to create the s3 bucket")?,
pageserver_workdir,
prefix_in_bucket,
})
}
}
@@ -103,7 +67,7 @@ impl RemoteStorage for S3 {
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath> {
let relative_path = strip_path_prefix(self.pageserver_workdir, local_path)?;
let mut key = self.prefix_in_bucket.clone().unwrap_or_default();
let mut key = String::new();
for segment in relative_path {
key.push(S3_FILE_SEPARATOR);
key.push_str(&segment.to_string_lossy());
@@ -112,14 +76,13 @@ impl RemoteStorage for S3 {
}
fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
Ok(storage_path
.download_destination(self.pageserver_workdir, self.prefix_in_bucket.as_deref()))
Ok(storage_path.download_destination(self.pageserver_workdir))
}
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
let list_response = self
.bucket
.list(self.prefix_in_bucket.clone().unwrap_or_default(), None)
.list(String::new(), None)
.await
.context("Failed to list s3 objects")?;
@@ -262,7 +225,7 @@ mod tests {
assert_eq!(
local_path,
key.download_destination(&repo_harness.conf.workdir, None),
key.download_destination(&repo_harness.conf.workdir),
"Download destination should consist of s3 path joined with the pageserver workdir prefix"
);
@@ -276,18 +239,14 @@ mod tests {
let segment_1 = "matching";
let segment_2 = "file";
let local_path = &repo_harness.conf.workdir.join(segment_1).join(segment_2);
let storage = dummy_storage(&repo_harness.conf.workdir);
let expected_key = S3ObjectKey(format!(
"{}{SEPARATOR}{}{SEPARATOR}{}",
storage.prefix_in_bucket.as_deref().unwrap_or_default(),
"{SEPARATOR}{}{SEPARATOR}{}",
segment_1,
segment_2,
SEPARATOR = S3_FILE_SEPARATOR,
));
let actual_key = storage
let actual_key = dummy_storage(&repo_harness.conf.workdir)
.storage_path(local_path)
.expect("Matching path should map to S3 path normally");
assert_eq!(
@@ -349,30 +308,18 @@ mod tests {
let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID);
let relative_timeline_path = timeline_dir.strip_prefix(&repo_harness.conf.workdir)?;
let s3_key = create_s3_key(
&relative_timeline_path.join("not a metadata"),
storage.prefix_in_bucket.as_deref(),
);
let s3_key = create_s3_key(&relative_timeline_path.join("not a metadata"));
assert_eq!(
s3_key.download_destination(
&repo_harness.conf.workdir,
storage.prefix_in_bucket.as_deref()
),
s3_key.download_destination(&repo_harness.conf.workdir),
storage
.local_path(&s3_key)
.expect("For a valid input, valid S3 info should be parsed"),
"Should be able to parse metadata out of the correctly named remote delta file"
);
let s3_key = create_s3_key(
&relative_timeline_path.join(METADATA_FILE_NAME),
storage.prefix_in_bucket.as_deref(),
);
let s3_key = create_s3_key(&relative_timeline_path.join(METADATA_FILE_NAME));
assert_eq!(
s3_key.download_destination(
&repo_harness.conf.workdir,
storage.prefix_in_bucket.as_deref()
),
s3_key.download_destination(&repo_harness.conf.workdir),
storage
.local_path(&s3_key)
.expect("For a valid input, valid S3 info should be parsed"),
@@ -409,18 +356,18 @@ mod tests {
Credentials::anonymous().unwrap(),
)
.unwrap(),
prefix_in_bucket: Some("dummy_prefix/".to_string()),
}
}
fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> S3ObjectKey {
S3ObjectKey(relative_file_path.iter().fold(
prefix.unwrap_or_default().to_string(),
|mut path_string, segment| {
path_string.push(S3_FILE_SEPARATOR);
path_string.push_str(segment.to_str().unwrap());
path_string
},
))
fn create_s3_key(relative_file_path: &Path) -> S3ObjectKey {
S3ObjectKey(
relative_file_path
.iter()
.fold(String::new(), |mut path_string, segment| {
path_string.push(S3_FILE_SEPARATOR);
path_string.push_str(segment.to_str().unwrap());
path_string
}),
)
}
}

View File

@@ -86,15 +86,10 @@ use std::{
use anyhow::{bail, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use tokio::{fs, sync::RwLock};
use tokio::{
fs,
runtime::Runtime,
sync::{
mpsc::{self, UnboundedReceiver},
watch::Receiver,
RwLock,
},
time::{Duration, Instant},
sync::mpsc::{self, UnboundedReceiver},
time::Instant,
};
use tracing::*;
@@ -351,7 +346,6 @@ pub(super) fn spawn_storage_sync_thread<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
shutdown_hook: Receiver<()>,
conf: &'static PageServerConf,
local_timeline_files: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>,
storage: S,
@@ -390,7 +384,6 @@ pub(super) fn spawn_storage_sync_thread<
.spawn(move || {
storage_sync_loop(
runtime,
shutdown_hook,
conf,
receiver,
remote_index,
@@ -406,18 +399,11 @@ pub(super) fn spawn_storage_sync_thread<
})
}
enum LoopStep {
NewStates(HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>),
Shutdown,
}
#[allow(clippy::too_many_arguments)]
fn storage_sync_loop<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
runtime: Runtime,
mut shutdown_hook: Receiver<()>,
runtime: tokio::runtime::Runtime,
conf: &'static PageServerConf,
mut receiver: UnboundedReceiver<SyncTask>,
index: RemoteTimelineIndex,
@@ -426,34 +412,23 @@ fn storage_sync_loop<
max_sync_errors: NonZeroU32,
) -> anyhow::Result<()> {
let remote_assets = Arc::new((storage, RwLock::new(index)));
loop {
let loop_step = runtime.block_on(async {
tokio::select! {
new_timeline_states = loop_step(
conf,
&mut receiver,
Arc::clone(&remote_assets),
max_concurrent_sync,
max_sync_errors,
)
.instrument(debug_span!("storage_sync_loop_step")) => LoopStep::NewStates(new_timeline_states),
_ = shutdown_hook.changed() => LoopStep::Shutdown,
}
});
match loop_step {
LoopStep::NewStates(new_timeline_states) => {
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
set_timeline_states(conf, new_timeline_states);
debug!("Sync loop step completed");
}
LoopStep::Shutdown => {
debug!("Shutdown requested, stopping");
break;
}
}
while !crate::tenant_mgr::shutdown_requested() {
let new_timeline_states = runtime.block_on(
loop_step(
conf,
&mut receiver,
Arc::clone(&remote_assets),
max_concurrent_sync,
max_sync_errors,
)
.instrument(debug_span!("storage_sync_loop_step")),
);
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
set_timeline_states(conf, new_timeline_states);
debug!("Sync loop step completed");
}
debug!("Shutdown requested, stopping");
Ok(())
}
@@ -564,7 +539,7 @@ async fn process_task<
"Waiting {} seconds before starting the task",
seconds_to_wait
);
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
tokio::time::sleep(tokio::time::Duration::from_secs_f64(seconds_to_wait)).await;
}
let sync_start = Instant::now();

View File

@@ -202,7 +202,7 @@ async fn try_download_archive<
archive_to_download.disk_consistent_lsn(),
local_metadata.disk_consistent_lsn()
),
Err(e) => warn!("Failed to read local metadata file, assuming it's safe to override its with the download. Read: {:#}", e),
Err(e) => warn!("Failed to read local metadata file, assuing it's safe to override its with the download. Read: {:#}", e),
}
compression::uncompress_file_stream_with_index(
conf.timeline_path(&timeline_id, &tenant_id),

View File

@@ -198,7 +198,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Reposito
match &tenant.repo {
Some(repo) => Ok(Arc::clone(repo)),
None => bail!("Repository for tenant {} is not yet valid", tenantid),
None => anyhow::bail!("Repository for tenant {} is not yet valid", tenantid),
}
}

View File

@@ -10,46 +10,15 @@
//! This is similar to PostgreSQL's virtual file descriptor facility in
//! src/backend/storage/file/fd.c
//!
use lazy_static::lazy_static;
use std::fs::{File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use zenith_metrics::{register_histogram_vec, register_int_gauge_vec, HistogramVec, IntGaugeVec};
use once_cell::sync::OnceCell;
// Metrics collected on disk IO operations
const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
0.000001, // 1 usec
0.00001, // 10 usec
0.0001, // 100 usec
0.001, // 1 msec
0.01, // 10 msec
0.1, // 100 msec
1.0, // 1 sec
];
lazy_static! {
static ref STORAGE_IO_TIME: HistogramVec = register_histogram_vec!(
"pageserver_io_time",
"Time spent in IO operations",
&["operation", "tenant_id", "timeline_id"],
STORAGE_IO_TIME_BUCKETS.into()
)
.expect("failed to define a metric");
}
lazy_static! {
static ref STORAGE_IO_SIZE: IntGaugeVec = register_int_gauge_vec!(
"pageserver_io_size",
"Amount of bytes",
&["operation", "tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -82,10 +51,6 @@ pub struct VirtualFile {
/// storing it here.
pub path: PathBuf,
open_options: OpenOptions,
/// For metrics
tenantid: String,
timelineid: String,
}
#[derive(PartialEq, Clone, Copy)]
@@ -180,13 +145,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// We do not have information about the tenantid/timelineid of the evicted file.
// It is possible to store the path together with the file or use the filepath crate,
// but since close() is not expected to be fast, it is not so critical to gather
// precise per-tenant statistics here.
STORAGE_IO_TIME
.with_label_values(&["close", "-", "-"])
.observe_closure_duration(|| drop(old_file));
drop(old_file);
}
// Prepare the slot for reuse and return it
@@ -226,20 +185,9 @@ impl VirtualFile {
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let parts = path.to_str().unwrap().split('/').collect::<Vec<&str>>();
let tenantid;
let timelineid;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
tenantid = parts[parts.len() - 4].to_string();
timelineid = parts[parts.len() - 2].to_string();
} else {
tenantid = "*".to_string();
timelineid = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = STORAGE_IO_TIME
.with_label_values(&["open", &tenantid, &timelineid])
.observe_closure_duration(|| open_options.open(path))?;
let file = open_options.open(path)?;
// Strip all options other than read and write.
//
@@ -256,8 +204,6 @@ impl VirtualFile {
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenantid,
timelineid,
};
slot_guard.file.replace(file);
@@ -267,13 +213,13 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub fn sync_all(&self) -> Result<(), Error> {
self.with_file("fsync", |file| file.sync_all())?
self.with_file(|file| file.sync_all())?
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
fn with_file<F, R>(&self, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
@@ -296,9 +242,7 @@ impl VirtualFile {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME
.with_label_values(&[op, &self.tenantid, &self.timelineid])
.observe_closure_duration(|| func(file)));
return Ok(func(file));
}
}
}
@@ -323,9 +267,7 @@ impl VirtualFile {
let (handle, mut slot_guard) = open_files.find_victim_slot();
// Open the physical file
let file = STORAGE_IO_TIME
.with_label_values(&["open", &self.tenantid, &self.timelineid])
.observe_closure_duration(|| self.open_options.open(&self.path))?;
let file = self.open_options.open(&self.path)?;
// Perform the requested operation on it
//
@@ -334,9 +276,7 @@ impl VirtualFile {
// library RwLock doesn't allow downgrading without releasing the lock,
// and that doesn't seem worth the trouble. (parking_lot RwLock would
// allow it)
let result = STORAGE_IO_TIME
.with_label_values(&[op, &self.tenantid, &self.timelineid])
.observe_closure_duration(|| func(&file));
let result = func(&file);
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -359,13 +299,7 @@ impl Drop for VirtualFile {
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// Unlike files evicted by the replacement algorithm, here
// we group close time by tenantid/timelineid.
// This allows comparing the number/time of "normal" file closes
// with file evictions.
STORAGE_IO_TIME
.with_label_values(&["close", &self.tenantid, &self.timelineid])
.observe_closure_duration(|| slot_guard.file.take());
slot_guard.file.take();
}
}
}
@@ -401,7 +335,7 @@ impl Seek for VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))??
self.pos = self.with_file(|mut file| file.seek(SeekFrom::End(offset)))??
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -423,23 +357,11 @@ impl Seek for VirtualFile {
impl FileExt for VirtualFile {
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenantid, &self.timelineid])
.add(size as i64);
}
result
self.with_file(|file| file.read_at(buf, offset))?
}
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenantid, &self.timelineid])
.add(size as i64);
}
result
self.with_file(|file| file.write_at(buf, offset))?
}
}

View File

@@ -44,7 +44,7 @@ struct WalReceiverEntry {
}
lazy_static! {
static ref WAL_RECEIVERS: Mutex<HashMap<(ZTenantId, ZTimelineId), WalReceiverEntry>> =
static ref WAL_RECEIVERS: Mutex<HashMap<ZTimelineId, WalReceiverEntry>> =
Mutex::new(HashMap::new());
}
@@ -60,10 +60,10 @@ thread_local! {
// In future we can make this more granular and send shutdown signals
// per tenant/timeline to cancel inactive walreceivers.
// TODO deal with blocking pg connections
pub fn stop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
pub fn stop_wal_receiver(timelineid: ZTimelineId) {
let mut receivers = WAL_RECEIVERS.lock();
if let Some(r) = receivers.get_mut(&(tenantid, timelineid)) {
if let Some(r) = receivers.get_mut(&timelineid) {
match r.wal_receiver_interrupt_sender.take() {
Some(s) => {
if s.send(()).is_err() {
@@ -84,9 +84,9 @@ pub fn stop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
}
}
fn drop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
let mut receivers = WAL_RECEIVERS.lock();
receivers.remove(&(tenantid, timelineid));
receivers.remove(&timelineid);
// Check if it was the last walreceiver of the tenant.
// TODO now we store one WalReceiverEntry per timeline,
@@ -104,13 +104,13 @@ fn drop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
// Launch a new WAL receiver, or tell one that's running about change in connection string
pub fn launch_wal_receiver(
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
) {
let mut receivers = WAL_RECEIVERS.lock();
match receivers.get_mut(&(tenantid, timelineid)) {
match receivers.get_mut(&timelineid) {
Some(receiver) => {
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
@@ -121,7 +121,7 @@ pub fn launch_wal_receiver(
.name("WAL receiver thread".into())
.spawn(move || {
IS_WAL_RECEIVER.with(|c| c.set(true));
thread_main(conf, tenantid, timelineid, rx);
thread_main(conf, timelineid, tenantid, rx);
})
.unwrap();
@@ -131,7 +131,7 @@ pub fn launch_wal_receiver(
wal_receiver_interrupt_sender: Some(tx),
tenantid,
};
receivers.insert((tenantid, timelineid), receiver);
receivers.insert(timelineid, receiver);
// Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::set_tenant_state(tenantid, TenantState::Active).unwrap();
@@ -141,11 +141,11 @@ pub fn launch_wal_receiver(
}
// Look up current WAL producer connection string in the hash table
fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String {
fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
let receivers = WAL_RECEIVERS.lock();
receivers
.get(&(tenantid, timelineid))
.get(&timelineid)
.unwrap()
.wal_producer_connstr
.clone()
@@ -156,15 +156,15 @@ fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> Str
//
fn thread_main(
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
tenantid: ZTenantId,
interrupt_receiver: oneshot::Receiver<()>,
) {
let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
info!("WAL receiver thread started");
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(tenantid, timelineid);
let wal_producer_connstr = get_wal_producer_connstr(timelineid);
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it.
@@ -188,7 +188,7 @@ fn thread_main(
// Drop it from list of active WAL_RECEIVERS
// so that the next callmemaybe request launches a new thread
drop_wal_receiver(tenantid, timelineid);
drop_wal_receiver(timelineid, tenantid);
}
fn walreceiver_main(

View File

@@ -13,8 +13,6 @@ lazy_static = "1.4.0"
md5 = "0.7.0"
rand = "0.8.3"
hex = "0.4.3"
hyper = "0.14"
routerify = "2"
parking_lot = "0.11.2"
serde = "1"
serde_json = "1"
@@ -25,4 +23,3 @@ rustls = "0.19.1"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
zenith_utils = { path = "../zenith_utils" }
zenith_metrics = { path = "../zenith_metrics" }

View File

@@ -1,15 +0,0 @@
use hyper::{Body, Request, Response, StatusCode};
use routerify::RouterBuilder;
use zenith_utils::http::endpoint;
use zenith_utils::http::error::ApiError;
use zenith_utils::http::json::json_response;
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(json_response(StatusCode::OK, "")?)
}
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
router.get("/v1/status", status_handler)
}

View File

@@ -9,18 +9,16 @@ use anyhow::bail;
use clap::{App, Arg};
use state::{ProxyConfig, ProxyState};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::{tcp_listener, GIT_VERSION};
mod cplane_api;
mod http;
mod mgmt;
mod proxy;
mod state;
mod waiters;
fn main() -> anyhow::Result<()> {
zenith_metrics::set_common_metrics_prefix("zenith_proxy");
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let arg_matches = App::new("Zenith proxy/router")
.version(GIT_VERSION)
.arg(
@@ -39,14 +37,6 @@ fn main() -> anyhow::Result<()> {
.help("listen for management callback connection on ip:port")
.default_value("127.0.0.1:7000"),
)
.arg(
Arg::with_name("http")
.short("h")
.long("http")
.takes_value(true)
.help("listen for incoming http connections (metrics, etc) on ip:port")
.default_value("127.0.0.1:7001"),
)
.arg(
Arg::with_name("uri")
.short("u")
@@ -93,7 +83,6 @@ fn main() -> anyhow::Result<()> {
let config = ProxyConfig {
proxy_address: arg_matches.value_of("proxy").unwrap().parse()?,
mgmt_address: arg_matches.value_of("mgmt").unwrap().parse()?,
http_address: arg_matches.value_of("http").unwrap().parse()?,
redirect_uri: arg_matches.value_of("uri").unwrap().parse()?,
auth_endpoint: arg_matches.value_of("auth-endpoint").unwrap().parse()?,
ssl_config,
@@ -103,35 +92,16 @@ fn main() -> anyhow::Result<()> {
println!("Version: {}", GIT_VERSION);
// Check that we can bind to address before further initialization
println!("Starting http on {}", state.conf.http_address);
let http_listener = tcp_listener::bind(state.conf.http_address)?;
println!("Starting proxy on {}", state.conf.proxy_address);
let pageserver_listener = tcp_listener::bind(state.conf.proxy_address)?;
println!("Starting mgmt on {}", state.conf.mgmt_address);
let mgmt_listener = tcp_listener::bind(state.conf.mgmt_address)?;
let threads = [
thread::Builder::new()
.name("Http thread".into())
.spawn(move || {
let router = http::make_router();
endpoint::serve_thread_main(router, http_listener)
})?,
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
thread::Builder::new()
.name("Listener thread".into())
.spawn(move || proxy::thread_main(state, pageserver_listener))?,
thread::Builder::new()
.name("Mgmt thread".into())
.spawn(move || mgmt::thread_main(state, mgmt_listener))?,
];
for t in threads {
t.join().unwrap()?;
}
tokio::try_join!(
proxy::thread_main(state, pageserver_listener),
mgmt::thread_main(state, mgmt_listener),
)?;
Ok(())
}
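The net effect of this hunk is that the proxy drops its dedicated OS threads and lets one tokio runtime drive both remaining services, failing fast if either returns an error. A minimal sketch of that pattern, with hypothetical run_proxy/run_mgmt stand-ins for the real listener loops:
async fn run_proxy() -> anyhow::Result<()> { Ok(()) } // stand-in for proxy::thread_main
async fn run_mgmt() -> anyhow::Result<()> { Ok(()) }  // stand-in for mgmt::thread_main
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // try_join! polls both futures concurrently and returns as soon as one fails,
    // so an error in either service takes the whole process down.
    tokio::try_join!(run_proxy(), run_mgmt())?;
    Ok(())
}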

View File

@@ -16,7 +16,7 @@ use crate::{cplane_api::DatabaseInfo, ProxyState};
///
/// Listens for connections, and launches a new handler thread for each.
///
pub fn thread_main(state: &'static ProxyState, listener: TcpListener) -> anyhow::Result<()> {
pub async fn thread_main(state: &'static ProxyState, listener: TcpListener) -> anyhow::Result<()> {
loop {
let (socket, peer_addr) = listener.accept()?;
println!("accepted connection from {}", peer_addr);

View File

@@ -14,6 +14,7 @@ use zenith_utils::postgres_backend::{self, PostgresBackend, ProtoState, Stream};
use zenith_utils::pq_proto::{BeMessage as Be, FeMessage as Fe, *};
use zenith_utils::sock_split::{ReadStream, WriteStream};
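// Clone is needed so a CancelClosure can be copied out of CANCEL_MAP and awaited
// without holding the map lock (see the CancelRequest handling in handle_startup).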
#[derive(Clone)]
struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: tokio_postgres::CancelToken,
@@ -35,9 +36,14 @@ lazy_static! {
static ref CANCEL_MAP: Mutex<HashMap<CancelKeyData, CancelClosure>> = Mutex::new(HashMap::new());
}
thread_local! {
// Used to clean up the CANCEL_MAP. Might not be necessary if we use tokio thread pool in main loop.
static THREAD_CANCEL_KEY_DATA: Cell<Option<CancelKeyData>> = Cell::new(None);
/// Create new CancelKeyData with backend_pid that doesn't necessarily
/// correspond to the backend_pid of any actual backend.
fn fabricate_cancel_key_data() -> CancelKeyData {
let mut rng = StdRng::from_entropy();
CancelKeyData {
backend_pid: rng.gen(),
cancel_key: rng.gen(),
}
}
///
@@ -45,7 +51,7 @@ thread_local! {
///
/// Listens for connections, and launches a new handler thread for each.
///
pub fn thread_main(
pub async fn thread_main(
state: &'static ProxyState,
listener: std::net::TcpListener,
) -> anyhow::Result<()> {
@@ -54,23 +60,16 @@ pub fn thread_main(
println!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
// TODO Use a threadpool instead. Maybe use tokio's threadpool by
// spawning a future into its runtime. Tokio's JoinError should
// allow us to handle cleanup properly even if the future panics.
thread::Builder::new()
.name("Proxy thread".into())
.spawn(move || {
if let Err(err) = proxy_conn_main(state, socket) {
println!("error: {}", err);
}
// Clean up CANCEL_MAP.
THREAD_CANCEL_KEY_DATA.with(|cell| {
if let Some(cancel_key_data) = cell.get() {
CANCEL_MAP.lock().remove(&cancel_key_data);
};
});
})?;
tokio::task::spawn(async move {
let cancel_key_data = fabricate_cancel_key_data();
let res = tokio::task::spawn(proxy_conn_main(state, socket, cancel_key_data)).await;
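// Clean up this connection's CANCEL_MAP entry no matter how the task finished;
// a JoinError here means the handler panicked.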
CANCEL_MAP.lock().remove(&cancel_key_data);
match res {
Err(join_err) => println!("join error: {}", join_err),
Ok(Err(conn_err)) => println!("connection error: {}", conn_err),
Ok(Ok(())) => {},
}
});
}
}
@@ -81,7 +80,7 @@ struct ProxyConnection {
pgb: PostgresBackend,
}
pub fn proxy_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow::Result<()> {
pub async fn proxy_conn_main(state: &'static ProxyState, socket: TcpStream, cancel_key_data: CancelKeyData) -> anyhow::Result<()> {
let conn = ProxyConnection {
state,
psql_session_id: hex::encode(rand::random::<[u8; 8]>()),
@@ -93,7 +92,7 @@ pub fn proxy_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow:
)?,
};
let (client, server) = match conn.handle_client()? {
let (client, server) = match conn.handle_client(cancel_key_data).await? {
Some(x) => x,
None => return Ok(()),
};
@@ -105,28 +104,41 @@ pub fn proxy_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow:
_ => panic!("invalid stream type"),
};
proxy(client.split(), server.split())
proxy(client.split(), server.split()).await
}
impl ProxyConnection {
/// Returns Ok(None) when connection was successfully closed.
fn handle_client(mut self) -> anyhow::Result<Option<(Stream, TcpStream)>> {
let mut authenticate = || {
let (username, dbname) = match self.handle_startup()? {
Some(x) => x,
None => return Ok(None),
};
async fn handle_client(mut self, cancel_key_data: CancelKeyData) -> anyhow::Result<Option<(Stream, TcpStream)>> {
let (username, dbname) = match self.handle_startup().await? {
Some(x) => x,
None => return Ok(None),
};
// Both scenarios here should end up producing database credentials
if username.ends_with("@zenith") {
let dbinfo = {
if true || username.ends_with("@zenith") {
self.handle_existing_user(&username, &dbname).map(Some)
} else {
self.handle_new_user().map(Some)
}
};
let conn = match authenticate() {
Ok(Some(db_info)) => connect_to_db(db_info),
// let mut authenticate = || async {
// let (username, dbname) = match self.handle_startup().await? {
// Some(x) => x,
// None => return Ok(None),
// };
// // Both scenarios here should end up producing database credentials
// if true || username.ends_with("@zenith") {
// self.handle_existing_user(&username, &dbname).map(Some)
// } else {
// self.handle_new_user().map(Some)
// }
// };
let conn = match dbinfo {
Ok(Some(info)) => connect_to_db(info),
Ok(None) => return Ok(None),
Err(e) => {
// Report the error to the client
@@ -137,11 +149,8 @@ impl ProxyConnection {
// We'll get rid of this once migration to async is complete
let (pg_version, db_stream) = {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let (pg_version, stream, cancel_key_data) = runtime.block_on(conn)?;
let (pg_version, stream, cancel_closure) = conn.await?;
CANCEL_MAP.lock().insert(cancel_key_data, cancel_closure);
self.pgb
.write_message(&BeMessage::BackendKeyData(cancel_key_data))?;
let stream = stream.into_std()?;
@@ -161,7 +170,7 @@ impl ProxyConnection {
}
/// Returns Ok(None) when connection was successfully closed.
fn handle_startup(&mut self) -> anyhow::Result<Option<(String, String)>> {
async fn handle_startup(&mut self) -> anyhow::Result<Option<(String, String)>> {
let have_tls = self.pgb.tls_config.is_some();
let mut encrypted = false;
@@ -198,12 +207,9 @@ impl ProxyConnection {
return Ok(Some((get_param("user")?, get_param("database")?)));
}
FeStartupPacket::CancelRequest(cancel_key_data) => {
if let Some(cancel_closure) = CANCEL_MAP.lock().get(&cancel_key_data) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(cancel_closure.try_cancel_query());
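// Clone the closure out of the map so the lock guard is dropped before awaiting the cancel request.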
let entry = CANCEL_MAP.lock().get(&cancel_key_data).map(core::clone::Clone::clone);
if let Some(cancel_closure) = entry {
cancel_closure.try_cancel_query().await;
}
return Ok(None);
}
@@ -231,14 +237,21 @@ impl ProxyConnection {
.split_last()
.ok_or_else(|| anyhow!("unexpected password message"))?;
let cplane = CPlaneApi::new(&self.state.conf.auth_endpoint, &self.state.waiters);
let db_info = cplane.authenticate_proxy_request(
user,
db,
md5_response,
&md5_salt,
&self.psql_session_id,
)?;
let db_info = DatabaseInfo {
host: "localhost".into(),
port: 5432,
dbname: "postgres".into(),
user: "postgres".into(),
password: Some("postgres".into()),
};
// let cplane = CPlaneApi::new(&self.state.conf.auth_endpoint, &self.state.waiters);
// let db_info = cplane.authenticate_proxy_request(
// user,
// db,
// md5_response,
// &md5_salt,
// &self.psql_session_id,
// )?;
self.pgb
.write_message_noflush(&Be::AuthenticationOk)?
@@ -287,7 +300,7 @@ fn hello_message(redirect_uri: &str, session_id: &str) -> String {
/// Create a TCP connection to a postgres database, authenticate with it, and receive the ReadyForQuery message
async fn connect_to_db(
db_info: DatabaseInfo,
) -> anyhow::Result<(String, tokio::net::TcpStream, CancelKeyData)> {
) -> anyhow::Result<(String, tokio::net::TcpStream, CancelClosure)> {
// Make raw connection. When connect_raw finishes we've received ReadyForQuery.
let socket_addr = db_info.socket_addr()?;
let mut socket = tokio::net::TcpStream::connect(socket_addr).await?;
@@ -295,41 +308,21 @@ async fn connect_to_db(
// NOTE We effectively ignore some ParameterStatus and NoticeResponse
// messages here. Not sure if that could break something.
let (client, conn) = config.connect_raw(&mut socket, NoTls).await?;
// Save info for potentially cancelling the query later
let mut rng = StdRng::from_entropy();
let cancel_key_data = CancelKeyData {
// HACK We'd rather get the real backend_pid but tokio_postgres doesn't
// expose it and we don't want to do another roundtrip to query
// for it. The client will be able to notice that this is not the
// actual backend_pid, but backend_pid is not used for anything
// so it doesn't matter.
backend_pid: rng.gen(),
cancel_key: rng.gen(),
};
let cancel_closure = CancelClosure {
socket_addr,
cancel_token: client.cancel_token(),
};
CANCEL_MAP.lock().insert(cancel_key_data, cancel_closure);
THREAD_CANCEL_KEY_DATA.with(|cell| {
let prev_value = cell.replace(Some(cancel_key_data));
assert!(
prev_value.is_none(),
"THREAD_CANCEL_KEY_DATA was already set"
);
});
let version = conn.parameter("server_version").unwrap();
Ok((version.into(), socket, cancel_key_data))
Ok((version.into(), socket, cancel_closure))
}
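The CancelClosure returned here gets stashed in CANCEL_MAP, and its try_cancel_query is awaited when a CancelRequest arrives; that method is not shown in this diff. Assuming it opens a fresh connection to the saved address and hands it to tokio_postgres, it would look roughly like this sketch:
impl CancelClosure {
    // Sketch only, not part of this change: open a new TCP connection to the
    // server and let tokio_postgres send a CancelRequest over it.
    async fn try_cancel_query(&self) {
        match tokio::net::TcpStream::connect(self.socket_addr).await {
            Ok(socket) => {
                if let Err(e) = self.cancel_token.cancel_query_raw(socket, NoTls).await {
                    println!("failed to cancel query: {}", e);
                }
            }
            Err(e) => println!("failed to connect for query cancellation: {}", e),
        }
    }
}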
/// Concurrently proxy both directions of the client and server connections
fn proxy(
async fn proxy(
(client_read, client_write): (ReadStream, WriteStream),
(server_read, server_write): (ReadStream, WriteStream),
) -> anyhow::Result<()> {
fn do_proxy(mut reader: impl io::Read, mut writer: WriteStream) -> io::Result<u64> {
async fn do_proxy(mut reader: impl io::Read, mut writer: WriteStream) -> io::Result<u64> {
/// FlushWriter will make sure that every message is sent as soon as possible
struct FlushWriter<W>(W);
@@ -354,10 +347,9 @@ fn proxy(
res
}
let client_to_server_jh = thread::spawn(move || do_proxy(client_read, server_write));
do_proxy(server_read, client_write)?;
client_to_server_jh.join().unwrap()?;
tokio::try_join!(
do_proxy(client_read, server_write),
do_proxy(server_read, client_write),
)?;
Ok(())
}
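do_proxy is now an async fn but still copies through blocking io::Read/Write types; once the streams themselves are async, the same two-directional shape can be written with tokio's copy helper. A sketch of that variant over tokio::net::TcpStream halves (not part of this change, and it does not replicate FlushWriter's flush-per-message behaviour):
async fn proxy_tcp(
    client: tokio::net::TcpStream,
    server: tokio::net::TcpStream,
) -> anyhow::Result<()> {
    let (mut client_read, mut client_write) = client.into_split();
    let (mut server_read, mut server_write) = server.into_split();
    // Copy both directions concurrently; bail out on the first I/O error.
    tokio::try_join!(
        tokio::io::copy(&mut client_read, &mut server_write),
        tokio::io::copy(&mut server_read, &mut client_write),
    )?;
    Ok(())
}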

View File

@@ -10,12 +10,8 @@ pub struct ProxyConfig {
/// main entrypoint for users to connect to
pub proxy_address: SocketAddr,
/// internally used for status and prometheus metrics
pub http_address: SocketAddr,
/// management endpoint. Upon user account creation control plane
/// http management endpoint. Upon user account creation control plane
/// will notify us here, so that we can 'unfreeze' user session.
/// TODO It uses postgres protocol over TCP but should be migrated to http.
pub mgmt_address: SocketAddr,
/// send unauthenticated users to this URI

View File

@@ -47,9 +47,6 @@ Useful environment variables:
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
`ZENITH_PAGESERVER_OVERRIDES`: add a `;`-separated set of configs that will be passed as
`--pageserver-config-override=${value}` parameter values when the zenith CLI is invoked
`RUST_LOG`: logging configuration to pass into Zenith CLI
Let stdout, stderr and `INFO` log messages go to the terminal instead of capturing them:
`pytest -s --log-cli-level=INFO ...`

View File

@@ -5,7 +5,7 @@ import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.zenith_fixtures import ZenithEnv
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -13,18 +13,10 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Create a couple of branches off the main branch, at a historical point in time.
#
def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
# Use safekeeper in this test to avoid a subtle race condition.
# Without safekeeper, walreceiver reconnection can get stuck
# because of IO deadlock.
#
# See https://github.com/zenithdb/zenith/issues/1068
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
def test_branch_behind(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Branch at the point where only 100 rows were inserted
env.zenith_cli(["branch", "test_branch_behind", "main"])
env.zenith_cli(["branch", "test_branch_behind", "empty"])
pgmain = env.postgres.create_start('test_branch_behind')
log.info("postgres is running on 'test_branch_behind' branch")

View File

@@ -1,5 +1,5 @@
import json
from uuid import uuid4, UUID
from uuid import uuid4
import pytest
import psycopg2
import requests
@@ -96,15 +96,6 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: str):
client.tenant_create(tenant_id)
assert tenant_id.hex in {t['id'] for t in client.tenant_list()}
# check its timelines
timelines = client.timeline_list(tenant_id)
assert len(timelines) > 0
for timeline_id_str in timelines:
timeline_details = client.timeline_details(tenant_id.hex, timeline_id_str)
assert timeline_details['type'] == 'Local'
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str
# create branch
branch_name = uuid4().hex
client.branch_create(tenant_id, branch_name, "main")

View File

@@ -1,88 +0,0 @@
# It's possible to run any regular test with the local fs remote storage via
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" pipenv ......
import tempfile, time, shutil, os
from contextlib import closing
from pathlib import Path
from fixtures.zenith_fixtures import ZenithEnvBuilder, LocalFsStorage, check_restored_datadir_content
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Tests that a piece of data is backed up and restored correctly:
#
# 1. Initial pageserver
# * starts a pageserver with remote storage, stores specific data in its tables
# * triggers a checkpoint (which produces a local data scheduled for backup), gets the corresponding timeline id
# * polls the timeline status to ensure it's copied remotely
# * stops the pageserver, clears all local directories
#
# 2. Second pageserver
# * starts another pageserver, connected to the same remote storage
# * same timeline id is queried for status, triggering timeline's download
# * timeline status is polled until it's downloaded
# * queries the specific data, ensuring that it matches the one stored before
#
def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.rust_log_override = 'debug'
zenith_env_builder.num_safekeepers = 1
zenith_env_builder.enable_local_fs_remote_storage()
data_id = 1
data_secret = 'very secret secret'
##### First start, insert secret data and upload it to the remote storage
env = zenith_env_builder.init()
pg = env.postgres.create_start()
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'''
CREATE TABLE t1(id int primary key, secret text);
INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
''')
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"do_gc {tenant_id} {timeline_id}")
log.info("waiting for upload") # TODO api to check if upload is done
time.sleep(2)
##### Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / 'tenants'
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
log.info("waiting for timeline redownload")
client = env.pageserver.http_client()
attempts = 0
while True:
timeline_details = client.timeline_details(tenant_id, timeline_id)
assert timeline_details['timeline_id'] == timeline_id
assert timeline_details['tenant_id'] == tenant_id
if timeline_details['type'] == 'Local':
log.info("timeline downloaded, checking its data")
break
attempts += 1
if attempts > 10:
raise Exception("timeline redownload failed")
log.debug("still waiting")
time.sleep(1)
pg = env.postgres.create_start()
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'SELECT secret FROM t1 WHERE id = {data_id};')
assert cur.fetchone() == (data_secret, )

View File

@@ -66,8 +66,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 1
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
assert row['layer_relfiles_removed'] == 2
assert row['layer_relfiles_dropped'] == 0
# Insert two more rows and run GC.
@@ -81,7 +81,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_removed'] == 2
assert row['layer_relfiles_dropped'] == 0
# Do it again. Should again create two new layer files and remove old ones.
@@ -92,8 +92,8 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
assert row['layer_relfiles_removed'] == 2
assert row['layer_relfiles_dropped'] == 0
# Run GC again, with no changes in the database. Should not remove anything.
@@ -101,7 +101,7 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 3
assert row['layer_relfiles_total'] == layer_relfiles_remain
assert row['layer_relfiles_removed'] == 0
assert row['layer_relfiles_dropped'] == 0
@@ -121,7 +121,9 @@ def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
# Each relation fork is counted separately, hence 3.
assert row['layer_relfiles_needed_as_tombstone'] == 3
assert row['layer_relfiles_removed'] == 0
# The catalog updates also create new layer files of the catalogs, which
# are counted as 'removed'
assert row['layer_relfiles_removed'] > 0
# TODO Change the test to check actual GC of dropped layers.
# Each relation fork is counted separately, hence 3.

View File

@@ -25,7 +25,7 @@ from dataclasses import dataclass
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast
from typing_extensions import Literal
import pytest
@@ -342,14 +342,10 @@ class ZenithEnvBuilder:
def __init__(self,
repo_dir: Path,
port_distributor: PortDistributor,
pageserver_remote_storage: Optional[RemoteStorage] = None,
num_safekeepers: int = 0,
pageserver_auth_enabled: bool = False,
rust_log_override: Optional[str] = None):
pageserver_auth_enabled: bool = False):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.pageserver_remote_storage = pageserver_remote_storage
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.env: Optional[ZenithEnv] = None
@@ -360,11 +356,6 @@ class ZenithEnvBuilder:
self.env = ZenithEnv(self)
return self.env
def enable_local_fs_remote_storage(self):
assert self.pageserver_remote_storage is None, "remote storage is enabled already"
self.pageserver_remote_storage = LocalFsStorage(
Path(self.repo_dir / 'local_fs_remote_storage'))
def __enter__(self):
return self
@@ -413,7 +404,6 @@ class ZenithEnv:
"""
def __init__(self, config: ZenithEnvBuilder):
self.repo_dir = config.repo_dir
self.rust_log_override = config.rust_log_override
self.port_distributor = config.port_distributor
self.postgres = PostgresFactory(self)
@@ -444,9 +434,7 @@ auth_type = '{pageserver_auth_type}'
"""
# Create a corresponding ZenithPageserver object
self.pageserver = ZenithPageserver(self,
port=pageserver_port,
remote_storage=config.pageserver_remote_storage)
self.pageserver = ZenithPageserver(self, port=pageserver_port)
# Create config and a Safekeeper object for each safekeeper
for i in range(1, config.num_safekeepers + 1):
@@ -477,8 +465,6 @@ sync = false # Disable fsyncs to make the tests go faster
tmp.flush()
cmd = ['init', f'--config={tmp.name}']
append_pageserver_param_overrides(cmd, config.pageserver_remote_storage)
self.zenith_cli(cmd)
# Start up the page server and all the safekeepers
@@ -523,9 +509,6 @@ sync = false # Disable fsyncs to make the tests go faster
env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
if self.rust_log_override is not None:
env_vars['RUST_LOG'] = self.rust_log_override
# Pass coverage settings
var = 'LLVM_PROFILE_FILE'
val = os.environ.get(var)
@@ -682,20 +665,6 @@ class ZenithPageserverHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def timeline_list(self, tenant_id: uuid.UUID) -> List[str]:
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def timeline_details(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]:
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
@@ -708,38 +677,17 @@ class PageserverPort:
http: int
@dataclass
class LocalFsStorage:
root: Path
@dataclass
class S3Storage:
bucket: str
region: str
access_key: str
secret_key: str
RemoteStorage = Union[LocalFsStorage, S3Storage]
class ZenithPageserver(PgProtocol):
"""
An object representing a running pageserver.
Initializes the repository via `zenith init`.
"""
def __init__(self,
env: ZenithEnv,
port: PageserverPort,
remote_storage: Optional[RemoteStorage] = None,
enable_auth=False):
def __init__(self, env: ZenithEnv, port: PageserverPort, enable_auth=False):
super().__init__(host='localhost', port=port.pg)
self.env = env
self.running = False
self.service_port = port # do not shadow PgProtocol.port which is just int
self.remote_storage = remote_storage
def start(self) -> 'ZenithPageserver':
"""
@@ -748,10 +696,7 @@ class ZenithPageserver(PgProtocol):
"""
assert self.running == False
start_args = ['pageserver', 'start']
append_pageserver_param_overrides(start_args, self.remote_storage)
self.env.zenith_cli(start_args)
self.env.zenith_cli(['pageserver', 'start'])
self.running = True
return self
@@ -784,28 +729,6 @@ class ZenithPageserver(PgProtocol):
)
def append_pageserver_param_overrides(params_to_update: List[str],
pageserver_remote_storage: Optional[RemoteStorage]):
if pageserver_remote_storage is not None:
if isinstance(pageserver_remote_storage, LocalFsStorage):
pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'"
elif isinstance(pageserver_remote_storage, S3Storage):
pageserver_storage_override = f"bucket_name='{pageserver_remote_storage.bucket}',\
bucket_region='{pageserver_remote_storage.region}',access_key_id='{pageserver_remote_storage.access_key}',\
secret_access_key='{pageserver_remote_storage.secret_key}'"
else:
raise Exception(f'Unknown storage configuration {pageserver_remote_storage}')
params_to_update.append(
f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}')
env_overrides = os.getenv('ZENITH_PAGESERVER_OVERRIDES')
if env_overrides is not None:
params_to_update += [
f'--pageserver-config-override={o.strip()}' for o in env_overrides.split(';')
]
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str):

View File

@@ -10,7 +10,6 @@ use std::fs::File;
use std::path::{Path, PathBuf};
use std::thread;
use tracing::*;
use walkeeper::timeline::{CreateControlFile, FileStorage};
use zenith_utils::http::endpoint;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
@@ -87,21 +86,8 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Do not wait for changes to be written safely to disk"),
)
.arg(
Arg::with_name("dump-control-file")
.long("dump-control-file")
.takes_value(true)
.help("Dump control file at path specifed by this argument and exit"),
)
.get_matches();
if let Some(addr) = arg_matches.value_of("dump-control-file") {
let state = FileStorage::load_control_file(Path::new(addr), CreateControlFile::False)?;
let json = serde_json::to_string(&state)?;
print!("{}", json);
return Ok(());
}
let mut conf: SafeKeeperConf = Default::default();
if let Some(dir) = arg_matches.value_of("datadir") {

View File

@@ -62,7 +62,7 @@ impl Default for SafeKeeperConf {
daemonize: false,
no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
}

View File

@@ -30,7 +30,7 @@ use zenith_utils::pq_proto::SystemId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 3;
pub const SK_FORMAT_VERSION: u32 = 2;
const SK_PROTOCOL_VERSION: u32 = 1;
const UNKNOWN_SERVER_VERSION: u32 = 0;
@@ -133,11 +133,9 @@ pub struct ServerInfo {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
/// Zenith timelineid
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub ztli: ZTimelineId,
pub wal_seg_size: u32,
}
@@ -151,7 +149,6 @@ pub struct SafeKeeperState {
pub server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// part of WAL acknowledged by quorum and available locally
pub commit_lsn: Lsn,
@@ -174,7 +171,7 @@ impl SafeKeeperState {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
tenant_id: ZTenantId::from([0u8; 16]),
timeline_id: ZTimelineId::from([0u8; 16]),
ztli: ZTimelineId::from([0u8; 16]),
wal_seg_size: 0,
},
proposer_uuid: [0; 16],
@@ -563,13 +560,13 @@ where
// set basic info about the server, if not yet set
self.s.server.system_id = msg.system_id;
self.s.server.tenant_id = msg.tenant_id;
self.s.server.timeline_id = msg.ztli;
self.s.server.ztli = msg.ztli;
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage
.persist(&self.s)
.with_context(|| "failed to persist shared state")?;
self.metrics = SafeKeeperMetrics::new(self.s.server.timeline_id);
self.metrics = SafeKeeperMetrics::new(self.s.server.ztli);
info!(
"processed greeting from proposer {:?}, sending term {:?}",

View File

@@ -79,7 +79,7 @@ struct ReplicationConnGuard {
impl Drop for ReplicationConnGuard {
fn drop(&mut self) {
self.timeline.remove_replica(self.replica);
self.timeline.update_replica_state(self.replica, None);
}
}
@@ -120,12 +120,14 @@ impl ReplicationConn {
/// This is spawned into the background by `handle_start_replication`.
fn background_thread(
mut stream_in: ReadStream,
replica_guard: Arc<ReplicationConnGuard>,
timeline: Arc<Timeline>,
replica_id: usize,
) -> Result<()> {
let replica_id = replica_guard.replica;
let timeline = &replica_guard.timeline;
let mut state = ReplicaState::new();
let _guard = ReplicationConnGuard {
replica: replica_id,
timeline: timeline.clone(),
};
// Wait for replica's feedback.
while let Some(msg) = FeMessage::read(&mut stream_in)? {
match &msg {
@@ -138,7 +140,7 @@ impl ReplicationConn {
// Note: deserializing is on m[1..] because we skip the tag byte.
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?;
timeline.update_replica_state(replica_id, state);
timeline.update_replica_state(replica_id, Some(state));
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
@@ -146,7 +148,7 @@ impl ReplicationConn {
state.last_received_lsn = reply.write_lsn;
state.disk_consistent_lsn = reply.flush_lsn;
state.remote_consistent_lsn = reply.apply_lsn;
timeline.update_replica_state(replica_id, state);
timeline.update_replica_state(replica_id, Some(state));
}
_ => warn!("unexpected message {:?}", msg),
}
@@ -205,23 +207,16 @@ impl ReplicationConn {
// This replica_id is used below to check if it's time to stop replication.
let replica_id = bg_timeline.add_replica(state);
// Use a guard object to remove our entry from the timeline, when the background
// thread and us have both finished using it.
let replica_guard = Arc::new(ReplicationConnGuard {
replica: replica_id,
timeline: bg_timeline,
});
let bg_replica_guard = Arc::clone(&replica_guard);
// TODO: here we got two threads, one for writing WAL and one for receiving
// feedback. If one of them fails, we should shutdown the other one too.
let _ = thread::Builder::new()
.name("HotStandbyFeedback thread".into())
.spawn(move || {
if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) {
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) {
error!("Replication background thread failed: {}", err);
}
})?;
})
.unwrap();
let mut wal_seg_size: usize;
loop {

View File

@@ -9,7 +9,7 @@ use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use tracing::*;
@@ -121,7 +121,7 @@ impl SharedState {
}
/// Assign new replica ID. We choose first empty cell in the replicas vector
/// or extend the vector if there are no free slots.
/// or extend the vector if there are no free items.
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
self.replicas[pos] = Some(state);
@@ -136,14 +136,13 @@ impl SharedState {
/// If create=false and file doesn't exist, bails out.
fn create_restore(
conf: &SafeKeeperConf,
timeline_id: ZTimelineId,
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let state = FileStorage::load_control_file_conf(conf, timeline_id, create)
let (file_storage, state) = FileStorage::load_from_control_file(conf, timelineid, create)
.with_context(|| "failed to load from control file")?;
let file_storage = FileStorage::new(timeline_id, conf);
let flush_lsn = if state.server.wal_seg_size != 0 {
let wal_dir = conf.timeline_dir(&timeline_id);
let wal_dir = conf.timeline_dir(&timelineid);
find_end_of_wal(
&wal_dir,
state.server.wal_seg_size as usize,
@@ -298,15 +297,9 @@ impl Timeline {
shared_state.add_replica(state)
}
pub fn update_replica_state(&self, id: usize, state: ReplicaState) {
pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.replicas[id] = Some(state);
}
pub fn remove_replica(&self, id: usize) {
let mut shared_state = self.mutex.lock().unwrap();
assert!(shared_state.replicas[id].is_some());
shared_state.replicas[id] = None;
shared_state.replicas[id] = state;
}
pub fn get_end_of_wal(&self) -> Lsn {
@@ -388,7 +381,7 @@ impl GlobalTimelines {
}
#[derive(Debug)]
pub struct FileStorage {
struct FileStorage {
// save timeline dir to avoid reconstructing it every time
timeline_dir: PathBuf,
conf: SafeKeeperConf,
@@ -396,17 +389,6 @@ pub struct FileStorage {
}
impl FileStorage {
fn new(timeline_id: ZTimelineId, conf: &SafeKeeperConf) -> FileStorage {
let timeline_dir = conf.timeline_dir(&timeline_id);
let timelineid_str = format!("{}", timeline_id);
FileStorage {
timeline_dir,
conf: conf.clone(),
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
}
}
// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
// Read the version independent part
@@ -427,24 +409,20 @@ impl FileStorage {
upgrade_control_file(buf, version)
}
fn load_control_file_conf(
conf: &SafeKeeperConf,
timeline_id: ZTimelineId,
create: CreateControlFile,
) -> Result<SafeKeeperState> {
let path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
Self::load_control_file(path, create)
}
/// Read in the control file.
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
pub fn load_control_file<P: AsRef<Path>>(
control_file_path: P,
fn load_from_control_file(
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<SafeKeeperState> {
) -> Result<(FileStorage, SafeKeeperState)> {
let timeline_dir = conf.timeline_dir(&timelineid);
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
info!(
"loading control file {}, create={:?}",
control_file_path.as_ref().display(),
control_file_path.display(),
create,
);
@@ -456,7 +434,7 @@ impl FileStorage {
.with_context(|| {
format!(
"failed to open control file at {}",
control_file_path.as_ref().display(),
control_file_path.display(),
)
})?;
@@ -487,15 +465,21 @@ impl FileStorage {
);
FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context(
|| {
format!(
"while reading control file {}",
control_file_path.as_ref().display(),
)
},
|| format!("while reading control file {}", control_file_path.display(),),
)?
};
Ok(state)
let timelineid_str = format!("{}", timelineid);
Ok((
FileStorage {
timeline_dir,
conf: conf.clone(),
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
},
state,
))
}
}
@@ -565,7 +549,7 @@ impl Storage for FileStorage {
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.timeline_id;
let ztli = server.ztli;
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
@@ -653,7 +637,7 @@ impl Storage for FileStorage {
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.timeline_id;
let ztli = server.ztli;
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
@@ -753,10 +737,7 @@ mod test {
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
.expect("failed to create timeline dir");
Ok((
FileStorage::new(timeline_id, conf),
FileStorage::load_control_file_conf(conf, timeline_id, create)?,
))
FileStorage::load_from_control_file(conf, timeline_id, create)
}
#[test]

View File

@@ -5,12 +5,7 @@ use crate::safekeeper::{
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use tracing::*;
use zenith_utils::{
bin_ser::LeSer,
lsn::Lsn,
pq_proto::SystemId,
zid::{ZTenantId, ZTimelineId},
};
use zenith_utils::{bin_ser::LeSer, lsn::Lsn};
/// Persistent consensus state of the acceptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -40,36 +35,6 @@ struct SafeKeeperStateV1 {
wal_start_lsn: Lsn,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ServerInfoV2 {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub tenant_id: ZTenantId,
/// Zenith timelineid
pub ztli: ZTimelineId,
pub wal_seg_size: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafeKeeperStateV2 {
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
pub server: ServerInfoV2,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
pub proposer_uuid: PgUuid,
/// part of WAL acknowledged by quorum and available locally
pub commit_lsn: Lsn,
/// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone)
pub truncate_lsn: Lsn,
// Safekeeper starts receiving WAL from this LSN, zeros before it ought to
// be skipped during decoding.
pub wal_start_lsn: Lsn,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
// migrate to storing full term history
if version == 1 {
@@ -90,25 +55,6 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
truncate_lsn: oldstate.truncate_lsn,
wal_start_lsn: oldstate.wal_start_lsn,
});
// migrate to hexing some zids
} else if version == 2 {
info!("reading safekeeper control file version {}", version);
let oldstate = SafeKeeperStateV2::des(&buf[..buf.len()])?;
let server = ServerInfo {
pg_version: oldstate.server.pg_version,
system_id: oldstate.server.system_id,
tenant_id: oldstate.server.tenant_id,
timeline_id: oldstate.server.ztli,
wal_seg_size: oldstate.server.wal_seg_size,
};
return Ok(SafeKeeperState {
acceptor_state: oldstate.acceptor_state,
server,
proposer_uuid: oldstate.proposer_uuid,
commit_lsn: oldstate.commit_lsn,
truncate_lsn: oldstate.truncate_lsn,
wal_start_lsn: oldstate.wal_start_lsn,
});
}
bail!("unsupported safekeeper control file version {}", version)
}

View File

@@ -102,21 +102,12 @@ fn main() -> Result<()> {
.required(false)
.value_name("stop-mode");
let pageserver_config_args = Arg::with_name("pageserver-config-override")
.long("pageserver-config-override")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
.required(false);
let matches = App::new("Zenith CLI")
.setting(AppSettings::ArgRequiredElseHelp)
.version(GIT_VERSION)
.subcommand(
SubCommand::with_name("init")
.about("Initialize a new Zenith repository")
.arg(pageserver_config_args.clone())
.arg(
Arg::with_name("config")
.long("config")
@@ -142,10 +133,10 @@ fn main() -> Result<()> {
.setting(AppSettings::ArgRequiredElseHelp)
.about("Manage pageserver")
.subcommand(SubCommand::with_name("status"))
.subcommand(SubCommand::with_name("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.subcommand(SubCommand::with_name("start").about("Start local pageserver"))
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver")
.arg(stop_mode_arg.clone()))
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver").arg(pageserver_config_args))
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver"))
)
.subcommand(
SubCommand::with_name("safekeeper")
@@ -412,7 +403,6 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
if let Err(e) = pageserver.init(
// default_tenantid was generated by the `env.init()` call above
Some(&env.default_tenantid.unwrap().to_string()),
&pageserver_config_overrides(init_match),
) {
eprintln!("pageserver init failed: {}", e);
exit(1);
@@ -421,14 +411,6 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
Ok(())
}
fn pageserver_config_overrides<'a>(init_match: &'a ArgMatches) -> Vec<&'a str> {
init_match
.values_of("pageserver-config-override")
.into_iter()
.flatten()
.collect()
}
fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
match tenant_match.subcommand() {
@@ -590,8 +572,8 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
let pageserver = PageServerNode::from_env(env);
match sub_match.subcommand() {
("start", Some(start_match)) => {
if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
("start", Some(_sub_m)) => {
if let Err(e) = pageserver.start() {
eprintln!("pageserver start failed: {}", e);
exit(1);
}
@@ -606,20 +588,22 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
}
("restart", Some(restart_match)) => {
("restart", Some(_sub_m)) => {
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
if let Err(e) = pageserver.start() {
eprintln!("pageserver start failed: {}", e);
exit(1);
}
}
(sub_name, _) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
(sub_name, _) => {
bail!("Unexpected pageserver subcommand '{}'", sub_name)
}
}
Ok(())
}
@@ -678,12 +662,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
Ok(())
}
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn handle_start_all(_sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
// Postgres nodes are not started automatically
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
if let Err(e) = pageserver.start() {
eprintln!("pageserver start failed: {}", e);
exit(1);
}