refactor: refactor PgStore (#5309)

* refactor: refactor PgStore

* fix: election use bytea and txn use Serializable to avoid read unrepeatable (#4)

* fix: election use bytea as well

* fix: use Serializable to avoid read unrepeatable

* chore: remove unused error

* ci: enable pg kvbackend and sqlness

* ci: switch on pg_kvbackend feature

* fix: fix sqlness runner

* chore: add pg_kvbackend feature gate

* build(ci): add feature gate

* fix: add retry for `PgStore` txn

* fix: correct `SET_IDLE_SESSION_TIMEOUT`

---------

Co-authored-by: Yohan Wal <1035325592@qq.com>
Co-authored-by: CookiePieWw <profsyb@gmail.com>
This commit is contained in:
Weny Xu
2025-01-07 15:27:58 +08:00
committed by GitHub
parent bc2f05d949
commit 3d9df822ad
18 changed files with 807 additions and 665 deletions

View File

@@ -100,7 +100,7 @@ jobs:
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc -- --bin greptime --bin sqlness-runner
run: cargo gc -- --bin greptime --bin sqlness-runner --features pg_kvbackend
- name: Pack greptime binaries
shell: bash
run: |
@@ -261,7 +261,7 @@ jobs:
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime
run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend
- name: Pack greptime binary
shell: bash
run: |
@@ -573,6 +573,9 @@ jobs:
- name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092"
kafka: true
- name: "Pg Kvbackend"
opts: "--setup-pg"
kafka: false
timeout-minutes: 60
steps:
- uses: actions/checkout@v4

1
Cargo.lock generated
View File

@@ -2174,6 +2174,7 @@ dependencies = [
"async-recursion",
"async-stream",
"async-trait",
"backon",
"base64 0.21.7",
"bytes",
"chrono",

View File

@@ -99,6 +99,7 @@ arrow-schema = { version = "51.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
axum = { version = "0.6", features = ["headers"] }
backon = "1"
base64 = "0.21"
bigdecimal = "0.4.2"
bitflags = "2.4.1"

View File

@@ -78,7 +78,9 @@ impl BenchTableMetadataCommand {
#[cfg(feature = "pg_kvbackend")]
let kv_backend = if let Some(postgres_addr) = &self.postgres_addr {
info!("Using postgres as kv backend");
PgStore::with_url(postgres_addr, 128).await.unwrap()
PgStore::with_url(postgres_addr, "greptime_metakv", 128)
.await
.unwrap()
} else {
kv_backend
};

View File

@@ -6,7 +6,7 @@ license.workspace = true
[features]
testing = []
pg_kvbackend = ["dep:tokio-postgres"]
pg_kvbackend = ["dep:tokio-postgres", "dep:backon"]
[lints]
workspace = true
@@ -17,6 +17,7 @@ api.workspace = true
async-recursion = "1.0"
async-stream = "0.3"
async-trait.workspace = true
backon = { workspace = true, optional = true }
base64.workspace = true
bytes.workspace = true
chrono.workspace = true

View File

@@ -639,15 +639,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse {} from str to utf8", name))]
StrFromUtf8 {
name: String,
#[snafu(source)]
error: std::str::Utf8Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Value not exists"))]
ValueNotExist {
#[snafu(implicit)]
@@ -658,8 +649,9 @@ pub enum Error {
GetCache { source: Arc<Error> },
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via Postgres"))]
#[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
PostgresExecution {
sql: String,
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
@@ -693,6 +685,13 @@ pub enum Error {
operation: String,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Postgres transaction retry failed"))]
PostgresTransactionRetryFailed {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Datanode table info not found, table id: {}, datanode id: {}",
table_id,
@@ -756,8 +755,7 @@ impl ErrorExt for Error {
| UnexpectedLogicalRouteTable { .. }
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. }
| StrFromUtf8 { .. } => StatusCode::Unexpected,
| MetadataCorruption { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
@@ -807,7 +805,8 @@ impl ErrorExt for Error {
PostgresExecution { .. }
| CreatePostgresPool { .. }
| GetPostgresConnection { .. }
| PostgresTransaction { .. } => StatusCode::Internal,
| PostgresTransaction { .. }
| PostgresTransactionRetryFailed { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}
@@ -818,6 +817,20 @@ impl ErrorExt for Error {
}
impl Error {
#[cfg(feature = "pg_kvbackend")]
/// Check if the error is a serialization error.
pub fn is_serialization_error(&self) -> bool {
match self {
Error::PostgresTransaction { error, .. } => {
error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
}
Error::PostgresExecution { error, .. } => {
error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
}
_ => false,
}
}
/// Creates a new [Error::RetryLater] error from source `err`.
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::RetryLater {

View File

@@ -591,7 +591,7 @@ mod tests {
#[tokio::test]
async fn test_range_2() {
if let Some(kv_backend) = build_kv_backend().await {
test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await;
test_kv_range_2_with_prefix(&kv_backend, b"range2/".to_vec()).await;
}
}
@@ -618,7 +618,8 @@ mod tests {
if let Some(kv_backend) = build_kv_backend().await {
let prefix = b"deleteRange/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
@@ -627,20 +628,20 @@ mod tests {
if let Some(kv_backend) = build_kv_backend().await {
let prefix = b"batchDelete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_etcd_txn() {
if let Some(kv_backend) = build_kv_backend().await {
let kv_backend_ref = Arc::new(kv_backend);
test_txn_one_compare_op(kv_backend_ref.clone()).await;
text_txn_multi_compare_op(kv_backend_ref.clone()).await;
test_txn_compare_equal(kv_backend_ref.clone()).await;
test_txn_compare_greater(kv_backend_ref.clone()).await;
test_txn_compare_less(kv_backend_ref.clone()).await;
test_txn_compare_not_equal(kv_backend_ref).await;
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;
test_txn_compare_greater(&kv_backend).await;
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
}
}

View File

@@ -355,7 +355,7 @@ mod tests {
async fn test_range_2() {
let kv = MemoryKvBackend::<Error>::new();
test_kv_range_2(kv).await;
test_kv_range_2(&kv).await;
}
#[tokio::test]
@@ -376,24 +376,24 @@ mod tests {
async fn test_delete_range() {
let kv_backend = mock_mem_store_with_data().await;
test_kv_delete_range(kv_backend).await;
test_kv_delete_range(&kv_backend).await;
}
#[tokio::test]
async fn test_batch_delete() {
let kv_backend = mock_mem_store_with_data().await;
test_kv_batch_delete(kv_backend).await;
test_kv_batch_delete(&kv_backend).await;
}
#[tokio::test]
async fn test_memory_txn() {
let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());
test_txn_one_compare_op(kv_backend.clone()).await;
text_txn_multi_compare_op(kv_backend.clone()).await;
test_txn_compare_equal(kv_backend.clone()).await;
test_txn_compare_greater(kv_backend.clone()).await;
test_txn_compare_less(kv_backend.clone()).await;
test_txn_compare_not_equal(kv_backend).await;
let kv_backend = MemoryKvBackend::<Error>::new();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;
test_txn_compare_greater(&kv_backend).await;
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -61,14 +61,18 @@ pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>
pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) {
let range_end = util::get_prefix_end_key(prefix);
assert!(kv_backend
.delete_range(DeleteRangeRequest {
key: prefix.to_vec(),
range_end,
..Default::default()
})
.await
.is_ok());
assert!(
kv_backend
.delete_range(DeleteRangeRequest {
key: prefix.to_vec(),
range_end,
..Default::default()
})
.await
.is_ok(),
"prefix: {:?}",
std::str::from_utf8(prefix).unwrap()
);
}
pub async fn test_kv_put(kv_backend: &impl KvBackend) {
@@ -170,11 +174,11 @@ pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<
assert_eq!(b"val1", resp.kvs[0].value());
}
pub async fn test_kv_range_2(kv_backend: impl KvBackend) {
pub async fn test_kv_range_2(kv_backend: &impl KvBackend) {
test_kv_range_2_with_prefix(kv_backend, vec![]).await;
}
pub async fn test_kv_range_2_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8>) {
pub async fn test_kv_range_2_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
let atest = [prefix.clone(), b"atest".to_vec()].concat();
let test = [prefix.clone(), b"test".to_vec()].concat();
@@ -348,11 +352,11 @@ pub async fn test_kv_compare_and_put_with_prefix(
assert!(resp.is_none());
}
pub async fn test_kv_delete_range(kv_backend: impl KvBackend) {
pub async fn test_kv_delete_range(kv_backend: &impl KvBackend) {
test_kv_delete_range_with_prefix(kv_backend, vec![]).await;
}
pub async fn test_kv_delete_range_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8>) {
pub async fn test_kv_delete_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
let key3 = [prefix.clone(), b"key3".to_vec()].concat();
let req = DeleteRangeRequest {
key: key3.clone(),
@@ -403,11 +407,11 @@ pub async fn test_kv_delete_range_with_prefix(kv_backend: impl KvBackend, prefix
assert!(resp.kvs.is_empty());
}
pub async fn test_kv_batch_delete(kv_backend: impl KvBackend) {
pub async fn test_kv_batch_delete(kv_backend: &impl KvBackend) {
test_kv_batch_delete_with_prefix(kv_backend, vec![]).await;
}
pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8>) {
pub async fn test_kv_batch_delete_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
let key1 = [prefix.clone(), b"key1".to_vec()].concat();
let key100 = [prefix.clone(), b"key100".to_vec()].concat();
assert!(kv_backend.get(&key1).await.unwrap().is_some());
@@ -447,7 +451,7 @@ pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix
assert!(kv_backend.get(&key11).await.unwrap().is_none());
}
pub async fn test_txn_one_compare_op(kv_backend: KvBackendRef) {
pub async fn test_txn_one_compare_op(kv_backend: &impl KvBackend) {
let _ = kv_backend
.put(PutRequest {
key: vec![11],
@@ -472,7 +476,7 @@ pub async fn test_txn_one_compare_op(kv_backend: KvBackendRef) {
assert_eq!(txn_response.responses.len(), 1);
}
pub async fn text_txn_multi_compare_op(kv_backend: KvBackendRef) {
pub async fn text_txn_multi_compare_op(kv_backend: &impl KvBackend) {
for i in 1..3 {
let _ = kv_backend
.put(PutRequest {
@@ -502,7 +506,7 @@ pub async fn text_txn_multi_compare_op(kv_backend: KvBackendRef) {
assert_eq!(txn_response.responses.len(), 2);
}
pub async fn test_txn_compare_equal(kv_backend: KvBackendRef) {
pub async fn test_txn_compare_equal(kv_backend: &impl KvBackend) {
let key = vec![101u8];
kv_backend.delete(&key, false).await.unwrap();
@@ -531,7 +535,7 @@ pub async fn test_txn_compare_equal(kv_backend: KvBackendRef) {
assert!(txn_response.succeeded);
}
pub async fn test_txn_compare_greater(kv_backend: KvBackendRef) {
pub async fn test_txn_compare_greater(kv_backend: &impl KvBackend) {
let key = vec![102u8];
kv_backend.delete(&key, false).await.unwrap();
@@ -571,7 +575,7 @@ pub async fn test_txn_compare_greater(kv_backend: KvBackendRef) {
);
}
pub async fn test_txn_compare_less(kv_backend: KvBackendRef) {
pub async fn test_txn_compare_less(kv_backend: &impl KvBackend) {
let key = vec![103u8];
kv_backend.delete(&[3], false).await.unwrap();
@@ -611,7 +615,7 @@ pub async fn test_txn_compare_less(kv_backend: KvBackendRef) {
);
}
pub async fn test_txn_compare_not_equal(kv_backend: KvBackendRef) {
pub async fn test_txn_compare_not_equal(kv_backend: &impl KvBackend) {
let key = vec![104u8];
kv_backend.delete(&key, false).await.unwrap();

View File

@@ -266,7 +266,7 @@ impl PutRequest {
}
}
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Default)]
pub struct PutResponse {
pub prev_kv: Option<KeyValue>,
}
@@ -425,7 +425,7 @@ impl BatchPutRequest {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct BatchPutResponse {
pub prev_kvs: Vec<KeyValue>,
}
@@ -509,7 +509,7 @@ impl BatchDeleteRequest {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct BatchDeleteResponse {
pub prev_kvs: Vec<KeyValue>,
}
@@ -754,6 +754,19 @@ impl TryFrom<PbDeleteRangeResponse> for DeleteRangeResponse {
}
impl DeleteRangeResponse {
/// Creates a new [`DeleteRangeResponse`] with the given deleted count.
pub fn new(deleted: i64) -> Self {
Self {
deleted,
prev_kvs: vec![],
}
}
/// Creates a new [`DeleteRangeResponse`] with the given deleted count and previous key-value pairs.
pub fn with_prev_kvs(&mut self, prev_kvs: Vec<KeyValue>) {
self.prev_kvs = prev_kvs;
}
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbDeleteRangeResponse {
PbDeleteRangeResponse {
header: Some(header),

View File

@@ -13,7 +13,7 @@ workspace = true
[dependencies]
async-stream.workspace = true
async-trait.workspace = true
backon = "1"
backon.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -644,7 +644,7 @@ mod tests {
let dir = create_temp_dir("range2");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
test_kv_range_2(backend).await;
test_kv_range_2(&backend).await;
}
#[tokio::test]
@@ -671,7 +671,7 @@ mod tests {
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_batch_delete(backend).await;
test_kv_batch_delete(&backend).await;
}
#[tokio::test]
@@ -680,7 +680,7 @@ mod tests {
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_delete_range(backend).await;
test_kv_delete_range(&backend).await;
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -229,7 +229,8 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?;
let kv_backend = PgStore::with_pg_pool(pool, opts.max_txn_ops)
// TODO(CookiePie): use table name from config.
let kv_backend = PgStore::with_pg_pool(pool, "greptime_metakv", opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.

View File

@@ -23,6 +23,7 @@ use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_postgres::types::ToSql;
use tokio_postgres::Client;
use crate::election::{
@@ -39,7 +40,7 @@ const CAMPAIGN: &str = "SELECT pg_try_advisory_lock({})";
const STEP_DOWN: &str = "SELECT pg_advisory_unlock({})";
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = '10s';";
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_session_timeout = '10s';";
// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
@@ -50,7 +51,7 @@ WITH prev AS (
SELECT k, v FROM greptime_metakv WHERE k = $1
), insert AS (
INSERT INTO greptime_metakv
VALUES($1, $2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'))
VALUES($1, convert_to($2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
ON CONFLICT (k) DO NOTHING
)
@@ -61,7 +62,7 @@ SELECT k, v FROM prev;
const CAS_WITH_EXPIRE_TIME: &str = r#"
UPDATE greptime_metakv
SET k=$1,
v=$3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS')
v=convert_to($3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
WHERE
k=$1 AND v=$2
"#;
@@ -329,12 +330,13 @@ impl PgElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
async fn get_value_with_lease(
&self,
key: &String,
key: &str,
with_origin: bool,
) -> Result<Option<(String, Timestamp, Timestamp, Option<String>)>> {
let key = key.as_bytes().to_vec();
let res = self
.client
.query(GET_WITH_CURRENT_TIMESTAMP, &[&key])
.query(GET_WITH_CURRENT_TIMESTAMP, &[&key as &(dyn ToSql + Sync)])
.await
.context(PostgresExecutionSnafu)?;
@@ -342,7 +344,7 @@ impl PgElection {
Ok(None)
} else {
// Safety: Checked if res is empty above.
let current_time_str = res[0].get(1);
let current_time_str = res[0].try_get(1).unwrap_or_default();
let current_time = match Timestamp::from_str(current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
@@ -351,8 +353,9 @@ impl PgElection {
.fail()?,
};
// Safety: Checked if res is empty above.
let value_and_expire_time = res[0].get(0);
let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?;
let value_and_expire_time =
String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
if with_origin {
Ok(Some((
@@ -372,17 +375,20 @@ impl PgElection {
&self,
key_prefix: &str,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix);
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
let res = self
.client
.query(PREFIX_GET_WITH_CURRENT_TIMESTAMP, &[&key_prefix])
.query(
PREFIX_GET_WITH_CURRENT_TIMESTAMP,
&[(&key_prefix as &(dyn ToSql + Sync))],
)
.await
.context(PostgresExecutionSnafu)?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
for row in res {
let current_time_str = row.get(1);
let current_time_str = row.try_get(1).unwrap_or_default();
current = match Timestamp::from_str(current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
@@ -391,8 +397,8 @@ impl PgElection {
.fail()?,
};
let value_and_expire_time = row.get(0);
let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?;
let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
values_with_leases.push((value, expire_time));
}
@@ -400,13 +406,15 @@ impl PgElection {
}
async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> {
let key = key.as_bytes().to_vec();
let prev = prev.as_bytes().to_vec();
let res = self
.client
.execute(
CAS_WITH_EXPIRE_TIME,
&[
&key,
&prev,
&key as &(dyn ToSql + Sync),
&prev as &(dyn ToSql + Sync),
&updated,
&LEASE_SEP,
&(self.candidate_lease_ttl_secs as f64),
@@ -418,7 +426,7 @@ impl PgElection {
ensure!(
res == 1,
UnexpectedSnafu {
violated: format!("Failed to update key: {}", key),
violated: format!("Failed to update key: {}", String::from_utf8_lossy(&key)),
}
);
@@ -432,12 +440,17 @@ impl PgElection {
value: &str,
lease_ttl_secs: u64,
) -> Result<bool> {
let key = key.as_bytes().to_vec();
let lease_ttl_secs = lease_ttl_secs as f64;
let params: Vec<&(dyn ToSql + Sync)> = vec![
&key as &(dyn ToSql + Sync),
&value as &(dyn ToSql + Sync),
&LEASE_SEP,
&lease_ttl_secs,
];
let res = self
.client
.query(
PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME,
&[&key, &value, &LEASE_SEP, &(lease_ttl_secs as f64)],
)
.query(PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, &params)
.await
.context(PostgresExecutionSnafu)?;
Ok(res.is_empty())
@@ -445,10 +458,11 @@ impl PgElection {
/// Returns `true` if the deletion is successful.
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &String) -> Result<bool> {
async fn delete_value(&self, key: &str) -> Result<bool> {
let key = key.as_bytes().to_vec();
let res = self
.client
.query(POINT_DELETE, &[&key])
.query(POINT_DELETE, &[&key as &(dyn ToSql + Sync)])
.await
.context(PostgresExecutionSnafu)?;
@@ -635,6 +649,8 @@ mod tests {
use super::*;
use crate::error::PostgresExecutionSnafu;
const CREATE_TABLE: &str =
"CREATE TABLE IF NOT EXISTS greptime_metakv(k bytea PRIMARY KEY, v bytea);";
async fn create_postgres_client() -> Result<Client> {
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
@@ -650,6 +666,7 @@ mod tests {
tokio::spawn(async move {
connection.await.context(PostgresExecutionSnafu).unwrap();
});
client.execute(CREATE_TABLE, &[]).await.unwrap();
Ok(client)
}
@@ -1152,6 +1169,7 @@ mod tests {
#[tokio::test]
async fn test_follower_action() {
common_telemetry::init_default_ut_logging();
let candidate_lease_ttl_secs = 5;
let store_key_prefix = uuid::Uuid::new_v4().to_string();

View File

@@ -70,6 +70,7 @@ pub enum WalConfig {
pub struct StoreConfig {
pub store_addrs: Vec<String>,
pub setup_etcd: bool,
pub setup_pg: bool,
}
#[derive(Clone)]
@@ -159,6 +160,7 @@ impl Env {
self.build_db();
self.setup_wal();
self.setup_etcd();
self.setup_pg();
let db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
@@ -383,7 +385,21 @@ impl Env {
"-c".to_string(),
self.generate_config_file(subcommand, db_ctx),
];
if db_ctx.store_config().store_addrs.is_empty() {
if db_ctx.store_config().setup_pg {
let client_ports = self
.store_config
.store_addrs
.iter()
.map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
.collect::<Vec<_>>();
let client_port = client_ports.first().unwrap_or(&5432);
let pg_server_addr = format!(
"postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
client_port
);
args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
} else if db_ctx.store_config().store_addrs.is_empty() {
args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
}
(args, vec![METASRV_ADDR.to_string()])
@@ -570,6 +586,20 @@ impl Env {
}
}
/// Setup PostgreSql if needed.
fn setup_pg(&self) {
if self.store_config.setup_pg {
let client_ports = self
.store_config
.store_addrs
.iter()
.map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
.collect::<Vec<_>>();
let client_port = client_ports.first().unwrap_or(&5432);
util::setup_pg(*client_port, None);
}
}
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
let mut tt = TinyTemplate::new();

View File

@@ -106,6 +106,10 @@ struct Args {
/// Whether to setup etcd, by default it is false.
#[clap(long, default_value = "false")]
setup_etcd: bool,
/// Whether to setup pg, by default it is false.
#[clap(long, default_value = "false")]
setup_pg: bool,
}
#[tokio::main]
@@ -154,6 +158,7 @@ async fn main() {
let store = StoreConfig {
store_addrs: args.store_addrs.clone(),
setup_etcd: args.setup_etcd,
setup_pg: args.setup_pg,
};
let runner = Runner::new(

View File

@@ -305,6 +305,53 @@ pub fn stop_rm_etcd() {
}
}
/// Set up a PostgreSQL server in docker.
pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) {
if std::process::Command::new("docker")
.args(["-v"])
.status()
.is_err()
{
panic!("Docker is not installed");
}
let pg_image = if let Some(pg_version) = pg_version {
format!("postgres:{pg_version}")
} else {
"postgres:latest".to_string()
};
let pg_password = "admin";
let pg_user = "greptimedb";
let mut arg_list = vec![];
arg_list.extend(["run", "-d"]);
let pg_password_env = format!("POSTGRES_PASSWORD={pg_password}");
let pg_user_env = format!("POSTGRES_USER={pg_user}");
let pg_port_forward = format!("{pg_port}:5432");
arg_list.extend(["-e", &pg_password_env, "-e", &pg_user_env]);
arg_list.extend(["-p", &pg_port_forward]);
arg_list.extend(["--name", "greptimedb_pg", &pg_image]);
let mut cmd = std::process::Command::new("docker");
cmd.args(arg_list);
println!("Starting PostgreSQL with command: {:?}", cmd);
let status = cmd.status();
if status.is_err() {
panic!("Failed to start PostgreSQL: {:?}", status);
} else if let Ok(status) = status {
if status.success() {
println!("Started PostgreSQL with port {}", pg_port);
} else {
panic!("Failed to start PostgreSQL: {:?}", status);
}
}
}
/// Get the dir of test cases. This function only works when the runner is run
/// under the project's dir because it depends on some envs set by cargo.
pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {