Compare commits


6 Commits

Author SHA1 Message Date
BodoBolero
bff29cd4f8 Rebased submodule references to match main 2025-02-21 11:59:18 +01:00
Heikki Linnakangas
d5ac582610 Split plv8 build into two parts (#10920)
Plv8 consists of two parts:
1. the V8 engine, which is built from vendored sources, and
2. the PostgreSQL extension.

Split those into two separate steps in the Dockerfile. The first step
doesn't need any PostgreSQL sources or any other files from the neon
repository, just the build tools and the upstream plv8 sources. Use the
build-deps image as the base for that step, so that the layer can be
cached and doesn't need to be rebuilt every time. This is worthwhile
because the V8 build takes a very long time.
2025-02-21 11:59:18 +01:00
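A minimal sketch of the two-step layout described in this commit message, assuming illustrative stage names and a plv8 version (the actual Dockerfile in the PR may differ):

```dockerfile
# Hedged sketch only: stage names, paths, the PLV8_VERSION value and the
# V8 make target are assumptions for illustration, not the PR's exact Dockerfile.

# Step 1: build the vendored V8 engine. Based on build-deps and the upstream
# plv8 sources only, with nothing copied from the neon repository, so Docker
# caches this layer and the long V8 build is not repeated on unrelated changes.
FROM build-deps AS plv8-v8-build
ARG PLV8_VERSION=3.2.2
RUN git clone --depth 1 --branch v${PLV8_VERSION} https://github.com/plv8/plv8.git /plv8-src && \
    make -C /plv8-src v8   # assumed target name for building the vendored V8

# Step 2: build the PostgreSQL extension against the prebuilt V8 from step 1.
FROM pg-build AS plv8-build
COPY --from=plv8-v8-build /plv8-src /plv8-src
RUN make -C /plv8-src install && \
    echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
```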
BodoBolero
9a86cd0d4f link all duckdb static libs into pg_duckdb.so 2025-02-21 11:50:36 +01:00
BodoBolero
8d68048e73 re-enable pg_duckdb in this PR 2025-02-21 08:50:34 +01:00
Peter Bendel
1a412740a5 Merge branch 'main' into bodobolero/duckdb_static 2025-02-21 08:49:42 +01:00
BodoBolero
860aa0157d link duckdb statically in pg_duckdb to avoid conflict with pg_mooncake's libduckdb.so 2025-02-21 08:06:24 +01:00
24 changed files with 473 additions and 1094 deletions

Cargo.lock generated
View File

@@ -1874,12 +1874,6 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.10.7"
@@ -3337,17 +3331,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json-structural-diff"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
dependencies = [
"difflib",
"regex",
"serde_json",
]
[[package]]
name = "jsonwebtoken"
version = "9.2.0"
@@ -6460,7 +6443,6 @@ dependencies = [
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
"json-structural-diff",
"lasso",
"measured",
"metrics",

View File

@@ -210,7 +210,6 @@ rustls-native-certs = "0.8"
x509-parser = "0.16"
whoami = "1.5.1"
zerocopy = { version = "0.7", features = ["derive"] }
json-structural-diff = { version = "0.2.0" }
## TODO replace this with tracing
env_logger = "0.10"

View File

@@ -1493,7 +1493,7 @@ FROM pg-build AS pg_duckdb-build
ARG PG_VERSION
COPY --from=pg_duckdb-src /ext-src/ /ext-src/
WORKDIR /ext-src/pg_duckdb-src
RUN make install -j $(getconf _NPROCESSORS_ONLN) && \
RUN make DUCKDB_STATIC=1 install -j $(getconf _NPROCESSORS_ONLN) && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_duckdb.control
#########################################################################################
@@ -1676,11 +1676,7 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
# Disabled temporarily, because it clashed with pg_mooncake. pg_mooncake
# also depends on libduckdb, but a different version.
#COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/

View File

@@ -1,3 +1,47 @@
diff --git a/Makefile b/Makefile
index 3235cc8..e232052 100644
--- a/Makefile
+++ b/Makefile
@@ -32,7 +32,16 @@ else
DUCKDB_BUILD_TYPE = release
endif
-DUCKDB_LIB = libduckdb$(DLSUFFIX)
+DUCKDB_STATIC ?= 0
+PG_DUCKDB_LINK_FLAGS = -Wl,-rpath,$(PG_LIB)/ -lpq -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/json -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/parquet -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/core_functions -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/jemalloc -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/icu -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/cached_httpfs -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/yyjson -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/miniz -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/zstd -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/skiplist -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/mbedtls -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/utf8proc -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/hyperloglog -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/fastpforlib -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/re2 -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/libpg_query -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/fsst -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/fmt -L$(PG_LIB) -lstdc++ -llz4
+ifeq ($(DUCKDB_STATIC), 1)
+ DUCKDB_LIB = libduckdb_static.a
+ PG_DUCKDB_LINK_FLAGS += -l:$(DUCKDB_LIB) -l:libjson_extension.a -l:libparquet_extension.a -l:libcore_functions_extension.a -l:libjemalloc_extension.a -l:libicu_extension.a -l:libcached_httpfs_extension.a -l:libduckdb_yyjson.a -l:libduckdb_miniz.a -l:libduckdb_zstd.a -l:libduckdb_skiplistlib.a -l:libduckdb_mbedtls.a -l:libduckdb_utf8proc.a -l:libduckdb_hyperloglog.a -l:libduckdb_fastpforlib.a -l:libduckdb_re2.a -l:libduckdb_pg_query.a -l:libduckdb_fsst.a -l:libduckdb_fmt.a
+else
+ DUCKDB_LIB = libduckdb$(DLSUFFIX)
+ PG_DUCKDB_LINK_FLAGS += -lduckdb
+endif
+
FULL_DUCKDB_LIB = third_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src/$(DUCKDB_LIB)
ERROR_ON_WARNING ?=
@@ -54,7 +63,7 @@ override PG_CXXFLAGS += -std=c++17 ${DUCKDB_BUILD_CXX_FLAGS} ${COMPILER_FLAGS} -
# changes to the vendored code in one place.
override PG_CFLAGS += -Wno-declaration-after-statement
-SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src -L$(PG_LIB) -lduckdb -lstdc++ -llz4
+SHLIB_LINK += $(PG_DUCKDB_LINK_FLAGS)
include Makefile.global
diff --git a/docs/compilation.md b/docs/compilation.md
index e25396f..b94a63a 100644
--- a/docs/compilation.md
+++ b/docs/compilation.md
@@ -14,6 +14,8 @@ To build and install, run:
make install
```
+If you want to link `libduckdb` statically, set `DUCKDB_STATIC=1` when invoking `make`.
+
Add `pg_duckdb` to the `shared_preload_libraries` in your `postgresql.conf` file:
```ini
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
index d777d76..af60106 100644
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql

View File

@@ -335,21 +335,13 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings
.remove("checkpoint_timeout")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'checkpoint_timeout' as duration")?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings
.remove("compaction_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'compaction_period' as duration")?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
@@ -395,10 +387,7 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.remove("gc_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'gc_period' as duration")?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
@@ -414,20 +403,13 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
pitr_interval: settings.remove("pitr_interval")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'pitr_interval' as duration")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
.map(|x| x.to_string()),
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lagging_wal_timeout' as duration")?,
.map(|x| x.to_string()),
max_lsn_wal_lag: settings
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
@@ -445,14 +427,8 @@ impl PageServerNode {
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
heatmap_period: settings
.remove("heatmap_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'heatmap_period' as duration")?,
.map(|x| x.to_string()),
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
lazy_slru_download: settings
.remove("lazy_slru_download")
.map(|x| x.parse::<bool>())
@@ -463,15 +439,10 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
lsn_lease_length: settings.remove("lsn_lease_length")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lsn_lease_length' as duration")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length_for_ts: settings
.remove("lsn_lease_length_for_ts")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
.map(|x| x.to_string()),
timeline_offloading: settings
.remove("timeline_offloading")
.map(|x| x.parse::<bool>())

View File

@@ -959,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
threshold: threshold.into(),
},
)),
heatmap_period: Some(Duration::from_secs(300)),
heatmap_period: Some("300s".to_string()),
..Default::default()
},
})

View File

@@ -526,13 +526,9 @@ pub struct TenantConfigPatch {
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub checkpoint_timeout: Option<Duration>,
pub checkpoint_timeout: Option<String>,
pub compaction_target_size: Option<u64>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub compaction_period: Option<Duration>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
pub compaction_upper_limit: Option<usize>,
// defer parsing compaction_algorithm, like eviction_policy
@@ -543,38 +539,22 @@ pub struct TenantConfig {
pub l0_flush_stall_threshold: Option<usize>,
pub l0_flush_wait_upload: Option<bool>,
pub gc_horizon: Option<u64>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub gc_period: Option<Duration>,
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub pitr_interval: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub eviction_policy: Option<EvictionPolicy>,
pub min_resident_size_override: Option<u64>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub heatmap_period: Option<Duration>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub heatmap_period: Option<String>,
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub image_creation_preempt_threshold: Option<usize>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lsn_lease_length: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lsn_lease_length_for_ts: Option<Duration>,
pub lsn_lease_length: Option<String>,
pub lsn_lease_length_for_ts: Option<String>,
pub timeline_offloading: Option<bool>,
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
pub rel_size_v2_enabled: Option<bool>,
@@ -584,10 +564,7 @@ pub struct TenantConfig {
}
impl TenantConfig {
pub fn apply_patch(
self,
patch: TenantConfigPatch,
) -> Result<TenantConfig, humantime::DurationError> {
pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
let Self {
mut checkpoint_distance,
mut checkpoint_timeout,
@@ -627,17 +604,11 @@ impl TenantConfig {
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
patch
.checkpoint_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut checkpoint_timeout);
patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
patch
.compaction_target_size
.apply(&mut compaction_target_size);
patch
.compaction_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut compaction_period);
patch.compaction_period.apply(&mut compaction_period);
patch.compaction_threshold.apply(&mut compaction_threshold);
patch
.compaction_upper_limit
@@ -655,25 +626,15 @@ impl TenantConfig {
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch
.gc_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut gc_period);
patch.gc_period.apply(&mut gc_period);
patch
.image_creation_threshold
.apply(&mut image_creation_threshold);
patch
.pitr_interval
.map(|v| humantime::parse_duration(&v))?
.apply(&mut pitr_interval);
patch.pitr_interval.apply(&mut pitr_interval);
patch
.walreceiver_connect_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut walreceiver_connect_timeout);
patch
.lagging_wal_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lagging_wal_timeout);
patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
patch.eviction_policy.apply(&mut eviction_policy);
patch
@@ -681,12 +642,8 @@ impl TenantConfig {
.apply(&mut min_resident_size_override);
patch
.evictions_low_residence_duration_metric_threshold
.map(|v| humantime::parse_duration(&v))?
.apply(&mut evictions_low_residence_duration_metric_threshold);
patch
.heatmap_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut heatmap_period);
patch.heatmap_period.apply(&mut heatmap_period);
patch.lazy_slru_download.apply(&mut lazy_slru_download);
patch
.timeline_get_throttle
@@ -697,13 +654,9 @@ impl TenantConfig {
patch
.image_creation_preempt_threshold
.apply(&mut image_creation_preempt_threshold);
patch
.lsn_lease_length
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lsn_lease_length);
patch.lsn_lease_length.apply(&mut lsn_lease_length);
patch
.lsn_lease_length_for_ts
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lsn_lease_length_for_ts);
patch.timeline_offloading.apply(&mut timeline_offloading);
patch
@@ -720,7 +673,7 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.apply(&mut gc_compaction_ratio_percent);
Ok(Self {
Self {
checkpoint_distance,
checkpoint_timeout,
compaction_target_size,
@@ -756,7 +709,7 @@ impl TenantConfig {
gc_compaction_enabled,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
})
}
}
}
@@ -2550,7 +2503,7 @@ mod tests {
..base.clone()
};
let patched = base.apply_patch(decoded.config).unwrap();
let patched = base.apply_patch(decoded.config);
assert_eq!(patched, expected);
}

View File

@@ -693,15 +693,16 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
/// This is a conversion from our internal tenant config object to the one used
/// in external APIs.
impl From<TenantConfOpt> for models::TenantConfig {
// TODO(vlad): These are now the same, but they have different serialization logic.
// Can we merge them?
fn from(value: TenantConfOpt) -> Self {
fn humantime(d: Duration) -> String {
format!("{}s", d.as_secs())
}
Self {
checkpoint_distance: value.checkpoint_distance,
checkpoint_timeout: value.checkpoint_timeout,
checkpoint_timeout: value.checkpoint_timeout.map(humantime),
compaction_algorithm: value.compaction_algorithm,
compaction_target_size: value.compaction_target_size,
compaction_period: value.compaction_period,
compaction_period: value.compaction_period.map(humantime),
compaction_threshold: value.compaction_threshold,
compaction_upper_limit: value.compaction_upper_limit,
compaction_l0_first: value.compaction_l0_first,
@@ -710,23 +711,24 @@ impl From<TenantConfOpt> for models::TenantConfig {
l0_flush_stall_threshold: value.l0_flush_stall_threshold,
l0_flush_wait_upload: value.l0_flush_wait_upload,
gc_horizon: value.gc_horizon,
gc_period: value.gc_period,
gc_period: value.gc_period.map(humantime),
image_creation_threshold: value.image_creation_threshold,
pitr_interval: value.pitr_interval,
walreceiver_connect_timeout: value.walreceiver_connect_timeout,
lagging_wal_timeout: value.lagging_wal_timeout,
pitr_interval: value.pitr_interval.map(humantime),
walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
max_lsn_wal_lag: value.max_lsn_wal_lag,
eviction_policy: value.eviction_policy,
min_resident_size_override: value.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: value
.evictions_low_residence_duration_metric_threshold,
heatmap_period: value.heatmap_period,
.evictions_low_residence_duration_metric_threshold
.map(humantime),
heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle,
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
image_creation_preempt_threshold: value.image_creation_preempt_threshold,
lsn_lease_length: value.lsn_lease_length,
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts,
lsn_lease_length: value.lsn_lease_length.map(humantime),
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
timeline_offloading: value.timeline_offloading,
wal_receiver_protocol_override: value.wal_receiver_protocol_override,
rel_size_v2_enabled: value.rel_size_v2_enabled,
@@ -758,10 +760,29 @@ mod tests {
assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
}
#[test]
fn test_try_from_models_tenant_config_err() {
let tenant_config = models::TenantConfig {
lagging_wal_timeout: Some("5a".to_string()),
..TenantConfig::default()
};
let tenant_conf_opt = TenantConfOpt::try_from(&tenant_config);
assert!(
tenant_conf_opt.is_err(),
"Suceeded to convert TenantConfig to TenantConfOpt"
);
let expected_error_str =
"lagging_wal_timeout: invalid value: string \"5a\", expected a duration";
assert_eq!(tenant_conf_opt.unwrap_err().to_string(), expected_error_str);
}
#[test]
fn test_try_from_models_tenant_config_success() {
let tenant_config = models::TenantConfig {
lagging_wal_timeout: Some(Duration::from_secs(5)),
lagging_wal_timeout: Some("5s".to_string()),
..TenantConfig::default()
};

View File

@@ -136,9 +136,7 @@ impl WalRedoProcess {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
if !output.contains("LOG:") {
error!(%output, "received output");
}
error!(%output, "received output");
}
Err(e) => {
break Err(e);

File diff suppressed because it is too large

View File

@@ -56,7 +56,6 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
uint32 WAIT_EVENT_NEON_LFC_WRITE;
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
uint32 WAIT_EVENT_NEON_PS_STARTING;
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -539,7 +538,6 @@ neon_shmem_startup_hook(void)
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");

View File

@@ -28,7 +28,6 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
extern uint32 WAIT_EVENT_NEON_LFC_READ;
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
extern uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -39,7 +38,6 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION

View File

@@ -233,7 +233,6 @@ extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern bool lfc_store_prefetch_result;
extern shardno_t get_shard_number(BufferTag* tag);
@@ -302,16 +301,14 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 1;
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

View File

@@ -162,7 +162,7 @@ static uint32 local_request_counter;
* UNUSED ------> REQUESTED --> RECEIVED
* ^ : | |
* | : v |
* | : TAG_REMAINS |
* | : TAG_UNUSED |
* | : | |
* +----------------+------------+
* :
@@ -181,7 +181,7 @@ typedef enum PrefetchStatus
/* must fit in uint8; bits 0x1 are used */
typedef enum {
PRFSF_NONE = 0x0,
PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */
PRFSF_SEQ = 0x1,
} PrefetchRequestFlags;
typedef struct PrefetchRequest
@@ -305,7 +305,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
static void
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum,
BlockNumber blkno, neon_request_lsns *output,
BlockNumber nblocks);
BlockNumber nblocks, const bits8 *mask);
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
PrefetchRequest *slot);
@@ -363,7 +363,6 @@ compact_prefetch_buffers(void)
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->flags = source_slot->flags;
target_slot->response = source_slot->response;
target_slot->reqid = source_slot->reqid;
target_slot->request_lsns = source_slot->request_lsns;
@@ -453,18 +452,6 @@ prefetch_pump_state(void)
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
{
/*
* Store prefetched result in LFC (please read comments to lfc_prefetch
* explaining why it can be done without holding shared buffer lock
*/
if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
{
slot->flags |= PRFSF_LFC;
}
}
}
}
@@ -726,18 +713,6 @@ prefetch_read(PrefetchRequest *slot)
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
{
/*
* Store prefetched result in LFC (please read comments to lfc_prefetch
* explaining why it can be done without holding shared buffer lock
*/
if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
{
slot->flags |= PRFSF_LFC;
}
}
return true;
}
else
@@ -889,7 +864,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
else
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum, slot->buftag.blockNum,
&slot->request_lsns, 1);
&slot->request_lsns, 1, NULL);
request.hdr.lsn = slot->request_lsns.request_lsn;
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
@@ -915,73 +890,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
*/
static int
prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *lsns,
BlockNumber nblocks, void **buffers, bits8 *mask)
{
int hits = 0;
PrefetchRequest hashkey;
/*
* Use an intermediate PrefetchRequest struct as the hash key to ensure
* correct alignment and that the padding bytes are cleared.
*/
memset(&hashkey.buftag, 0, sizeof(BufferTag));
CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
hashkey.buftag.forkNum = forknum;
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL)
{
PrefetchRequest *slot = entry->slot;
uint64 ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(ring_index));
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
if (slot->status != PRFS_RECEIVED)
continue;
/*
* If the caller specified a request LSN to use, only accept
* prefetch responses that satisfy that request.
*/
if (!neon_prefetch_response_usable(&lsns[i], slot))
continue;
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
prefetch_set_unused(ring_index);
BITMAP_SET(mask, i);
hits += 1;
}
}
pgBufferUsage.prefetch.hits += hits;
return hits;
}
#if PG_MAJORVERSION_NUM < 17
static bool
prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_request_lsns *lsns, void *buffer)
{
bits8 present = 0;
return prefetch_lookupv(rinfo, forkNum, blkn, lsns, 1, &buffer, &present) != 0;
}
#endif
/*
* prefetch_register_bufferv() - register and prefetch buffers
*
@@ -1105,6 +1013,8 @@ Retry:
/* The buffered request is good enough, return that index */
if (is_prefetch)
pgBufferUsage.prefetch.duplicates++;
else
pgBufferUsage.prefetch.hits++;
continue;
}
}
@@ -1206,7 +1116,6 @@ Retry:
slot->buftag = hashkey.buftag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
slot->flags = 0;
min_ring_index = Min(min_ring_index, ring_index);
@@ -2147,7 +2056,8 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
*/
static void
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
neon_request_lsns *output, BlockNumber nblocks)
neon_request_lsns *output, BlockNumber nblocks,
const bits8 *mask)
{
XLogRecPtr last_written_lsns[PG_IOV_MAX];
@@ -2235,6 +2145,9 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
neon_request_lsns *result = &output[i];
XLogRecPtr last_written_lsn = last_written_lsns[i];
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
continue;
if (last_written_lsn > replay_lsn)
{
/* GetCurrentReplayRecPtr was introduced in v15 */
@@ -2277,6 +2190,8 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
neon_request_lsns *result = &output[i];
XLogRecPtr last_written_lsn = last_written_lsns[i];
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
continue;
/*
* Use the latest LSN that was evicted from the buffer cache as the
* 'not_modified_since' hint. Any pages modified by later WAL records
@@ -2498,7 +2413,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
{
NeonExistsRequest request = {
.hdr.tag = T_NeonExistsRequest,
@@ -2917,7 +2832,8 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
while (nblocks > 0)
{
int iterblocks = Min(nblocks, PG_IOV_MAX);
bits8 lfc_present[PG_IOV_MAX / 8] = {0};
bits8 lfc_present[PG_IOV_MAX / 8];
memset(lfc_present, 0, sizeof(lfc_present));
if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum,
iterblocks, lfc_present) == iterblocks)
@@ -2928,13 +2844,12 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
}
tag.blockNum = blocknum;
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_present[i] = ~(lfc_present[i]);
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
lfc_present, true);
nblocks -= iterblocks;
blocknum += iterblocks;
@@ -3190,8 +3105,7 @@ Retry:
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forkNum, blockno, buffer);
lfc_write(rinfo, forkNum, blockno, buffer);
break;
}
case T_NeonErrorResponse:
@@ -3276,17 +3190,6 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* Try to read PS results if they are available */
prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
if (prefetch_lookup(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, buffer))
{
/* Prefetch hit */
return;
}
/* Try to read from local file cache */
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
{
@@ -3294,11 +3197,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
return;
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
@@ -3379,14 +3280,11 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
#if PG_MAJORVERSION_NUM >= 17
static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
void **buffers, BlockNumber nblocks)
{
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
bits8 lfc_hits[PG_IOV_MAX / 8];
bits8 read[PG_IOV_MAX / 8];
neon_request_lsns request_lsns[PG_IOV_MAX];
int lfc_result;
int prefetch_result;
switch (reln->smgr_relpersistence)
{
@@ -3409,52 +3307,38 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_log(ERROR, "Read request too large: %d is larger than max %d",
nblocks, PG_IOV_MAX);
/* Try to read PS results if they are available */
prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
if (prefetch_result == nblocks)
return;
/* invert the result: exclude prefetched blocks */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_hits[i] = ~prefetch_hits[i];
memset(read, 0, sizeof(read));
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
nblocks, lfc_hits);
nblocks, read);
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
/* Read all blocks from LFC, so we're done */
if (prefetch_result + lfc_result == nblocks)
if (lfc_result == nblocks)
return;
if (lfc_result <= 0)
if (lfc_result == -1)
{
/* can't use the LFC result, so read all blocks from PS */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~prefetch_hits[i];
read[i] = 0xFF;
}
else
{
/* invert the result: exclude blocks read from lfc */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
read[i] = ~(read[i]);
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks, read);
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
@@ -3726,7 +3610,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
{
NeonNblocksRequest request = {
@@ -3811,7 +3695,7 @@ neon_dbsize(Oid dbNode)
NRelFileInfo dummy_node = {0};
neon_get_request_lsns(dummy_node, MAIN_FORKNUM,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
{
NeonDbSizeRequest request = {
@@ -4546,12 +4430,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
lfc_evict(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);

View File

@@ -32,8 +32,8 @@
#include "inmem_smgr.h"
/* Size of the in-memory smgr: XLR_MAX_BLOCK_ID is 32, but we can update up to 3 forks for each block */
#define MAX_PAGES 100
/* Size of the in-memory smgr */
#define MAX_PAGES 64
/* If more than WARN_PAGES are used, print a warning in the log */
#define WARN_PAGES 32
@@ -285,12 +285,12 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
* WARN_PAGES, print a warning so that we get alerted and get to
* investigate why we're accessing so many buffers.
*/
if (used_pages >= WARN_PAGES)
ereport(WARNING, (errmsg("inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
blocknum,
used_pages), errbacktrace()));
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
blocknum,
used_pages);
if (used_pages == MAX_PAGES)
elog(ERROR, "Inmem storage overflow");

View File

@@ -142,7 +142,7 @@ static BufferTag target_redo_tag;
static XLogReaderState *reader_state;
#define TRACE DEBUG1
#define TRACE LOG
#ifdef HAVE_LIBSECCOMP
@@ -194,7 +194,6 @@ static PgSeccompRule allowed_syscalls[] =
* is stored in MyProcPid anyway.
*/
PG_SCMP_ALLOW(getpid),
PG_SCMP_ALLOW(futex), /* needed for errbacktrace */
/* Enable those for a proper shutdown. */
#if 0
@@ -254,7 +253,7 @@ WalRedoMain(int argc, char *argv[])
* which is super strange but that's not something we can solve
* for here. ¯\_(-_-)_/¯
*/
SetConfigOption("log_min_messages", "WARNING", PGC_SUSET, PGC_S_OVERRIDE);
SetConfigOption("log_min_messages", "FATAL", PGC_SUSET, PGC_S_OVERRIDE);
SetConfigOption("client_min_messages", "ERROR", PGC_SUSET,
PGC_S_OVERRIDE);
@@ -759,11 +758,6 @@ BeginRedoForBlock(StringInfo input_message)
{
reln->smgr_cached_nblocks[forknum] = blknum + 1;
}
if (target_redo_tag.forkNum == MAIN_FORKNUM)
{
reln->smgr_cached_nblocks[FSM_FORKNUM] = MaxBlockNumber;
reln->smgr_cached_nblocks[VISIBILITYMAP_FORKNUM] = MaxBlockNumber;
}
}
/*
@@ -1059,9 +1053,6 @@ GetPage(StringInfo input_message)
DropRelationAllLocalBuffers(rinfo);
wal_redo_buffer = InvalidBuffer;
/* Remove relation from SMGR relation cache */
AtEOXact_SMgr();
elog(TRACE, "Page sent back for block %u", blknum);
}

View File

@@ -24,7 +24,6 @@ hex.workspace = true
hyper0.workspace = true
humantime.workspace = true
itertools.workspace = true
json-structural-diff.workspace = true
lasso.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true

View File

@@ -598,10 +598,7 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());
let client = mgmt_api::Client::new(
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
);
let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
@@ -1357,7 +1354,10 @@ async fn handle_safekeeper_scheduling_policy(
.set_safekeeper_scheduling_policy(id, body.scheduling_policy)
.await?;
json_response(StatusCode::OK, ())
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.unwrap())
}
/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only

View File

@@ -53,10 +53,6 @@ struct Cli {
#[arg(long)]
jwt_token: Option<String>,
/// Token for authenticating this service with the safekeepers it controls
#[arg(long)]
safekeeper_jwt_token: Option<String>,
/// Token for authenticating this service with the control plane, when calling
/// the compute notification endpoint
#[arg(long)]
@@ -157,8 +153,7 @@ impl Default for StrictMode {
struct Secrets {
database_url: String,
public_key: Option<JwtAuth>,
pageserver_jwt_token: Option<String>,
safekeeper_jwt_token: Option<String>,
jwt_token: Option<String>,
control_plane_jwt_token: Option<String>,
peer_jwt_token: Option<String>,
}
@@ -166,7 +161,6 @@ struct Secrets {
impl Secrets {
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
const SAFEKEEPER_JWT_TOKEN_ENV: &'static str = "SAFEKEEPER_JWT_TOKEN";
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
const PEER_JWT_TOKEN_ENV: &'static str = "PEER_JWT_TOKEN";
const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
@@ -190,14 +184,7 @@ impl Secrets {
let this = Self {
database_url,
public_key,
pageserver_jwt_token: Self::load_secret(
&args.jwt_token,
Self::PAGESERVER_JWT_TOKEN_ENV,
),
safekeeper_jwt_token: Self::load_secret(
&args.safekeeper_jwt_token,
Self::SAFEKEEPER_JWT_TOKEN_ENV,
),
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV),
control_plane_jwt_token: Self::load_secret(
&args.control_plane_jwt_token,
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
@@ -277,17 +264,11 @@ async fn async_main() -> anyhow::Result<()> {
let secrets = Secrets::load(&args).await?;
// TODO: once we've rolled out the safekeeper JWT token everywhere, put it into the validation code below
tracing::info!(
"safekeeper_jwt_token set: {:?}",
secrets.safekeeper_jwt_token.is_some()
);
// Validate required secrets and arguments are provided in strict mode
match strict_mode {
StrictMode::Strict
if (secrets.public_key.is_none()
|| secrets.pageserver_jwt_token.is_none()
|| secrets.jwt_token.is_none()
|| secrets.control_plane_jwt_token.is_none()) =>
{
// Production systems should always have secrets configured: if public_key was not set
@@ -312,8 +293,7 @@ async fn async_main() -> anyhow::Result<()> {
}
let config = Config {
pageserver_jwt_token: secrets.pageserver_jwt_token,
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
jwt_token: secrets.jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
peer_jwt_token: secrets.peer_jwt_token,
compute_hook_url: args.compute_hook_url,

View File

@@ -1,7 +1,6 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::{compute_hook, service};
use json_structural_diff::JsonDiff;
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
@@ -25,7 +24,7 @@ use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
@@ -297,7 +296,7 @@ impl Reconciler {
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
.await
},
&self.service_config.pageserver_jwt_token,
&self.service_config.jwt_token,
1,
3,
timeout,
@@ -418,7 +417,7 @@ impl Reconciler {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.jwt_token.as_deref(),
);
client
@@ -441,7 +440,7 @@ impl Reconciler {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.service_config.pageserver_jwt_token.as_deref(),
self.service_config.jwt_token.as_deref(),
);
let timelines = client.timeline_list(&tenant_shard_id).await?;
@@ -479,7 +478,7 @@ impl Reconciler {
)
.await
},
&self.service_config.pageserver_jwt_token,
&self.service_config.jwt_token,
1,
3,
request_download_timeout * 2,
@@ -772,7 +771,7 @@ impl Reconciler {
let observed_conf = match attached_node
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.service_config.pageserver_jwt_token,
&self.service_config.jwt_token,
1,
1,
Duration::from_secs(5),
@@ -881,27 +880,7 @@ impl Reconciler {
self.generation = Some(generation);
wanted_conf.generation = generation.into();
}
let diff = match observed {
Some(ObservedStateLocation {
conf: Some(observed),
}) => {
let diff = JsonDiff::diff(
&serde_json::to_value(observed.clone()).unwrap(),
&serde_json::to_value(wanted_conf.clone()).unwrap(),
false,
);
if let Some(json_diff) = diff.diff {
serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
} else {
"unknown".to_string()
}
}
_ => "full".to_string(),
};
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
// Because `node` comes from a ref to &self, clone it before calling into a &mut self
// function: this could be avoided by refactoring the state mutated by location_config into
@@ -1120,7 +1099,7 @@ impl Reconciler {
match origin
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.service_config.pageserver_jwt_token,
&self.service_config.jwt_token,
1,
3,
Duration::from_secs(5),
@@ -1201,7 +1180,7 @@ fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig
let mut config = config.clone();
if has_secondaries {
if config.heatmap_period.is_none() {
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
}
} else {
config.heatmap_period = None;

View File

@@ -348,12 +348,7 @@ pub struct Config {
// All pageservers managed by one instance of this service must have
// the same public key. This JWT token will be used to authenticate
// this service to the pageservers it manages.
pub pageserver_jwt_token: Option<String>,
// All safekeepers managed by one instance of this service must have
// the same public key. This JWT token will be used to authenticate
// this service to the safekeepers it manages.
pub safekeeper_jwt_token: Option<String>,
pub jwt_token: Option<String>,
// This JWT token will be used to authenticate this service to the control plane.
pub control_plane_jwt_token: Option<String>,
@@ -887,7 +882,7 @@ impl Service {
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
1,
5,
timeout,
@@ -988,7 +983,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.jwt_token.as_deref(),
);
match client
.location_config(
@@ -1558,14 +1553,14 @@ impl Service {
let reconcilers_cancel = cancel.child_token();
let heartbeater_ps = Heartbeater::new(
config.pageserver_jwt_token.clone(),
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);
let heartbeater_sk = Heartbeater::new(
config.safekeeper_jwt_token.clone(),
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
@@ -1912,7 +1907,7 @@ impl Service {
let configs = match node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -1970,7 +1965,7 @@ impl Service {
.location_config(tenant_shard_id, config, None, false)
.await
},
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -2926,9 +2921,7 @@ impl Service {
first
};
let updated_config = base
.apply_patch(patch)
.map_err(|err| ApiError::BadRequest(anyhow::anyhow!(err)))?;
let updated_config = base.apply_patch(patch);
self.set_tenant_config_and_reconcile(tenant_id, updated_config)
.await
}
@@ -3107,7 +3100,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.jwt_token.as_deref(),
);
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
@@ -3168,7 +3161,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.jwt_token.as_deref(),
);
futs.push(async move {
let result = client
@@ -3291,7 +3284,7 @@ impl Service {
.tenant_delete(TenantShardId::unsharded(tenant_id))
.await
},
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
1,
3,
RECONCILE_TIMEOUT,
@@ -3510,7 +3503,7 @@ impl Service {
let timeline_info = create_one(
shard_zero_tid,
shard_zero_locations,
self.config.pageserver_jwt_token.clone(),
self.config.jwt_token.clone(),
create_req.clone(),
)
.await?;
@@ -3526,7 +3519,7 @@ impl Service {
// Create timeline on remaining shards with number >0
if !targets.0.is_empty() {
// If we had multiple shards, issue requests for the remainder now.
let jwt = &self.config.pageserver_jwt_token;
let jwt = &self.config.jwt_token;
self.tenant_for_shards(
targets
.0
@@ -3609,7 +3602,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.pageserver_jwt_token.clone(),
self.config.jwt_token.clone(),
req.clone(),
))
})
@@ -3690,7 +3683,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.pageserver_jwt_token.clone(),
self.config.jwt_token.clone(),
))
})
.await?;
@@ -3764,7 +3757,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.pageserver_jwt_token.clone(),
self.config.jwt_token.clone(),
dir,
))
})
@@ -3879,7 +3872,7 @@ impl Service {
futs.push(async move {
node.with_client_retries(
|client| op(tenant_shard_id, client),
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
warn_threshold,
max_retries,
timeout,
@@ -4128,7 +4121,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.pageserver_jwt_token.clone(),
self.config.jwt_token.clone(),
))
})
.await?;
@@ -4150,7 +4143,7 @@ impl Service {
shard_zero_tid,
timeline_id,
shard_zero_locations.latest.node,
self.config.pageserver_jwt_token.clone(),
self.config.jwt_token.clone(),
)
.await?;
Ok(shard_zero_status)
@@ -4549,7 +4542,7 @@ impl Service {
client.location_config(child_id, config, None, false).await
},
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
1,
10,
Duration::from_secs(5),
@@ -5149,7 +5142,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.jwt_token.as_deref(),
);
let response = client
.tenant_shard_split(
@@ -5475,7 +5468,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
self.config.jwt_token.as_deref(),
);
let scan_result = client
@@ -6656,12 +6649,11 @@ impl Service {
) -> Option<ReconcilerWaiter> {
let reconcile_needed = shard.get_reconcile_needed(nodes);
let reconcile_reason = match reconcile_needed {
match reconcile_needed {
ReconcileNeeded::No => return None,
ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
ReconcileNeeded::Yes(reason) => {
ReconcileNeeded::Yes => {
// Fall through to try and acquire units for spawning reconciler
reason
}
};
@@ -6700,7 +6692,6 @@ impl Service {
};
shard.spawn_reconciler(
reconcile_reason,
&self.result_tx,
nodes,
&self.compute_hook,
@@ -6825,7 +6816,7 @@ impl Service {
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
// trickle of optimizations in the background, rather than executing a large number in parallel
// when a change occurs.
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 16;
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 2;
// Synchronous prepare: scan shards for possible scheduling optimizations
let candidate_work = self.optimize_all_plan();
@@ -6876,7 +6867,7 @@ impl Service {
// How many candidate optimizations we will generate, before evaluating them for readiness: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 64;
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();
@@ -7103,7 +7094,7 @@ impl Service {
match attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7139,7 +7130,7 @@ impl Service {
)
.await
},
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7194,7 +7185,7 @@ impl Service {
let request = request_ref.clone();
client.top_tenant_shards(request.clone()).await
},
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
3,
3,
Duration::from_secs(5),
@@ -7367,7 +7358,7 @@ impl Service {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_status(tenant_shard_id).await },
&self.config.pageserver_jwt_token,
&self.config.jwt_token,
1,
3,
Duration::from_millis(250),

View File

@@ -481,14 +481,7 @@ pub(crate) enum ReconcileNeeded {
/// spawned: wait for the existing reconciler rather than spawning a new one.
WaitExisting(ReconcilerWaiter),
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
Yes(ReconcileReason),
}
#[derive(Debug)]
pub(crate) enum ReconcileReason {
ActiveNodesDirty,
UnknownLocation,
PendingComputeNotification,
Yes,
}
/// Pending modification to the observed state of a tenant shard.
@@ -1348,18 +1341,12 @@ impl TenantShard {
let active_nodes_dirty = self.dirty(pageservers);
let reconcile_needed = match (
active_nodes_dirty,
dirty_observed,
self.pending_compute_notification,
) {
(true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
(_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
(_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
_ => ReconcileNeeded::No,
};
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
// wake up a reconciler to send it.
let do_reconcile =
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
if matches!(reconcile_needed, ReconcileNeeded::No) {
if !do_reconcile {
tracing::debug!("Not dirty, no reconciliation needed.");
return ReconcileNeeded::No;
}
@@ -1402,7 +1389,7 @@ impl TenantShard {
}
}
reconcile_needed
ReconcileNeeded::Yes
}
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
@@ -1492,7 +1479,6 @@ impl TenantShard {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn spawn_reconciler(
&mut self,
reason: ReconcileReason,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
@@ -1552,7 +1538,7 @@ impl TenantShard {
let reconcile_seq = self.sequence;
let long_reconcile_threshold = service_config.long_reconcile_threshold;
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
let must_notify = self.pending_compute_notification;
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
tenant_id=%reconciler.tenant_shard_id.tenant_id,

View File

@@ -1,101 +0,0 @@
from __future__ import annotations
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
@pytest.mark.timeout(600)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prefetch(neon_simple_env: NeonEnv):
"""
Test storing prefetch results in the Local File Cache
"""
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"effective_io_concurrency=100",
"shared_buffers=1MB",
"enable_bitmapscan=off",
"enable_seqscan=off",
"autovacuum=off",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon")
cur.execute("create table t(pk integer, sk integer, filler text default repeat('x',200))")
cur.execute("set statement_timeout=0")
cur.execute("select setseed(0.5)")
cur.execute("insert into t values (generate_series(1,1000000),random()*1000000)")
cur.execute("create index on t(sk)")
cur.execute("vacuum t")
# reset LFC
cur.execute("alter system set neon.file_cache_size_limit=0")
cur.execute("select pg_reload_conf()")
time.sleep(1)
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s1"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 200000 and 300000 limit 100) s2"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 300000 and 400000 limit 100) s3"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s4"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
# if prefetch requests are not stored in LFC, we continue to send unused prefetch requests to PS
assert prefetch_expired > 0
cur.execute("set neon.store_prefetch_result_in_lfc=on")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s5"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 600000 and 700000 limit 100) s6"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 700000 and 800000 limit 100) s7"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s8"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")
# No redundant prefetch requests if prefetch results are stored in LFC
assert prefetch_expired == 0

View File

@@ -3817,43 +3817,3 @@ def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
nodes = env.storage_controller.node_list()
assert len(nodes) == 1
assert nodes[0]["listen_https_port"] is None
def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvBuilder):
"""
Validate that a storage controller restart with no shards in a transient state
performs zero reconciliations at start-up. Implicitly, this means that the location
configs returned by the pageserver are identical to the persisted state in the
storage controller database.
"""
neon_env_builder.num_pageservers = 1
neon_env_builder.storage_controller_config = {
"start_as_candidate": False,
}
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id, shard_count=2, tenant_config={"pitr_interval": "1h2m3s"}
)
env.storage_controller.reconcile_until_idle()
reconciles_before_restart = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
assert reconciles_before_restart != 0
env.storage_controller.stop()
env.storage_controller.start()
env.storage_controller.reconcile_until_idle()
reconciles_after_restart = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
assert reconciles_after_restart == 0