Compare commits

..

1 Commit

Author SHA1 Message Date
Christian Schwarz
af246e340e DNM: notes on buffered writer integration from current code perspective 2025-04-10 14:58:59 +02:00
7 changed files with 130 additions and 46 deletions

View File

@@ -318,7 +318,7 @@ impl PageServerNode {
self.conf.id, datadir,
)
})?;
let args = vec!["-D", datadir_path_str, "--dev"];
let args = vec!["-D", datadir_path_str];
background_process::start_process(
"pageserver",

View File

@@ -162,7 +162,6 @@ impl SafekeeperNode {
listen_http,
"--availability-zone".to_owned(),
availability_zone,
"--dev".to_owned(),
];
if let Some(pg_tenant_only_port) = self.conf.pg_tenant_only_port {
let listen_pg_tenant_only = format!("{}:{}", self.listen_addr, pg_tenant_only_port);

View File

@@ -9,7 +9,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, anyhow, bail};
use anyhow::{Context, anyhow};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use http_utils::tls_certs::ReloadingCertificateResolver;
@@ -79,8 +79,6 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
let dev_mode = arg_matches.get_flag("dev");
// Initialize failpoints support
let scenario = failpoint_support::init();
@@ -101,20 +99,6 @@ fn main() -> anyhow::Result<()> {
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
if !dev_mode {
if matches!(conf.http_auth_type, AuthType::Trust)
|| matches!(conf.pg_auth_type, AuthType::Trust)
{
bail!(
"Pageserver refuses to start with HTTP or PostgreSQL API authentication disabled.\n\
Run with --dev to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
// Initialize logging.
//
// It must be initialized before the custom panic hook is installed below.
@@ -832,12 +816,6 @@ fn cli() -> Command {
.action(ArgAction::SetTrue)
.help("Show enabled compile time features"),
)
.arg(
Arg::new("dev")
.long("dev")
.action(ArgAction::SetTrue)
.help("Run in development mode (disables security checks)"),
)
}
#[test]

View File

@@ -1596,6 +1596,7 @@ impl DeltaLayerIterator<'_> {
#[cfg(test)]
pub(crate) mod test {
use std::collections::BTreeMap;
use std::io::Read;
use bytes::Bytes;
use itertools::MinMaxResult;
@@ -1604,12 +1605,14 @@ pub(crate) mod test {
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use super::*;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::DEFAULT_PG_VERSION;
use crate::assert_u64_eq_usize::U64IsUsize;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::disk_btree::tests::TestDisk;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::storage_layer::{IoConcurrency, Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{Tenant, Timeline};
@@ -2305,4 +2308,126 @@ pub(crate) mod test {
}
}
}
#[tokio::test]
/// Writes a single short image value into a fresh delta layer and then makes
/// assertions about the resulting file's on-disk layout: the value's bytes end
/// mid-page, and the index block begins at the next PAGE_SZ boundary and is
/// exactly one block long (i.e. the data section is padded up to the index).
async fn test_delta_layer_padding() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_delta_layer_padding").await?;
    let (tenant, ctx) = harness.load().await;
    let timeline_id = TimelineId::generate();
    let timeline = tenant
        .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
        .await?;
    tracing::info!("Generating test data ...");
    // Helper: fixed key prefix with only field6 varying, so keys are ordered by `id`.
    fn make_key(id: u32) -> Key {
        let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
        key.field6 = id;
        key
    }
    let layer_key_range = make_key(0)..make_key(0x100);
    let layer_lsn_range = Lsn(0x10)..Lsn(0x20);
    let mut writer = DeltaLayerWriter::new(
        harness.conf,
        timeline_id,
        harness.tenant_shard_id,
        layer_key_range.start,
        layer_lsn_range.clone(),
        &ctx,
    )
    .await?;
    // A single 8-byte image: deliberately much smaller than a page so the
    // padding behavior under test is exercised.
    let img_key = make_key(0x23);
    let img_lsn = Lsn(0x15);
    let img_value = b"abcdefgh";
    writer
        .put_value(
            img_key,
            img_lsn,
            Value::Image(Bytes::from_owner(img_value)),
            &ctx,
        )
        .await?;
    let (desc, path) = writer.finish(layer_key_range.end, &ctx).await?;
    let md = std::fs::metadata(&path)?;
    println!("{md:?}");
    // Wrap the freshly written file as a resident layer so it can be read back
    // through the regular read path.
    let layer = Layer::for_resident(
        harness.conf,
        &timeline,
        path.clone(),
        desc.layer_name(),
        LayerFileMetadata {
            file_size: md.len(),
            generation: tenant.generation(),
            shard: tenant.shard_identity.shard_index(),
        },
    );
    let layer = layer.drop_eviction_guard();
    let io_concurrency = IoConcurrency::sequential();
    let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
    layer
        .get_values_reconstruct_data(
            KeySpace::single(layer_key_range),
            layer_lsn_range.clone(),
            &mut reconstruct_state,
            &ctx,
        )
        .await?;
    // Exactly the one key we wrote must come back, as an image with no delta
    // records, at the LSN it was written at.
    assert_eq!(reconstruct_state.keys.len(), 1);
    let (found_key, state) = reconstruct_state.keys.drain().next().unwrap();
    assert_eq!(found_key, img_key);
    let state = state.collect_pending_ios().await?;
    assert_eq!(state.records.len(), 0);
    let (lsn, img) = state.img.unwrap();
    assert_eq!(lsn, Lsn(0x15));
    assert_eq!(img, Bytes::from_static(img_value));
    // make some assertions about the data layout
    let mut buf = Vec::with_capacity(md.len().into_usize());
    std::fs::File::open(&path)?.read_to_end(&mut buf)?;
    // Locate the raw image bytes in the file; the value must appear exactly
    // once so the offset found below is unambiguous.
    let offsets: Vec<_> = buf
        .windows(img_value.len())
        .positions(|window| window == img_value)
        .collect();
    assert_eq!(
        offsets.len(),
        1,
        "img value appears multiple times in bit pattern on disk"
    );
    let offset = offsets[0];
    println!("img value offset: {offset}");
    let end = offset + img_value.len();
    // Sanity check on the test setup: the value must not end exactly on a page
    // boundary, otherwise there would be no padding to observe.
    assert_ne!(
        end % PAGE_SZ,
        0,
        "img value is so short it doesn't fill a full page"
    );
    // The index is expected to start at the next PAGE_SZ-aligned offset after
    // the value, i.e. the data section is padded to a full page.
    let expect_index_block_start = end.next_multiple_of(PAGE_SZ);
    assert!(
        expect_index_block_start < md.len().into_usize(),
        "there is a block after the block that contains the value"
    );
    let resident = layer.download_and_keep_resident(&ctx).await?;
    let delta = resident.get_as_delta(&ctx).await?;
    assert_eq!(
        expect_index_block_start,
        delta.index_start_offset().into_usize(),
        "this test's understanding that the next block is the index block is correct"
    );
    assert_eq!(
        expect_index_block_start + PAGE_SZ,
        md.len().into_usize(),
        "the index block is one block long"
    );
    Ok(())
}
}

View File

@@ -120,6 +120,8 @@ where
mut self,
ctx: &RequestContext,
) -> Result<(u64, Arc<W>), FlushTaskError> {
// TODO: this is incorrect/infeasible with direct IO because the tail may only be partially filled, e.g., 23 bytes in it.
// The buffer is guaranteed to be aligned, but the write system call will fail with EINVAL because the buffer size is not right.
self.flush(ctx).await?;
let Self {

View File

@@ -226,9 +226,6 @@ struct Args {
/// Path to the JWT auth token used to authenticate with other safekeepers.
#[arg(long)]
auth_token_path: Option<Utf8PathBuf>,
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -346,21 +343,6 @@ async fn main() -> anyhow::Result<()> {
}
};
if !args.dev {
let http_auth_enabled = args.http_auth_public_key_path.is_some();
let pg_auth_enabled = args.pg_auth_public_key_path.is_some();
let pg_tenant_only_auth_enabled = args.pg_tenant_only_auth_public_key_path.is_some();
if !http_auth_enabled || !pg_auth_enabled || !pg_tenant_only_auth_enabled {
bail!(
"Safekeeper refuses to start with HTTP, PostgreSQL, or tenant-only PostgreSQL API authentication disabled.\n\
Run with --dev to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
// Load JWT auth token to connect to other safekeepers for pull_timeline.
// First check if the env var is present, then check the arg with the path.
// We want to deprecate and remove the env var method in the future.

View File

@@ -126,8 +126,6 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*startup_reconcile: Could not scan node.*",
# Tests run in dev mode
".*Starting in dev mode.*",
".*Starting in dev mode - authentication security checks are disabled.*",
".*Starting in dev mode: this may be an insecure configuration.*",
# Tests that stop endpoints & use the storage controller's neon_local notification
# mechanism might fail (neon_local's stopping and endpoint isn't atomic wrt the storage
# controller's attempts to notify the endpoint).