mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
6 Commits
wal_redo_i
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bff29cd4f8 | ||
|
|
d5ac582610 | ||
|
|
9a86cd0d4f | ||
|
|
8d68048e73 | ||
|
|
1a412740a5 | ||
|
|
860aa0157d |
18
Cargo.lock
generated
18
Cargo.lock
generated
@@ -1874,12 +1874,6 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "difflib"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
@@ -3337,17 +3331,6 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "json-structural-diff"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
|
||||
dependencies = [
|
||||
"difflib",
|
||||
"regex",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonwebtoken"
|
||||
version = "9.2.0"
|
||||
@@ -6460,7 +6443,6 @@ dependencies = [
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
"itertools 0.10.5",
|
||||
"json-structural-diff",
|
||||
"lasso",
|
||||
"measured",
|
||||
"metrics",
|
||||
|
||||
@@ -210,7 +210,6 @@ rustls-native-certs = "0.8"
|
||||
x509-parser = "0.16"
|
||||
whoami = "1.5.1"
|
||||
zerocopy = { version = "0.7", features = ["derive"] }
|
||||
json-structural-diff = { version = "0.2.0" }
|
||||
|
||||
## TODO replace this with tracing
|
||||
env_logger = "0.10"
|
||||
|
||||
@@ -1493,7 +1493,7 @@ FROM pg-build AS pg_duckdb-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg_duckdb-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pg_duckdb-src
|
||||
RUN make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
RUN make DUCKDB_STATIC=1 install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_duckdb.control
|
||||
|
||||
#########################################################################################
|
||||
@@ -1676,11 +1676,7 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# Disabled temporarily, because it clashed with pg_mooncake. pg_mooncake
|
||||
# also depends on libduckdb, but a different version.
|
||||
#COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
@@ -1,3 +1,47 @@
|
||||
diff --git a/Makefile b/Makefile
|
||||
index 3235cc8..e232052 100644
|
||||
--- a/Makefile
|
||||
+++ b/Makefile
|
||||
@@ -32,7 +32,16 @@ else
|
||||
DUCKDB_BUILD_TYPE = release
|
||||
endif
|
||||
|
||||
-DUCKDB_LIB = libduckdb$(DLSUFFIX)
|
||||
+DUCKDB_STATIC ?= 0
|
||||
+PG_DUCKDB_LINK_FLAGS = -Wl,-rpath,$(PG_LIB)/ -lpq -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/json -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/parquet -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/core_functions -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/jemalloc -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/icu -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/extension/cached_httpfs -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/yyjson -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/miniz -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/zstd -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/skiplist -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/mbedtls -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/utf8proc -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/hyperloglog -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/fastpforlib -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/re2 -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/libpg_query -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/fsst -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/third_party/fmt -L$(PG_LIB) -lstdc++ -llz4
|
||||
+ifeq ($(DUCKDB_STATIC), 1)
|
||||
+ DUCKDB_LIB = libduckdb_static.a
|
||||
+ PG_DUCKDB_LINK_FLAGS += -l:$(DUCKDB_LIB) -l:libjson_extension.a -l:libparquet_extension.a -l:libcore_functions_extension.a -l:libjemalloc_extension.a -l:libicu_extension.a -l:libcached_httpfs_extension.a -l:libduckdb_yyjson.a -l:libduckdb_miniz.a -l:libduckdb_zstd.a -l:libduckdb_skiplistlib.a -l:libduckdb_mbedtls.a -l:libduckdb_utf8proc.a -l:libduckdb_hyperloglog.a -l:libduckdb_fastpforlib.a -l:libduckdb_re2.a -l:libduckdb_pg_query.a -l:libduckdb_fsst.a -l:libduckdb_fmt.a
|
||||
+else
|
||||
+ DUCKDB_LIB = libduckdb$(DLSUFFIX)
|
||||
+ PG_DUCKDB_LINK_FLAGS += -lduckdb
|
||||
+endif
|
||||
+
|
||||
FULL_DUCKDB_LIB = third_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src/$(DUCKDB_LIB)
|
||||
|
||||
ERROR_ON_WARNING ?=
|
||||
@@ -54,7 +63,7 @@ override PG_CXXFLAGS += -std=c++17 ${DUCKDB_BUILD_CXX_FLAGS} ${COMPILER_FLAGS} -
|
||||
# changes to the vendored code in one place.
|
||||
override PG_CFLAGS += -Wno-declaration-after-statement
|
||||
|
||||
-SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src -L$(PG_LIB) -lduckdb -lstdc++ -llz4
|
||||
+SHLIB_LINK += $(PG_DUCKDB_LINK_FLAGS)
|
||||
|
||||
include Makefile.global
|
||||
|
||||
diff --git a/docs/compilation.md b/docs/compilation.md
|
||||
index e25396f..b94a63a 100644
|
||||
--- a/docs/compilation.md
|
||||
+++ b/docs/compilation.md
|
||||
@@ -14,6 +14,8 @@ To build and install, run:
|
||||
make install
|
||||
```
|
||||
|
||||
+If you want to link `libduckdb` statically, set `DUCKDB_STATIC=1` when interacting with `make(1)`.
|
||||
+
|
||||
Add `pg_duckdb` to the `shared_preload_libraries` in your `postgresql.conf` file:
|
||||
|
||||
```ini
|
||||
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
|
||||
index d777d76..af60106 100644
|
||||
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql
|
||||
|
||||
@@ -335,21 +335,13 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_distance' as an integer")?,
|
||||
checkpoint_timeout: settings
|
||||
.remove("checkpoint_timeout")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_timeout' as duration")?,
|
||||
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.remove("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_target_size' as an integer")?,
|
||||
compaction_period: settings
|
||||
.remove("compaction_period")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_period' as duration")?,
|
||||
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
|
||||
compaction_threshold: settings
|
||||
.remove("compaction_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
@@ -395,10 +387,7 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_horizon' as an integer")?,
|
||||
gc_period: settings.remove("gc_period")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_period' as duration")?,
|
||||
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
|
||||
image_creation_threshold: settings
|
||||
.remove("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
@@ -414,20 +403,13 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
|
||||
pitr_interval: settings.remove("pitr_interval")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'pitr_interval' as duration")?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
|
||||
.map(|x| x.to_string()),
|
||||
lagging_wal_timeout: settings
|
||||
.remove("lagging_wal_timeout")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'lagging_wal_timeout' as duration")?,
|
||||
.map(|x| x.to_string()),
|
||||
max_lsn_wal_lag: settings
|
||||
.remove("max_lsn_wal_lag")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
@@ -445,14 +427,8 @@ impl PageServerNode {
|
||||
.context("Failed to parse 'min_resident_size_override' as integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
|
||||
heatmap_period: settings
|
||||
.remove("heatmap_period")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'heatmap_period' as duration")?,
|
||||
.map(|x| x.to_string()),
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
lazy_slru_download: settings
|
||||
.remove("lazy_slru_download")
|
||||
.map(|x| x.parse::<bool>())
|
||||
@@ -463,15 +439,10 @@ impl PageServerNode {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
lsn_lease_length: settings.remove("lsn_lease_length")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'lsn_lease_length' as duration")?,
|
||||
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
|
||||
lsn_lease_length_for_ts: settings
|
||||
.remove("lsn_lease_length_for_ts")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
|
||||
.map(|x| x.to_string()),
|
||||
timeline_offloading: settings
|
||||
.remove("timeline_offloading")
|
||||
.map(|x| x.parse::<bool>())
|
||||
|
||||
@@ -959,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
threshold: threshold.into(),
|
||||
},
|
||||
)),
|
||||
heatmap_period: Some(Duration::from_secs(300)),
|
||||
heatmap_period: Some("300s".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
})
|
||||
|
||||
@@ -526,13 +526,9 @@ pub struct TenantConfigPatch {
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
|
||||
pub struct TenantConfig {
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub checkpoint_timeout: Option<Duration>,
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
pub compaction_target_size: Option<u64>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub compaction_period: Option<Duration>,
|
||||
pub compaction_period: Option<String>,
|
||||
pub compaction_threshold: Option<usize>,
|
||||
pub compaction_upper_limit: Option<usize>,
|
||||
// defer parsing compaction_algorithm, like eviction_policy
|
||||
@@ -543,38 +539,22 @@ pub struct TenantConfig {
|
||||
pub l0_flush_stall_threshold: Option<usize>,
|
||||
pub l0_flush_wait_upload: Option<bool>,
|
||||
pub gc_horizon: Option<u64>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_period: Option<Duration>,
|
||||
pub gc_period: Option<String>,
|
||||
pub image_creation_threshold: Option<usize>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub walreceiver_connect_timeout: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Option<Duration>,
|
||||
pub pitr_interval: Option<String>,
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heatmap_period: Option<Duration>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
pub heatmap_period: Option<String>,
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
pub image_creation_preempt_threshold: Option<usize>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length_for_ts: Option<Duration>,
|
||||
pub lsn_lease_length: Option<String>,
|
||||
pub lsn_lease_length_for_ts: Option<String>,
|
||||
pub timeline_offloading: Option<bool>,
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
pub rel_size_v2_enabled: Option<bool>,
|
||||
@@ -584,10 +564,7 @@ pub struct TenantConfig {
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
pub fn apply_patch(
|
||||
self,
|
||||
patch: TenantConfigPatch,
|
||||
) -> Result<TenantConfig, humantime::DurationError> {
|
||||
pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
|
||||
let Self {
|
||||
mut checkpoint_distance,
|
||||
mut checkpoint_timeout,
|
||||
@@ -627,17 +604,11 @@ impl TenantConfig {
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
patch
|
||||
.checkpoint_timeout
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut checkpoint_timeout);
|
||||
patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
|
||||
patch
|
||||
.compaction_target_size
|
||||
.apply(&mut compaction_target_size);
|
||||
patch
|
||||
.compaction_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut compaction_period);
|
||||
patch.compaction_period.apply(&mut compaction_period);
|
||||
patch.compaction_threshold.apply(&mut compaction_threshold);
|
||||
patch
|
||||
.compaction_upper_limit
|
||||
@@ -655,25 +626,15 @@ impl TenantConfig {
|
||||
.apply(&mut l0_flush_stall_threshold);
|
||||
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
|
||||
patch.gc_horizon.apply(&mut gc_horizon);
|
||||
patch
|
||||
.gc_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut gc_period);
|
||||
patch.gc_period.apply(&mut gc_period);
|
||||
patch
|
||||
.image_creation_threshold
|
||||
.apply(&mut image_creation_threshold);
|
||||
patch
|
||||
.pitr_interval
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut pitr_interval);
|
||||
patch.pitr_interval.apply(&mut pitr_interval);
|
||||
patch
|
||||
.walreceiver_connect_timeout
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut walreceiver_connect_timeout);
|
||||
patch
|
||||
.lagging_wal_timeout
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut lagging_wal_timeout);
|
||||
patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
|
||||
patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
|
||||
patch.eviction_policy.apply(&mut eviction_policy);
|
||||
patch
|
||||
@@ -681,12 +642,8 @@ impl TenantConfig {
|
||||
.apply(&mut min_resident_size_override);
|
||||
patch
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut evictions_low_residence_duration_metric_threshold);
|
||||
patch
|
||||
.heatmap_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut heatmap_period);
|
||||
patch.heatmap_period.apply(&mut heatmap_period);
|
||||
patch.lazy_slru_download.apply(&mut lazy_slru_download);
|
||||
patch
|
||||
.timeline_get_throttle
|
||||
@@ -697,13 +654,9 @@ impl TenantConfig {
|
||||
patch
|
||||
.image_creation_preempt_threshold
|
||||
.apply(&mut image_creation_preempt_threshold);
|
||||
patch
|
||||
.lsn_lease_length
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut lsn_lease_length);
|
||||
patch.lsn_lease_length.apply(&mut lsn_lease_length);
|
||||
patch
|
||||
.lsn_lease_length_for_ts
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut lsn_lease_length_for_ts);
|
||||
patch.timeline_offloading.apply(&mut timeline_offloading);
|
||||
patch
|
||||
@@ -720,7 +673,7 @@ impl TenantConfig {
|
||||
.gc_compaction_ratio_percent
|
||||
.apply(&mut gc_compaction_ratio_percent);
|
||||
|
||||
Ok(Self {
|
||||
Self {
|
||||
checkpoint_distance,
|
||||
checkpoint_timeout,
|
||||
compaction_target_size,
|
||||
@@ -756,7 +709,7 @@ impl TenantConfig {
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2550,7 +2503,7 @@ mod tests {
|
||||
..base.clone()
|
||||
};
|
||||
|
||||
let patched = base.apply_patch(decoded.config).unwrap();
|
||||
let patched = base.apply_patch(decoded.config);
|
||||
|
||||
assert_eq!(patched, expected);
|
||||
}
|
||||
|
||||
@@ -693,15 +693,16 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
|
||||
/// This is a conversion from our internal tenant config object to the one used
|
||||
/// in external APIs.
|
||||
impl From<TenantConfOpt> for models::TenantConfig {
|
||||
// TODO(vlad): These are now the same, but they have different serialization logic.
|
||||
// Can we merge them?
|
||||
fn from(value: TenantConfOpt) -> Self {
|
||||
fn humantime(d: Duration) -> String {
|
||||
format!("{}s", d.as_secs())
|
||||
}
|
||||
Self {
|
||||
checkpoint_distance: value.checkpoint_distance,
|
||||
checkpoint_timeout: value.checkpoint_timeout,
|
||||
checkpoint_timeout: value.checkpoint_timeout.map(humantime),
|
||||
compaction_algorithm: value.compaction_algorithm,
|
||||
compaction_target_size: value.compaction_target_size,
|
||||
compaction_period: value.compaction_period,
|
||||
compaction_period: value.compaction_period.map(humantime),
|
||||
compaction_threshold: value.compaction_threshold,
|
||||
compaction_upper_limit: value.compaction_upper_limit,
|
||||
compaction_l0_first: value.compaction_l0_first,
|
||||
@@ -710,23 +711,24 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
l0_flush_stall_threshold: value.l0_flush_stall_threshold,
|
||||
l0_flush_wait_upload: value.l0_flush_wait_upload,
|
||||
gc_horizon: value.gc_horizon,
|
||||
gc_period: value.gc_period,
|
||||
gc_period: value.gc_period.map(humantime),
|
||||
image_creation_threshold: value.image_creation_threshold,
|
||||
pitr_interval: value.pitr_interval,
|
||||
walreceiver_connect_timeout: value.walreceiver_connect_timeout,
|
||||
lagging_wal_timeout: value.lagging_wal_timeout,
|
||||
pitr_interval: value.pitr_interval.map(humantime),
|
||||
walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
|
||||
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
|
||||
max_lsn_wal_lag: value.max_lsn_wal_lag,
|
||||
eviction_policy: value.eviction_policy,
|
||||
min_resident_size_override: value.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: value
|
||||
.evictions_low_residence_duration_metric_threshold,
|
||||
heatmap_period: value.heatmap_period,
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.map(humantime),
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle,
|
||||
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
|
||||
image_creation_preempt_threshold: value.image_creation_preempt_threshold,
|
||||
lsn_lease_length: value.lsn_lease_length,
|
||||
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts,
|
||||
lsn_lease_length: value.lsn_lease_length.map(humantime),
|
||||
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
|
||||
timeline_offloading: value.timeline_offloading,
|
||||
wal_receiver_protocol_override: value.wal_receiver_protocol_override,
|
||||
rel_size_v2_enabled: value.rel_size_v2_enabled,
|
||||
@@ -758,10 +760,29 @@ mod tests {
|
||||
assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_from_models_tenant_config_err() {
|
||||
let tenant_config = models::TenantConfig {
|
||||
lagging_wal_timeout: Some("5a".to_string()),
|
||||
..TenantConfig::default()
|
||||
};
|
||||
|
||||
let tenant_conf_opt = TenantConfOpt::try_from(&tenant_config);
|
||||
|
||||
assert!(
|
||||
tenant_conf_opt.is_err(),
|
||||
"Suceeded to convert TenantConfig to TenantConfOpt"
|
||||
);
|
||||
|
||||
let expected_error_str =
|
||||
"lagging_wal_timeout: invalid value: string \"5a\", expected a duration";
|
||||
assert_eq!(tenant_conf_opt.unwrap_err().to_string(), expected_error_str);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_from_models_tenant_config_success() {
|
||||
let tenant_config = models::TenantConfig {
|
||||
lagging_wal_timeout: Some(Duration::from_secs(5)),
|
||||
lagging_wal_timeout: Some("5s".to_string()),
|
||||
..TenantConfig::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -136,9 +136,7 @@ impl WalRedoProcess {
|
||||
Ok(0) => break Ok(()), // eof
|
||||
Ok(num_bytes) => {
|
||||
let output = String::from_utf8_lossy(&buf[..num_bytes]);
|
||||
if !output.contains("LOG:") {
|
||||
error!(%output, "received output");
|
||||
}
|
||||
error!(%output, "received output");
|
||||
}
|
||||
Err(e) => {
|
||||
break Err(e);
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -56,7 +56,6 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_READ;
|
||||
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_WRITE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
|
||||
uint32 WAIT_EVENT_NEON_PS_STARTING;
|
||||
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
|
||||
uint32 WAIT_EVENT_NEON_PS_SEND;
|
||||
@@ -539,7 +538,6 @@ neon_shmem_startup_hook(void)
|
||||
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
|
||||
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
|
||||
WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
|
||||
WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
|
||||
WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
|
||||
WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
|
||||
WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");
|
||||
|
||||
@@ -28,7 +28,6 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_READ;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_SEND;
|
||||
@@ -39,7 +38,6 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
|
||||
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
|
||||
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
|
||||
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
|
||||
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
|
||||
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
|
||||
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
|
||||
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION
|
||||
|
||||
@@ -233,7 +233,6 @@ extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
extern bool lfc_store_prefetch_result;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
@@ -302,16 +301,14 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno);
|
||||
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno, int nblocks, bits8 *bitmap);
|
||||
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
|
||||
extern void lfc_init(void);
|
||||
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
const void* buffer, XLogRecPtr lsn);
|
||||
|
||||
|
||||
static inline bool
|
||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
void *buffer)
|
||||
{
|
||||
bits8 rv = 1;
|
||||
bits8 rv = 0;
|
||||
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -162,7 +162,7 @@ static uint32 local_request_counter;
|
||||
* UNUSED ------> REQUESTED --> RECEIVED
|
||||
* ^ : | |
|
||||
* | : v |
|
||||
* | : TAG_REMAINS |
|
||||
* | : TAG_UNUSED |
|
||||
* | : | |
|
||||
* +----------------+------------+
|
||||
* :
|
||||
@@ -181,7 +181,7 @@ typedef enum PrefetchStatus
|
||||
/* must fit in uint8; bits 0x1 are used */
|
||||
typedef enum {
|
||||
PRFSF_NONE = 0x0,
|
||||
PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */
|
||||
PRFSF_SEQ = 0x1,
|
||||
} PrefetchRequestFlags;
|
||||
|
||||
typedef struct PrefetchRequest
|
||||
@@ -305,7 +305,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
static void
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum,
|
||||
BlockNumber blkno, neon_request_lsns *output,
|
||||
BlockNumber nblocks);
|
||||
BlockNumber nblocks, const bits8 *mask);
|
||||
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
PrefetchRequest *slot);
|
||||
|
||||
@@ -363,7 +363,6 @@ compact_prefetch_buffers(void)
|
||||
target_slot->buftag = source_slot->buftag;
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->flags = source_slot->flags;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->reqid = source_slot->reqid;
|
||||
target_slot->request_lsns = source_slot->request_lsns;
|
||||
@@ -453,18 +452,6 @@ prefetch_pump_state(void)
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
|
||||
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
|
||||
{
|
||||
/*
|
||||
* Store prefetched result in LFC (please read comments to lfc_prefetch
|
||||
* explaining why it can be done without holding shared buffer lock
|
||||
*/
|
||||
if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
|
||||
{
|
||||
slot->flags |= PRFSF_LFC;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -726,18 +713,6 @@ prefetch_read(PrefetchRequest *slot)
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
|
||||
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
|
||||
{
|
||||
/*
|
||||
* Store prefetched result in LFC (please read comments to lfc_prefetch
|
||||
* explaining why it can be done without holding shared buffer lock
|
||||
*/
|
||||
if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
|
||||
{
|
||||
slot->flags |= PRFSF_LFC;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
else
|
||||
@@ -889,7 +864,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
else
|
||||
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum,
|
||||
&slot->request_lsns, 1);
|
||||
&slot->request_lsns, 1, NULL);
|
||||
request.hdr.lsn = slot->request_lsns.request_lsn;
|
||||
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
|
||||
|
||||
@@ -915,73 +890,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
*/
|
||||
static int
|
||||
prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *lsns,
|
||||
BlockNumber nblocks, void **buffers, bits8 *mask)
|
||||
{
|
||||
int hits = 0;
|
||||
PrefetchRequest hashkey;
|
||||
|
||||
/*
|
||||
* Use an intermediate PrefetchRequest struct as the hash key to ensure
|
||||
* correct alignment and that the padding bytes are cleared.
|
||||
*/
|
||||
memset(&hashkey.buftag, 0, sizeof(BufferTag));
|
||||
CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
|
||||
hashkey.buftag.forkNum = forknum;
|
||||
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
if (entry != NULL)
|
||||
{
|
||||
PrefetchRequest *slot = entry->slot;
|
||||
uint64 ring_index = slot->my_ring_index;
|
||||
Assert(slot == GetPrfSlot(ring_index));
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
|
||||
|
||||
if (slot->status != PRFS_RECEIVED)
|
||||
continue;
|
||||
|
||||
/*
|
||||
* If the caller specified a request LSN to use, only accept
|
||||
* prefetch responses that satisfy that request.
|
||||
*/
|
||||
if (!neon_prefetch_response_usable(&lsns[i], slot))
|
||||
continue;
|
||||
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
prefetch_set_unused(ring_index);
|
||||
BITMAP_SET(mask, i);
|
||||
|
||||
hits += 1;
|
||||
}
|
||||
}
|
||||
pgBufferUsage.prefetch.hits += hits;
|
||||
return hits;
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
static bool
|
||||
prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_request_lsns *lsns, void *buffer)
|
||||
{
|
||||
bits8 present = 0;
|
||||
return prefetch_lookupv(rinfo, forkNum, blkn, lsns, 1, &buffer, &present) != 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* prefetch_register_bufferv() - register and prefetch buffers
|
||||
*
|
||||
@@ -1105,6 +1013,8 @@ Retry:
|
||||
/* The buffered request is good enough, return that index */
|
||||
if (is_prefetch)
|
||||
pgBufferUsage.prefetch.duplicates++;
|
||||
else
|
||||
pgBufferUsage.prefetch.hits++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -1206,7 +1116,6 @@ Retry:
|
||||
slot->buftag = hashkey.buftag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
slot->flags = 0;
|
||||
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
|
||||
@@ -2147,7 +2056,8 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
*/
|
||||
static void
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *output, BlockNumber nblocks)
|
||||
neon_request_lsns *output, BlockNumber nblocks,
|
||||
const bits8 *mask)
|
||||
{
|
||||
XLogRecPtr last_written_lsns[PG_IOV_MAX];
|
||||
|
||||
@@ -2235,6 +2145,9 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
|
||||
if (last_written_lsn > replay_lsn)
|
||||
{
|
||||
/* GetCurrentReplayRecPtr was introduced in v15 */
|
||||
@@ -2277,6 +2190,8 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
/*
|
||||
* Use the latest LSN that was evicted from the buffer cache as the
|
||||
* 'not_modified_since' hint. Any pages modified by later WAL records
|
||||
@@ -2498,7 +2413,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.hdr.tag = T_NeonExistsRequest,
|
||||
@@ -2917,7 +2832,8 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
while (nblocks > 0)
|
||||
{
|
||||
int iterblocks = Min(nblocks, PG_IOV_MAX);
|
||||
bits8 lfc_present[PG_IOV_MAX / 8] = {0};
|
||||
bits8 lfc_present[PG_IOV_MAX / 8];
|
||||
memset(lfc_present, 0, sizeof(lfc_present));
|
||||
|
||||
if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
iterblocks, lfc_present) == iterblocks)
|
||||
@@ -2928,13 +2844,12 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
}
|
||||
|
||||
tag.blockNum = blocknum;
|
||||
|
||||
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_present[i] = ~(lfc_present[i]);
|
||||
|
||||
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
|
||||
lfc_present, true);
|
||||
|
||||
nblocks -= iterblocks;
|
||||
blocknum += iterblocks;
|
||||
|
||||
@@ -3190,8 +3105,7 @@ Retry:
|
||||
}
|
||||
}
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
if (!lfc_store_prefetch_result)
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
break;
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
@@ -3276,17 +3190,6 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
|
||||
|
||||
if (prefetch_lookup(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, buffer))
|
||||
{
|
||||
/* Prefetch hit */
|
||||
return;
|
||||
}
|
||||
|
||||
/* Try to read from local file cache */
|
||||
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
|
||||
{
|
||||
@@ -3294,11 +3197,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
@@ -3379,14 +3280,11 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
static void
|
||||
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
void **buffers, BlockNumber nblocks)
|
||||
void **buffers, BlockNumber nblocks)
|
||||
{
|
||||
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
|
||||
bits8 lfc_hits[PG_IOV_MAX / 8];
|
||||
bits8 read[PG_IOV_MAX / 8];
|
||||
neon_request_lsns request_lsns[PG_IOV_MAX];
|
||||
int lfc_result;
|
||||
int prefetch_result;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -3409,52 +3307,38 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
neon_log(ERROR, "Read request too large: %d is larger than max %d",
|
||||
nblocks, PG_IOV_MAX);
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
|
||||
|
||||
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
|
||||
|
||||
if (prefetch_result == nblocks)
|
||||
return;
|
||||
|
||||
/* invert the result: exclude prefetched blocks */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_hits[i] = ~prefetch_hits[i];
|
||||
memset(read, 0, sizeof(read));
|
||||
|
||||
/* Try to read from local file cache */
|
||||
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
|
||||
nblocks, lfc_hits);
|
||||
nblocks, read);
|
||||
|
||||
if (lfc_result > 0)
|
||||
MyNeonCounters->file_cache_hits_total += lfc_result;
|
||||
|
||||
/* Read all blocks from LFC, so we're done */
|
||||
if (prefetch_result + lfc_result == nblocks)
|
||||
if (lfc_result == nblocks)
|
||||
return;
|
||||
|
||||
if (lfc_result <= 0)
|
||||
if (lfc_result == -1)
|
||||
{
|
||||
/* can't use the LFC result, so read all blocks from PS */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = ~prefetch_hits[i];
|
||||
read[i] = 0xFF;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* invert the result: exclude blocks read from lfc */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
|
||||
read[i] = ~(read[i]);
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks, read);
|
||||
|
||||
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
|
||||
buffers, nblocks, read);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
@@ -3726,7 +3610,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
|
||||
{
|
||||
NeonNblocksRequest request = {
|
||||
@@ -3811,7 +3695,7 @@ neon_dbsize(Oid dbNode)
|
||||
NRelFileInfo dummy_node = {0};
|
||||
|
||||
neon_get_request_lsns(dummy_node, MAIN_FORKNUM,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
|
||||
{
|
||||
NeonDbSizeRequest request = {
|
||||
@@ -4546,12 +4430,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
if (no_redo_needed)
|
||||
{
|
||||
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
|
||||
/*
|
||||
* Redo changes if page exists in LFC.
|
||||
* We should perform this check after assigning LwLSN to prevent
|
||||
* prefetching of some older version of the page by some other backend.
|
||||
*/
|
||||
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
|
||||
lfc_evict(rinfo, forknum, blkno);
|
||||
}
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
@@ -32,8 +32,8 @@
|
||||
|
||||
#include "inmem_smgr.h"
|
||||
|
||||
/* Size of the in-memory smgr: XLR_MAX_BLOCK_ID is 32, but we can update up to 3 forks for each block */
|
||||
#define MAX_PAGES 100
|
||||
/* Size of the in-memory smgr */
|
||||
#define MAX_PAGES 64
|
||||
|
||||
/* If more than WARN_PAGES are used, print a warning in the log */
|
||||
#define WARN_PAGES 32
|
||||
@@ -285,12 +285,12 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
* WARN_PAGES, print a warning so that we get alerted and get to
|
||||
* investigate why we're accessing so many buffers.
|
||||
*/
|
||||
if (used_pages >= WARN_PAGES)
|
||||
ereport(WARNING, (errmsg("inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
blocknum,
|
||||
used_pages), errbacktrace()));
|
||||
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
|
||||
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
blocknum,
|
||||
used_pages);
|
||||
if (used_pages == MAX_PAGES)
|
||||
elog(ERROR, "Inmem storage overflow");
|
||||
|
||||
|
||||
@@ -142,7 +142,7 @@ static BufferTag target_redo_tag;
|
||||
|
||||
static XLogReaderState *reader_state;
|
||||
|
||||
#define TRACE DEBUG1
|
||||
#define TRACE LOG
|
||||
|
||||
#ifdef HAVE_LIBSECCOMP
|
||||
|
||||
@@ -194,7 +194,6 @@ static PgSeccompRule allowed_syscalls[] =
|
||||
* is stored in MyProcPid anyway.
|
||||
*/
|
||||
PG_SCMP_ALLOW(getpid),
|
||||
PG_SCMP_ALLOW(futex), /* needed for errbacktrace */
|
||||
|
||||
/* Enable those for a proper shutdown. */
|
||||
#if 0
|
||||
@@ -254,7 +253,7 @@ WalRedoMain(int argc, char *argv[])
|
||||
* which is super strange but that's not something we can solve
|
||||
* for here. ¯\_(-_-)_/¯
|
||||
*/
|
||||
SetConfigOption("log_min_messages", "WARNING", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
SetConfigOption("log_min_messages", "FATAL", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
SetConfigOption("client_min_messages", "ERROR", PGC_SUSET,
|
||||
PGC_S_OVERRIDE);
|
||||
|
||||
@@ -759,11 +758,6 @@ BeginRedoForBlock(StringInfo input_message)
|
||||
{
|
||||
reln->smgr_cached_nblocks[forknum] = blknum + 1;
|
||||
}
|
||||
if (target_redo_tag.forkNum == MAIN_FORKNUM)
|
||||
{
|
||||
reln->smgr_cached_nblocks[FSM_FORKNUM] = MaxBlockNumber;
|
||||
reln->smgr_cached_nblocks[VISIBILITYMAP_FORKNUM] = MaxBlockNumber;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1059,9 +1053,6 @@ GetPage(StringInfo input_message)
|
||||
DropRelationAllLocalBuffers(rinfo);
|
||||
wal_redo_buffer = InvalidBuffer;
|
||||
|
||||
/* Remove relation from SMGR relation cache */
|
||||
AtEOXact_SMgr();
|
||||
|
||||
elog(TRACE, "Page sent back for block %u", blknum);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ hex.workspace = true
|
||||
hyper0.workspace = true
|
||||
humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
json-structural-diff.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -598,10 +598,7 @@ async fn handle_tenant_timeline_passthrough(
|
||||
|
||||
let _timer = latency.start_timer(labels.clone());
|
||||
|
||||
let client = mgmt_api::Client::new(
|
||||
node.base_url(),
|
||||
service.get_config().pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
|
||||
let resp = client.get_raw(path).await.map_err(|e|
|
||||
// We return 503 here because if we can't successfully send a request to the pageserver,
|
||||
// either we aren't available or the pageserver is unavailable.
|
||||
@@ -1357,7 +1354,10 @@ async fn handle_safekeeper_scheduling_policy(
|
||||
.set_safekeeper_scheduling_policy(id, body.scheduling_policy)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(Body::empty())
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
|
||||
|
||||
@@ -53,10 +53,6 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
jwt_token: Option<String>,
|
||||
|
||||
/// Token for authenticating this service with the safekeepers it controls
|
||||
#[arg(long)]
|
||||
safekeeper_jwt_token: Option<String>,
|
||||
|
||||
/// Token for authenticating this service with the control plane, when calling
|
||||
/// the compute notification endpoint
|
||||
#[arg(long)]
|
||||
@@ -157,8 +153,7 @@ impl Default for StrictMode {
|
||||
struct Secrets {
|
||||
database_url: String,
|
||||
public_key: Option<JwtAuth>,
|
||||
pageserver_jwt_token: Option<String>,
|
||||
safekeeper_jwt_token: Option<String>,
|
||||
jwt_token: Option<String>,
|
||||
control_plane_jwt_token: Option<String>,
|
||||
peer_jwt_token: Option<String>,
|
||||
}
|
||||
@@ -166,7 +161,6 @@ struct Secrets {
|
||||
impl Secrets {
|
||||
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
|
||||
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
|
||||
const SAFEKEEPER_JWT_TOKEN_ENV: &'static str = "SAFEKEEPER_JWT_TOKEN";
|
||||
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
|
||||
const PEER_JWT_TOKEN_ENV: &'static str = "PEER_JWT_TOKEN";
|
||||
const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
|
||||
@@ -190,14 +184,7 @@ impl Secrets {
|
||||
let this = Self {
|
||||
database_url,
|
||||
public_key,
|
||||
pageserver_jwt_token: Self::load_secret(
|
||||
&args.jwt_token,
|
||||
Self::PAGESERVER_JWT_TOKEN_ENV,
|
||||
),
|
||||
safekeeper_jwt_token: Self::load_secret(
|
||||
&args.safekeeper_jwt_token,
|
||||
Self::SAFEKEEPER_JWT_TOKEN_ENV,
|
||||
),
|
||||
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV),
|
||||
control_plane_jwt_token: Self::load_secret(
|
||||
&args.control_plane_jwt_token,
|
||||
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
|
||||
@@ -277,17 +264,11 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
|
||||
let secrets = Secrets::load(&args).await?;
|
||||
|
||||
// TODO: once we've rolled out the safekeeper JWT token everywhere, put it into the validation code below
|
||||
tracing::info!(
|
||||
"safekeeper_jwt_token set: {:?}",
|
||||
secrets.safekeeper_jwt_token.is_some()
|
||||
);
|
||||
|
||||
// Validate required secrets and arguments are provided in strict mode
|
||||
match strict_mode {
|
||||
StrictMode::Strict
|
||||
if (secrets.public_key.is_none()
|
||||
|| secrets.pageserver_jwt_token.is_none()
|
||||
|| secrets.jwt_token.is_none()
|
||||
|| secrets.control_plane_jwt_token.is_none()) =>
|
||||
{
|
||||
// Production systems should always have secrets configured: if public_key was not set
|
||||
@@ -312,8 +293,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
let config = Config {
|
||||
pageserver_jwt_token: secrets.pageserver_jwt_token,
|
||||
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
|
||||
jwt_token: secrets.jwt_token,
|
||||
control_plane_jwt_token: secrets.control_plane_jwt_token,
|
||||
peer_jwt_token: secrets.peer_jwt_token,
|
||||
compute_hook_url: args.compute_hook_url,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::{compute_hook, service};
|
||||
use json_structural_diff::JsonDiff;
|
||||
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
|
||||
@@ -25,7 +24,7 @@ use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
use crate::node::Node;
|
||||
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
|
||||
|
||||
const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);
|
||||
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
|
||||
|
||||
/// Object with the lifetime of the background reconcile task that is created
|
||||
/// for tenants which have a difference between their intent and observed states.
|
||||
@@ -297,7 +296,7 @@ impl Reconciler {
|
||||
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
|
||||
.await
|
||||
},
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.jwt_token,
|
||||
1,
|
||||
3,
|
||||
timeout,
|
||||
@@ -418,7 +417,7 @@ impl Reconciler {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.service_config.pageserver_jwt_token.as_deref(),
|
||||
self.service_config.jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
client
|
||||
@@ -441,7 +440,7 @@ impl Reconciler {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.service_config.pageserver_jwt_token.as_deref(),
|
||||
self.service_config.jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
let timelines = client.timeline_list(&tenant_shard_id).await?;
|
||||
@@ -479,7 +478,7 @@ impl Reconciler {
|
||||
)
|
||||
.await
|
||||
},
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.jwt_token,
|
||||
1,
|
||||
3,
|
||||
request_download_timeout * 2,
|
||||
@@ -772,7 +771,7 @@ impl Reconciler {
|
||||
let observed_conf = match attached_node
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_location_config(tenant_shard_id).await },
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.jwt_token,
|
||||
1,
|
||||
1,
|
||||
Duration::from_secs(5),
|
||||
@@ -881,27 +880,7 @@ impl Reconciler {
|
||||
self.generation = Some(generation);
|
||||
wanted_conf.generation = generation.into();
|
||||
}
|
||||
|
||||
let diff = match observed {
|
||||
Some(ObservedStateLocation {
|
||||
conf: Some(observed),
|
||||
}) => {
|
||||
let diff = JsonDiff::diff(
|
||||
&serde_json::to_value(observed.clone()).unwrap(),
|
||||
&serde_json::to_value(wanted_conf.clone()).unwrap(),
|
||||
false,
|
||||
);
|
||||
|
||||
if let Some(json_diff) = diff.diff {
|
||||
serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
|
||||
} else {
|
||||
"unknown".to_string()
|
||||
}
|
||||
}
|
||||
_ => "full".to_string(),
|
||||
};
|
||||
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
|
||||
|
||||
// Because `node` comes from a ref to &self, clone it before calling into a &mut self
|
||||
// function: this could be avoided by refactoring the state mutated by location_config into
|
||||
@@ -1120,7 +1099,7 @@ impl Reconciler {
|
||||
match origin
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_location_config(tenant_shard_id).await },
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.jwt_token,
|
||||
1,
|
||||
3,
|
||||
Duration::from_secs(5),
|
||||
@@ -1201,7 +1180,7 @@ fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig
|
||||
let mut config = config.clone();
|
||||
if has_secondaries {
|
||||
if config.heatmap_period.is_none() {
|
||||
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
|
||||
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
|
||||
}
|
||||
} else {
|
||||
config.heatmap_period = None;
|
||||
|
||||
@@ -348,12 +348,7 @@ pub struct Config {
|
||||
// All pageservers managed by one instance of this service must have
|
||||
// the same public key. This JWT token will be used to authenticate
|
||||
// this service to the pageservers it manages.
|
||||
pub pageserver_jwt_token: Option<String>,
|
||||
|
||||
// All safekeepers managed by one instance of this service must have
|
||||
// the same public key. This JWT token will be used to authenticate
|
||||
// this service to the safekeepers it manages.
|
||||
pub safekeeper_jwt_token: Option<String>,
|
||||
pub jwt_token: Option<String>,
|
||||
|
||||
// This JWT token will be used to authenticate this service to the control plane.
|
||||
pub control_plane_jwt_token: Option<String>,
|
||||
@@ -887,7 +882,7 @@ impl Service {
|
||||
let response = node
|
||||
.with_client_retries(
|
||||
|client| async move { client.list_location_config().await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
5,
|
||||
timeout,
|
||||
@@ -988,7 +983,7 @@ impl Service {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
match client
|
||||
.location_config(
|
||||
@@ -1558,14 +1553,14 @@ impl Service {
|
||||
let reconcilers_cancel = cancel.child_token();
|
||||
|
||||
let heartbeater_ps = Heartbeater::new(
|
||||
config.pageserver_jwt_token.clone(),
|
||||
config.jwt_token.clone(),
|
||||
config.max_offline_interval,
|
||||
config.max_warming_up_interval,
|
||||
cancel.clone(),
|
||||
);
|
||||
|
||||
let heartbeater_sk = Heartbeater::new(
|
||||
config.safekeeper_jwt_token.clone(),
|
||||
config.jwt_token.clone(),
|
||||
config.max_offline_interval,
|
||||
config.max_warming_up_interval,
|
||||
cancel.clone(),
|
||||
@@ -1912,7 +1907,7 @@ impl Service {
|
||||
let configs = match node
|
||||
.with_client_retries(
|
||||
|client| async move { client.list_location_config().await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
5,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -1970,7 +1965,7 @@ impl Service {
|
||||
.location_config(tenant_shard_id, config, None, false)
|
||||
.await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
5,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -2926,9 +2921,7 @@ impl Service {
|
||||
first
|
||||
};
|
||||
|
||||
let updated_config = base
|
||||
.apply_patch(patch)
|
||||
.map_err(|err| ApiError::BadRequest(anyhow::anyhow!(err)))?;
|
||||
let updated_config = base.apply_patch(patch);
|
||||
self.set_tenant_config_and_reconcile(tenant_id, updated_config)
|
||||
.await
|
||||
}
|
||||
@@ -3107,7 +3100,7 @@ impl Service {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
|
||||
@@ -3168,7 +3161,7 @@ impl Service {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
futs.push(async move {
|
||||
let result = client
|
||||
@@ -3291,7 +3284,7 @@ impl Service {
|
||||
.tenant_delete(TenantShardId::unsharded(tenant_id))
|
||||
.await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
3,
|
||||
RECONCILE_TIMEOUT,
|
||||
@@ -3510,7 +3503,7 @@ impl Service {
|
||||
let timeline_info = create_one(
|
||||
shard_zero_tid,
|
||||
shard_zero_locations,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.jwt_token.clone(),
|
||||
create_req.clone(),
|
||||
)
|
||||
.await?;
|
||||
@@ -3526,7 +3519,7 @@ impl Service {
|
||||
// Create timeline on remaining shards with number >0
|
||||
if !targets.0.is_empty() {
|
||||
// If we had multiple shards, issue requests for the remainder now.
|
||||
let jwt = &self.config.pageserver_jwt_token;
|
||||
let jwt = &self.config.jwt_token;
|
||||
self.tenant_for_shards(
|
||||
targets
|
||||
.0
|
||||
@@ -3609,7 +3602,7 @@ impl Service {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.jwt_token.clone(),
|
||||
req.clone(),
|
||||
))
|
||||
})
|
||||
@@ -3690,7 +3683,7 @@ impl Service {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.jwt_token.clone(),
|
||||
))
|
||||
})
|
||||
.await?;
|
||||
@@ -3764,7 +3757,7 @@ impl Service {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.jwt_token.clone(),
|
||||
dir,
|
||||
))
|
||||
})
|
||||
@@ -3879,7 +3872,7 @@ impl Service {
|
||||
futs.push(async move {
|
||||
node.with_client_retries(
|
||||
|client| op(tenant_shard_id, client),
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
timeout,
|
||||
@@ -4128,7 +4121,7 @@ impl Service {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.jwt_token.clone(),
|
||||
))
|
||||
})
|
||||
.await?;
|
||||
@@ -4150,7 +4143,7 @@ impl Service {
|
||||
shard_zero_tid,
|
||||
timeline_id,
|
||||
shard_zero_locations.latest.node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.jwt_token.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok(shard_zero_status)
|
||||
@@ -4549,7 +4542,7 @@ impl Service {
|
||||
|
||||
client.location_config(child_id, config, None, false).await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
10,
|
||||
Duration::from_secs(5),
|
||||
@@ -5149,7 +5142,7 @@ impl Service {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
let response = client
|
||||
.tenant_shard_split(
|
||||
@@ -5475,7 +5468,7 @@ impl Service {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
let scan_result = client
|
||||
@@ -6656,12 +6649,11 @@ impl Service {
|
||||
) -> Option<ReconcilerWaiter> {
|
||||
let reconcile_needed = shard.get_reconcile_needed(nodes);
|
||||
|
||||
let reconcile_reason = match reconcile_needed {
|
||||
match reconcile_needed {
|
||||
ReconcileNeeded::No => return None,
|
||||
ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
|
||||
ReconcileNeeded::Yes(reason) => {
|
||||
ReconcileNeeded::Yes => {
|
||||
// Fall through to try and acquire units for spawning reconciler
|
||||
reason
|
||||
}
|
||||
};
|
||||
|
||||
@@ -6700,7 +6692,6 @@ impl Service {
|
||||
};
|
||||
|
||||
shard.spawn_reconciler(
|
||||
reconcile_reason,
|
||||
&self.result_tx,
|
||||
nodes,
|
||||
&self.compute_hook,
|
||||
@@ -6825,7 +6816,7 @@ impl Service {
|
||||
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
|
||||
// trickle of optimizations in the background, rather than executing a large number in parallel
|
||||
// when a change occurs.
|
||||
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 16;
|
||||
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 2;
|
||||
|
||||
// Synchronous prepare: scan shards for possible scheduling optimizations
|
||||
let candidate_work = self.optimize_all_plan();
|
||||
@@ -6876,7 +6867,7 @@ impl Service {
|
||||
// How many candidate optimizations we will generate, before evaluating them for readiness: setting
|
||||
// this higher than the execution limit gives us a chance to execute some work even if the first
|
||||
// few optimizations we find are not ready.
|
||||
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 64;
|
||||
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
|
||||
|
||||
let mut work = Vec::new();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
@@ -7103,7 +7094,7 @@ impl Service {
|
||||
match attached_node
|
||||
.with_client_retries(
|
||||
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
3,
|
||||
10,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -7139,7 +7130,7 @@ impl Service {
|
||||
)
|
||||
.await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
3,
|
||||
10,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -7194,7 +7185,7 @@ impl Service {
|
||||
let request = request_ref.clone();
|
||||
client.top_tenant_shards(request.clone()).await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
3,
|
||||
3,
|
||||
Duration::from_secs(5),
|
||||
@@ -7367,7 +7358,7 @@ impl Service {
|
||||
match node
|
||||
.with_client_retries(
|
||||
|client| async move { client.tenant_secondary_status(tenant_shard_id).await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
3,
|
||||
Duration::from_millis(250),
|
||||
|
||||
@@ -481,14 +481,7 @@ pub(crate) enum ReconcileNeeded {
|
||||
/// spawned: wait for the existing reconciler rather than spawning a new one.
|
||||
WaitExisting(ReconcilerWaiter),
|
||||
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
|
||||
Yes(ReconcileReason),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ReconcileReason {
|
||||
ActiveNodesDirty,
|
||||
UnknownLocation,
|
||||
PendingComputeNotification,
|
||||
Yes,
|
||||
}
|
||||
|
||||
/// Pending modification to the observed state of a tenant shard.
|
||||
@@ -1348,18 +1341,12 @@ impl TenantShard {
|
||||
|
||||
let active_nodes_dirty = self.dirty(pageservers);
|
||||
|
||||
let reconcile_needed = match (
|
||||
active_nodes_dirty,
|
||||
dirty_observed,
|
||||
self.pending_compute_notification,
|
||||
) {
|
||||
(true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
|
||||
(_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
|
||||
(_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
|
||||
_ => ReconcileNeeded::No,
|
||||
};
|
||||
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
|
||||
// wake up a reconciler to send it.
|
||||
let do_reconcile =
|
||||
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
|
||||
|
||||
if matches!(reconcile_needed, ReconcileNeeded::No) {
|
||||
if !do_reconcile {
|
||||
tracing::debug!("Not dirty, no reconciliation needed.");
|
||||
return ReconcileNeeded::No;
|
||||
}
|
||||
@@ -1402,7 +1389,7 @@ impl TenantShard {
|
||||
}
|
||||
}
|
||||
|
||||
reconcile_needed
|
||||
ReconcileNeeded::Yes
|
||||
}
|
||||
|
||||
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
|
||||
@@ -1492,7 +1479,6 @@ impl TenantShard {
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) fn spawn_reconciler(
|
||||
&mut self,
|
||||
reason: ReconcileReason,
|
||||
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
|
||||
pageservers: &Arc<HashMap<NodeId, Node>>,
|
||||
compute_hook: &Arc<ComputeHook>,
|
||||
@@ -1552,7 +1538,7 @@ impl TenantShard {
|
||||
let reconcile_seq = self.sequence;
|
||||
let long_reconcile_threshold = service_config.long_reconcile_threshold;
|
||||
|
||||
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
|
||||
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
|
||||
let must_notify = self.pending_compute_notification;
|
||||
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
|
||||
tenant_id=%reconciler.tenant_shard_id.tenant_id,
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import USE_LFC
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prefetch(neon_simple_env: NeonEnv):
    """
    Check that storing prefetch results in the Local File Cache (LFC) stops
    prefetch requests from expiring unused.

    The test loads a 1,000,000-row table with an index on a random column,
    resets the LFC, and then runs two batches of four range scans, each through
    ``explain (analyze,prefetch,format json)``. The last scan of each batch
    repeats the range of the first one. Only the "Prefetch Expired Requests"
    value of that last scan is asserted; the earlier values are just logged.

    * First batch, before ``neon.store_prefetch_result_in_lfc`` is switched on
      (presumably it defaults to off -- verify against the extension): the
      counter must be greater than zero.
    * Second batch, after ``set neon.store_prefetch_result_in_lfc=on``: the
      counter must be exactly zero.
    """
    env = neon_simple_env
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "neon.max_file_cache_size=1GB",
            "neon.file_cache_size_limit=1GB",
            "effective_io_concurrency=100",
            "shared_buffers=1MB",
            # NOTE(review): presumably disabled so the planner picks plain index scans -- confirm.
            "enable_bitmapscan=off",
            "enable_seqscan=off",
            "autovacuum=off",
        ],
    )
    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("create extension neon")
    cur.execute("create table t(pk integer, sk integer, filler text default repeat('x',200))")
    cur.execute("set statement_timeout=0")
    # Fixed seed so that random() yields the same sk values on every run.
    cur.execute("select setseed(0.5)")
    cur.execute("insert into t values (generate_series(1,1000000),random()*1000000)")
    cur.execute("create index on t(sk)")
    cur.execute("vacuum t")

    # Reset the LFC: drop its size limit to 0, give the reload a moment to
    # take effect, then restore the 1GB limit.
    cur.execute("alter system set neon.file_cache_size_limit=0")
    cur.execute("select pg_reload_conf()")
    time.sleep(1)
    cur.execute("alter system set neon.file_cache_size_limit='1GB'")
    cur.execute("select pg_reload_conf()")

    def expired_after_scans(scans):
        """Run each (lower, upper, alias) range scan; return the last expired-prefetch count."""
        expired = None
        for lower, upper, alias in scans:
            cur.execute(
                f"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between {lower} and {upper} limit 100) {alias}"
            )
            expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
            log.info(f"Unused prefetches: {expired}")
        return expired

    prefetch_expired = expired_after_scans(
        [
            (100000, 200000, "s1"),
            (200000, 300000, "s2"),
            (300000, 400000, "s3"),
            (100000, 200000, "s4"),
        ]
    )

    # If prefetch results are not stored in the LFC, we keep sending prefetch
    # requests to the pageserver that end up unused.
    assert prefetch_expired > 0

    cur.execute("set neon.store_prefetch_result_in_lfc=on")

    prefetch_expired = expired_after_scans(
        [
            (500000, 600000, "s5"),
            (600000, 700000, "s6"),
            (700000, 800000, "s7"),
            (500000, 600000, "s8"),
        ]
    )

    # No redundant prefetch requests once prefetch results are stored in the LFC.
    assert prefetch_expired == 0
|
||||
@@ -3817,43 +3817,3 @@ def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
|
||||
nodes = env.storage_controller.node_list()
|
||||
assert len(nodes) == 1
|
||||
assert nodes[0]["listen_https_port"] is None
|
||||
|
||||
|
||||
def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvBuilder):
    """
    Restart the storage controller while no shard is in a transient state and
    check that it completes zero reconciliations at start-up.

    Passing implies that the location configs reported by the pageserver match
    the state the storage controller persisted in its database.
    """
    neon_env_builder.num_pageservers = 1
    neon_env_builder.storage_controller_config = {"start_as_candidate": False}

    env = neon_env_builder.init_configs()
    env.start()

    def successful_reconciles():
        """Read the counter of reconciles that completed with status "ok"."""
        return env.storage_controller.get_metric_value(
            "storage_controller_reconcile_complete_total", filter={"status": "ok"}
        )

    tenant_id = TenantId.generate()
    tenant_config = {"pitr_interval": "1h2m3s"}
    env.storage_controller.tenant_create(tenant_id, shard_count=2, tenant_config=tenant_config)

    env.storage_controller.reconcile_until_idle()

    # Creating the tenant must have triggered at least one reconcile.
    reconciles_before_restart = successful_reconciles()
    assert reconciles_before_restart != 0

    env.storage_controller.stop()
    env.storage_controller.start()

    env.storage_controller.reconcile_until_idle()

    # After the restart nothing should have needed reconciling.
    reconciles_after_restart = successful_reconciles()
    assert reconciles_after_restart == 0
|
||||
|
||||
Reference in New Issue
Block a user