feat: implement postgres kvbackend (#4421)

Author: Lanqing Yang
Date: 2024-08-14 15:49:32 -07:00
Committed by: GitHub
Parent: 2c3fccb516
Commit: 93be81c041
16 changed files with 801 additions and 8 deletions


@@ -0,0 +1,30 @@
name: Setup PostgreSQL
description: Deploy PostgreSQL on Kubernetes
inputs:
postgres-replicas:
default: 1
description: "Number of PostgreSQL replicas"
namespace:
default: "postgres-namespace"
postgres-version:
default: "14.2"
description: "PostgreSQL version"
storage-size:
default: "1Gi"
description: "Storage size for PostgreSQL"
runs:
using: composite
steps:
- name: Install PostgreSQL
shell: bash
run: |
helm upgrade \
--install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql \
--set replicaCount=${{ inputs.postgres-replicas }} \
--set image.tag=${{ inputs.postgres-version }} \
--set persistence.size=${{ inputs.storage-size }} \
--set postgresql.username=greptimedb \
--set postgresql.password=admin \
--create-namespace \
-n ${{ inputs.namespace }}


@@ -338,6 +338,8 @@ jobs:
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluster
uses: ./.github/actions/setup-etcd-cluster
- name: Setup Postgres cluster
uses: ./.github/actions/setup-postgres-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
with:
@@ -476,6 +478,8 @@ jobs:
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluster
uses: ./.github/actions/setup-etcd-cluster
- name: Setup Postgres cluster
uses: ./.github/actions/setup-postgres-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
with:
@@ -702,6 +706,9 @@ jobs:
- name: Setup minio
working-directory: tests-integration/fixtures/minio
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup postgres server
working-directory: tests-integration/fixtures/postgres
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
env:
@@ -718,6 +725,7 @@ jobs:
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"

Cargo.lock (generated)

@@ -2094,6 +2094,7 @@ dependencies = [
"strum 0.25.0",
"table",
"tokio",
"tokio-postgres",
"tonic 0.11.0",
"typetag",
"uuid",
@@ -6129,6 +6130,7 @@ dependencies = [
"api",
"async-trait",
"chrono",
"clap 4.5.7",
"client",
"common-base",
"common-catalog",
@@ -6170,6 +6172,7 @@ dependencies = [
"store-api",
"table",
"tokio",
"tokio-postgres",
"tokio-stream",
"toml 0.8.14",
"tonic 0.11.0",


@@ -174,6 +174,7 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "5
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.36", features = ["full"] }
tokio-postgres = "0.7"
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"


@@ -22,6 +22,7 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
@@ -137,6 +138,9 @@ struct StartCommand {
/// The max operations per txn
#[clap(long)]
max_txn_ops: Option<usize>,
/// The database backend.
#[clap(long, value_enum)]
backend: Option<BackendImpl>,
}
impl StartCommand {
@@ -219,6 +223,12 @@ impl StartCommand {
opts.max_txn_ops = max_txn_ops;
}
if let Some(backend) = &self.backend {
opts.backend.clone_from(backend);
} else {
opts.backend = BackendImpl::default();
}
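// Note (assumption: clap's default kebab-case naming for `ValueEnum`): the
// flag takes the kebab-cased variant names, e.g. `--backend etcd-store`,
// `--backend memory-store`, or `--backend postgres-store` (the latter only
// with the `pg_kvbackend` feature enabled).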
// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;


@@ -6,6 +6,7 @@ license.workspace = true
[features]
testing = []
pg_kvbackend = ["dep:tokio-postgres"]
[lints]
workspace = true
@@ -56,6 +57,7 @@ store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
tokio-postgres = { workspace = true, optional = true }
tonic.workspace = true
typetag = "0.2"


@@ -643,6 +643,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse {} from str to utf8", name))]
StrFromUtf8 {
name: String,
#[snafu(source)]
error: std::str::Utf8Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Value not exists"))]
ValueNotExist {
#[snafu(implicit)]
@@ -651,6 +660,24 @@ pub enum Error {
#[snafu(display("Failed to get cache"))]
GetCache { source: Arc<Error> },
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via Postgres"))]
PostgresExecution {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -704,7 +731,8 @@ impl ErrorExt for Error {
| UnexpectedLogicalRouteTable { .. }
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. } => StatusCode::Unexpected,
| MetadataCorruption { .. }
| StrFromUtf8 { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } | RenameTable { .. } => {
StatusCode::Internal
@@ -749,6 +777,11 @@ impl ErrorExt for Error {
| ParseNum { .. }
| InvalidRole { .. }
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
#[cfg(feature = "pg_kvbackend")]
PostgresExecution { .. } => StatusCode::Internal,
#[cfg(feature = "pg_kvbackend")]
ConnectPostgres { .. } => StatusCode::Internal,
}
}


@@ -31,6 +31,8 @@ use crate::rpc::KeyValue;
pub mod chroot;
pub mod etcd;
pub mod memory;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;
pub mod test;
pub mod txn;


@@ -0,0 +1,626 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;
use snafu::ResultExt;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls};
use super::{KvBackend, TxnService};
use crate::error::{ConnectPostgresSnafu, Error, PostgresExecutionSnafu, Result, StrFromUtf8Snafu};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;
/// Postgres backend store for metasrv.
pub struct PgStore {
// TODO: Consider using sqlx crate.
client: Client,
}
const EMPTY: &[u8] = &[0];
// TODO: allow users to configure metadata table name.
const METADKV_CREATION: &str =
"CREATE TABLE IF NOT EXISTS greptime_metakv(k varchar PRIMARY KEY, v varchar)";
const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv ORDER BY K";
const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1";
const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K";
const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K";
const RANGE_SCAN_FULL_RANGE: &str =
"SELECT k, v FROM greptime_metakv WHERE k >= $1 AND K < $2 ORDER BY K";
const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v";
const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;";
const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;";
const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >= $1 RETURNING k,v;";
const RANGE_DELETE_FULL_RANGE: &str =
"DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;";
const CAS: &str = r#"
WITH prev AS (
SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2
), update AS (
UPDATE greptime_metakv
SET k=$1,
v=$2
WHERE
k=$1 AND v=$3
)
SELECT k, v FROM prev;
"#;
const PUT_IF_NOT_EXISTS: &str = r#"
WITH prev AS (
SELECT k, v FROM greptime_metakv WHERE k = $1
), insert AS (
INSERT INTO greptime_metakv
VALUES ($1, $2)
ON CONFLICT (k) DO NOTHING
)
SELECT k, v FROM prev;"#;
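// Both CTE statements above return the previous row, if any: an empty result
// set means the key did not exist (or, for CAS, did not match `expect`), and
// the methods below turn that into a success flag.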
impl PgStore {
/// Create a `PgStore`-backed `KvBackendRef` from a connection URL.
pub async fn with_url(url: &str) -> Result<KvBackendRef> {
// TODO: support tls.
let (client, conn) = tokio_postgres::connect(url, NoTls)
.await
.context(ConnectPostgresSnafu)?;
// Drive the connection on a background task; the client stops working if the
// connection future is dropped.
tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) });
Self::with_pg_client(client).await
}
/// Create a `PgStore`-backed `KvBackendRef` from a `tokio-postgres` client.
pub async fn with_pg_client(client: Client) -> Result<KvBackendRef> {
// Ensure the Postgres metadata backend is ready to use: create the
// greptime_metakv table if it does not yet exist.
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)?;
Ok(Arc::new(Self { client }))
}
async fn put_if_not_exists(&self, key: &str, value: &str) -> Result<bool> {
let res = self
.client
.query(PUT_IF_NOT_EXISTS, &[&key, &value])
.await
.context(PostgresExecutionSnafu)?;
// The CTE returns the previous row when the key already exists; an empty
// result set means our insert won.
Ok(res.is_empty())
}
}
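// A minimal usage sketch (hypothetical; assumes a reachable Postgres and that
// `RangeRequest` derives `Default` for the fields not shown):
//
// async fn demo() -> Result<()> {
//     let kv = PgStore::with_url("postgres://greptimedb:admin@127.0.0.1:5432/postgres").await?;
//     kv.put(PutRequest {
//         key: b"k".to_vec(),
//         value: b"v".to_vec(),
//         prev_kv: false,
//     })
//     .await?;
//     let resp = kv
//         .range(RangeRequest {
//             key: b"k".to_vec(),
//             ..Default::default()
//         })
//         .await?;
//     assert_eq!(resp.kvs[0].value, b"v".to_vec());
//     Ok(())
// }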
fn select_range_template(req: &RangeRequest) -> &str {
if req.range_end.is_empty() {
return POINT_GET;
}
if req.key == EMPTY && req.range_end == EMPTY {
FULL_TABLE_SCAN
} else if req.range_end == EMPTY {
RANGE_SCAN_LEFT_BOUNDED
} else if is_prefix_range(&req.key, &req.range_end) {
PREFIX_SCAN
} else {
RANGE_SCAN_FULL_RANGE
}
}
fn select_range_delete_template(req: &DeleteRangeRequest) -> &str {
if req.range_end.is_empty() {
return POINT_DELETE;
}
if req.key == EMPTY && req.range_end == EMPTY {
FULL_TABLE_DELETE
} else if req.range_end == EMPTY {
RANGE_DELETE_LEFT_BOUNDED
} else if is_prefix_range(&req.key, &req.range_end) {
PREFIX_DELETE
} else {
RANGE_DELETE_FULL_RANGE
}
}
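// Dispatch examples: key == [0] with range_end == [0] selects the full-table
// statement; a range_end equal to the key with its last byte incremented
// (e.g. key b"a/", range_end b"a0") hits the prefix variants; any other
// bounded pair falls back to the generic range scan.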
// Generate dynamic parameterized sql for batch get.
fn generate_batch_get_query(key_len: usize) -> String {
let in_placeholders: Vec<String> = (1..=key_len).map(|i| format!("${}", i)).collect();
let in_clause = in_placeholders.join(", ");
format!(
"SELECT k, v FROM greptime_metakv WHERE k in ({});",
in_clause
)
}
// Generate dynamic parameterized sql for batch delete.
fn generate_batch_delete_query(key_len: usize) -> String {
let in_placeholders: Vec<String> = (1..=key_len).map(|i| format!("${}", i)).collect();
let in_clause = in_placeholders.join(", ");
format!(
"DELETE FROM greptime_metakv WHERE k in ({}) RETURNING k, v;",
in_clause
)
}
// Generate dynamic parameterized sql for batch upsert.
fn generate_batch_upsert_query(kv_len: usize) -> String {
let in_placeholders: Vec<String> = (1..=kv_len).map(|i| format!("${}", i)).collect();
let in_clause = in_placeholders.join(", ");
let mut param_index = kv_len + 1;
let mut values_placeholders = Vec::new();
for _ in 0..kv_len {
values_placeholders.push(format!("(${0}, ${1})", param_index, param_index + 1));
param_index += 2;
}
let values_clause = values_placeholders.join(", ");
format!(
r#"
WITH prev AS (
SELECT k,v FROM greptime_metakv WHERE k IN ({in_clause})
), update AS (
INSERT INTO greptime_metakv (k, v) VALUES
{values_clause}
ON CONFLICT (
k
) DO UPDATE SET
v = excluded.v
)
SELECT k, v FROM prev;
"#
)
}
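// Shape of the generated statements (illustrative, for two keys/kvs):
//   batch get:    SELECT k, v FROM greptime_metakv WHERE k in ($1, $2);
//   batch delete: DELETE FROM greptime_metakv WHERE k in ($1, $2) RETURNING k, v;
//   batch upsert: the `prev` CTE reads keys $1..$2, and the INSERT consumes
//                 the key/value pairs ($3, $4), ($5, $6) - matching the
//                 parameter order built in `batch_put` below (all keys first,
//                 then key/value pairs).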
// Trim trailing null bytes and convert the remaining bytes to a UTF-8 string.
fn process_bytes<'a>(data: &'a [u8], name: &str) -> Result<&'a str> {
let mut len = data.len();
// remove trailing null bytes to avoid error in postgres encoding.
while len > 0 && data[len - 1] == 0 {
len -= 1;
}
let res = std::str::from_utf8(&data[0..len]).context(StrFromUtf8Snafu { name })?;
Ok(res)
}
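// For example, `process_bytes(b"abc\0\0", "key")` yields `"abc"`: trailing
// NUL bytes are trimmed because Postgres text types cannot store `\0`.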
#[async_trait::async_trait]
impl KvBackend for PgStore {
fn name(&self) -> &str {
"Postgres"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let mut params = vec![];
let template = select_range_template(&req);
if req.key != EMPTY {
let key = process_bytes(&req.key, "rangeKey")?;
if template == PREFIX_SCAN {
let prefix = format!("{key}%");
params.push(Cow::Owned(prefix))
} else {
params.push(Cow::Borrowed(key))
}
}
if template == RANGE_SCAN_FULL_RANGE && req.range_end != EMPTY {
let range_end = process_bytes(&req.range_end, "rangeEnd")?;
params.push(Cow::Borrowed(range_end));
}
let limit = req.limit as usize;
// Fetch one row beyond `limit` so we can tell whether more entries remain
// (the `more` flag in the response).
let limit_clause = match limit > 0 {
true => format!(" LIMIT {};", limit + 1),
false => ";".to_string(),
};
let template = format!("{}{}", template, limit_clause);
let params: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(|x| match x {
Cow::Borrowed(borrowed) => borrowed as &(dyn ToSql + Sync),
Cow::Owned(owned) => owned as &(dyn ToSql + Sync),
})
.collect();
let res = self
.client
.query(&template, &params)
.await
.context(PostgresExecutionSnafu)?;
let kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
let key: String = r.get(0);
if req.keys_only {
return KeyValue {
key: key.into_bytes(),
value: vec![],
};
}
let value: String = r.get(1);
KeyValue {
key: key.into_bytes(),
value: value.into_bytes(),
}
})
.collect();
if limit == 0 || limit > kvs.len() {
return Ok(RangeResponse { kvs, more: false });
}
let (filtered_kvs, _) = kvs.split_at(limit);
Ok(RangeResponse {
kvs: filtered_kvs.to_vec(),
more: kvs.len() > limit,
})
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let kv = KeyValue {
key: req.key,
value: req.value,
};
let mut res = self
.batch_put(BatchPutRequest {
kvs: vec![kv],
prev_kv: req.prev_kv,
})
.await?;
if !res.prev_kvs.is_empty() {
return Ok(PutResponse {
prev_kv: Some(res.prev_kvs.remove(0)),
});
}
Ok(PutResponse { prev_kv: None })
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let mut in_params = Vec::with_capacity(req.kvs.len());
let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
for kv in &req.kvs {
let processed_key = process_bytes(&kv.key, "BatchPutRequestKey")?;
in_params.push(processed_key);
let processed_value = process_bytes(&kv.value, "BatchPutRequestValue")?;
values_params.push(processed_key);
values_params.push(processed_value);
}
in_params.extend(values_params);
let params: Vec<&(dyn ToSql + Sync)> =
in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
let query = generate_batch_upsert_query(req.kvs.len());
let res = self
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
if req.prev_kv {
let kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
let key: String = r.get(0);
let value: String = r.get(1);
KeyValue {
key: key.into_bytes(),
value: value.into_bytes(),
}
})
.collect();
if !kvs.is_empty() {
return Ok(BatchPutResponse { prev_kvs: kvs });
}
}
Ok(BatchPutResponse { prev_kvs: vec![] })
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if req.keys.is_empty() {
return Ok(BatchGetResponse { kvs: vec![] });
}
let query = generate_batch_get_query(req.keys.len());
let value_params = req
.keys
.iter()
.map(|k| process_bytes(k, "BatchGetRequestKey"))
.collect::<Result<Vec<&str>>>()?;
let params: Vec<&(dyn ToSql + Sync)> = value_params
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
let kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
let key: String = r.get(0);
let value: String = r.get(1);
KeyValue {
key: key.into_bytes(),
value: value.into_bytes(),
}
})
.collect();
Ok(BatchGetResponse { kvs })
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut params = vec![];
let template = select_range_delete_template(&req);
if req.key != EMPTY {
let key = process_bytes(&req.key, "deleteRangeKey")?;
if template == PREFIX_DELETE {
let prefix = format!("{key}%");
params.push(Cow::Owned(prefix));
} else {
params.push(Cow::Borrowed(key));
}
}
if template == RANGE_DELETE_FULL_RANGE && req.range_end != EMPTY {
let range_end = process_bytes(&req.range_end, "deleteRangeEnd")?;
params.push(Cow::Borrowed(range_end));
}
let params: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(|x| match x {
Cow::Borrowed(borrowed) => borrowed as &(dyn ToSql + Sync),
Cow::Owned(owned) => owned as &(dyn ToSql + Sync),
})
.collect();
let res = self
.client
.query(template, &params)
.await
.context(PostgresExecutionSnafu)?;
let deleted = res.len() as i64;
if !req.prev_kv {
return Ok(DeleteRangeResponse {
deleted,
prev_kvs: vec![],
});
}
let kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
let key: String = r.get(0);
let value: String = r.get(1);
KeyValue {
key: key.into_bytes(),
value: value.into_bytes(),
}
})
.collect();
Ok(DeleteRangeResponse {
deleted,
prev_kvs: kvs,
})
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
if req.keys.is_empty() {
return Ok(BatchDeleteResponse { prev_kvs: vec![] });
}
let query = generate_batch_delete_query(req.keys.len());
let value_params = req
.keys
.iter()
.map(|k| process_bytes(k, "BatchDeleteRequestKey"))
.collect::<Result<Vec<&str>>>()?;
let params: Vec<&(dyn ToSql + Sync)> = value_params
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
if !req.prev_kv {
return Ok(BatchDeleteResponse { prev_kvs: vec![] });
}
let kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
let key: String = r.get(0);
let value: String = r.get(1);
KeyValue {
key: key.into_bytes(),
value: value.into_bytes(),
}
})
.collect();
Ok(BatchDeleteResponse { prev_kvs: kvs })
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let key = process_bytes(&req.key, "CASKey")?;
let value = process_bytes(&req.value, "CASValue")?;
if req.expect.is_empty() {
let put_res = self.put_if_not_exists(key, value).await?;
return Ok(CompareAndPutResponse {
success: put_res,
prev_kv: None,
});
}
let expect = process_bytes(&req.expect, "CASExpect")?;
let res = self
.client
.query(CAS, &[&key, &value, &expect])
.await
.context(PostgresExecutionSnafu)?;
match res.is_empty() {
true => Ok(CompareAndPutResponse {
success: false,
prev_kv: None,
}),
false => {
let mut kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
let key: String = r.get(0);
let value: String = r.get(1);
KeyValue {
key: key.into_bytes(),
value: value.into_bytes(),
}
})
.collect();
Ok(CompareAndPutResponse {
success: true,
prev_kv: Some(kvs.remove(0)),
})
}
}
}
}
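// A hypothetical usage sketch of the CAS semantics above: an empty `expect`
// turns the call into put-if-absent (via `put_if_not_exists`), while a
// non-empty `expect` succeeds only if the stored value matches it.
//
// let res = kv
//     .compare_and_put(CompareAndPutRequest {
//         key: b"election/leader".to_vec(),
//         expect: vec![],
//         value: b"node-1".to_vec(),
//     })
//     .await?;
// // `res.success` tells whether this node won the key.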
#[async_trait::async_trait]
impl TxnService for PgStore {
type Error = Error;
async fn txn(&self, _txn: KvTxn) -> Result<KvTxnResponse> {
// TODO: implement txn for pg kv backend.
unimplemented!()
}
fn max_txn_ops(&self) -> usize {
unreachable!("postgres backend does not support max_txn_ops!")
}
}
fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
if start.len() != end.len() {
return false;
}
let l = start.len();
let same_prefix = start[0..l - 1] == end[0..l - 1];
if let (Some(start_last), Some(end_last)) = (start.last(), end.last()) {
return same_prefix && (*start_last + 1) == *end_last;
}
false
}
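// e.g. `is_prefix_range(b"a/", b"a0")` is true: equal length, identical
// prefix, and 0x2f + 1 == 0x30, so [b"a/", b"a0") covers exactly the keys
// prefixed with "a/".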
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_backend::test::{
prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
unprepare_kv,
};
async fn build_pg_kv_backend() -> Option<PgStore> {
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
return None;
}
let (client, connection) = tokio_postgres::connect(&endpoints, NoTls).await.unwrap();
tokio::spawn(connection);
let _ = client.execute(METADKV_CREATION, &[]).await;
Some(PgStore { client })
}
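// These tests only run when `GT_POSTGRES_ENDPOINTS` is set, matching the
// Postgres fixture the CI workflow brings up via docker compose.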
#[tokio::test]
async fn test_put() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_range() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_range_2() {
if let Some(kv_backend) = build_pg_kv_backend().await {
test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await;
}
}
#[tokio::test]
async fn test_batch_get() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"batchGet/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await;
}
}
#[tokio::test]
async fn test_delete_range() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"deleteRange/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await;
}
}
#[tokio::test]
async fn test_batch_delete() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"batchDelete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await;
}
}
}


@@ -6,6 +6,7 @@ license.workspace = true
[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres"]
[lints]
workspace = true
@@ -13,6 +14,7 @@ workspace = true
[dependencies]
api.workspace = true
async-trait = "0.1"
clap.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
@@ -52,6 +54,7 @@ snafu.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true
tokio-postgres = { workspace = true, optional = true }
tokio-stream = { workspace = true, features = ["net"] }
toml.workspace = true
tonic.workspace = true


@@ -24,6 +24,8 @@ use common_config::Configurable;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::postgres::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
use etcd_client::Client;
@@ -33,17 +35,23 @@ use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
#[cfg(feature = "pg_kvbackend")]
use snafu::OptionExt;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(feature = "pg_kvbackend")]
use tokio_postgres::NoTls;
use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
#[cfg(feature = "pg_kvbackend")]
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::lock::etcd::EtcdLock;
use crate::lock::memory::MemLock;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
@@ -185,14 +193,14 @@ pub async fn metasrv_builder(
plugins: Plugins,
kv_backend: Option<KvBackendRef>,
) -> Result<MetasrvBuilder> {
let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) {
let (kv_backend, election, lock) = match (kv_backend, &opts.backend) {
(Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)),
(None, true) => (
(None, BackendImpl::MemoryStore) => (
Arc::new(MemoryKvBackend::new()) as _,
None,
Some(Arc::new(MemLock::default()) as _),
),
(None, false) => {
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend =
@@ -222,6 +230,13 @@ pub async fn metasrv_builder(
)?),
)
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
// TODO: implement locking and leader election for pg backend.
(kv_backend, None, None)
}
};
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
@@ -253,3 +268,14 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
.await
.context(error::ConnectEtcdSnafu)
}
#[cfg(feature = "pg_kvbackend")]
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
// The connection future must be polled for the client to make progress, so
// spawn it instead of dropping it.
tokio::spawn(connection);
Ok(client)
}


@@ -865,6 +865,22 @@ pub enum Error {
location: Location,
source: BoxedError,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via postgres"))]
PostgresExecution {
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to connect to PostgresSQL"))]
ConnectPostgres {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -1003,6 +1019,10 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")]
Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "pg_kvbackend")]
Error::PostgresExecution { .. } => StatusCode::Internal,
}
}


@@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use clap::ValueEnum;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::Configurable;
@@ -65,6 +66,19 @@ pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";
/// The datastores that implement the metadata kvbackend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
pub enum BackendImpl {
/// Etcd as metadata storage.
#[default]
EtcdStore,
/// In-memory metadata storage - mostly used for testing.
MemoryStore,
/// Postgres as metadata storage.
#[cfg(feature = "pg_kvbackend")]
PostgresStore,
}
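// Note on spelling (assumption: serde's default representation for unit
// variants): config files use the variant names verbatim, e.g.
// `backend = "PostgresStore"`, while the CLI goes through clap's `ValueEnum`
// and takes kebab-case, e.g. `--backend postgres-store`.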
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
@@ -114,6 +128,8 @@ pub struct MetasrvOptions {
pub max_txn_ops: usize,
/// The tracing options.
pub tracing: TracingOptions,
/// The datastore for kv metadata.
pub backend: BackendImpl,
}
impl Default for MetasrvOptions {
@@ -146,6 +162,7 @@ impl Default for MetasrvOptions {
store_key_prefix: String::new(),
max_txn_ops: 128,
tracing: TracingOptions::default(),
backend: BackendImpl::EtcdStore,
}
}
}


@@ -90,5 +90,5 @@ rand.workspace = true
script.workspace = true
session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres = "0.7"
tokio-postgres = { workspace = true }
url = "2.3"


@@ -0,0 +1,12 @@
version: '3.9'
services:
postgres:
image: postgres:14-alpine
ports:
- 5432:5432
volumes:
- ~/apps/postgres:/var/lib/postgresql/data
environment:
- POSTGRES_USER=greptimedb
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=admin


@@ -245,8 +245,8 @@ impl Env {
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
"--use-memory-store".to_string(),
"true".to_string(),
"--backend".to_string(),
"memory-store".to_string(),
"--enable-region-failover".to_string(),
"false".to_string(),
"--http-addr=127.0.0.1:5002".to_string(),