Compare commits


19 Commits

Author SHA1 Message Date
discord9
77e340270e Downgrade rust-toolchain.toml
DO NOT MERGE
2025-04-29 17:52:43 +08:00
Weny Xu
06e8d46ba9 feat: implement batch region opening in metric engine (#6017)
feat: implement batch open metric regions
2025-04-29 09:05:27 +00:00
zyy17
89661c0626 ci: fix the bugs of release-dev-builder-images and add update-dev-builder-image-tag (#6009)
* fix: the dev-builder release job is not triggered by merged event

* ci: add update-dev-builder-image-tag
2025-04-29 06:25:15 +00:00
Weny Xu
a3ae2d7b52 feat: flush leader region before downgrading (#5995)
* feat: flush leader region before downgrading

* test: add unit tests

* chore: apply suggestions from CR
2025-04-29 03:28:00 +00:00
Ruihang Xia
789f585a7f fix: disable recursion limit in prost (#6010)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-04-28 17:21:49 +00:00
jeremyhi
133f404547 fix: sanitize_connection_string (#6012) 2025-04-28 13:56:26 +00:00
discord9
bdd44fd7ec chore: only retry when retry-able in flow (#5987)
* chore: only retry when retry-able

* chore: revert dbg change

* refactor: per review

* fix: check for available frontend first

* docs: more explain&longer timeout&feat: more retry at every level&try send select 1

* fix: use `sql` method for "SELECT 1"

* fix: also put recover flows in spawned task and a dead loop

* test: update transient error in flow rebuild test

* chore: sleep after sqlness sleep

* chore: add a warning

* chore: wait even more time after reboot
2025-04-28 09:49:49 +00:00
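The retry changes above hinge on classifying an error as transient before retrying it. Below is a minimal sketch of that pattern, assuming a hypothetical `RetryClassify` trait and backoff values; none of it is taken from the flow code itself.

```rust
// Sketch only: retry an async operation, but only when the error says it is
// retry-able; everything else surfaces immediately. Trait and timings are
// illustrative, not the actual flow error types.
use std::time::Duration;

trait RetryClassify {
    fn is_retryable(&self) -> bool;
}

async fn with_retry<T, E, F, Fut>(mut op: F, max_attempts: usize) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: RetryClassify,
{
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            // Only transient errors are retried, and only while attempts remain.
            Err(e) if e.is_retryable() && attempt + 1 < max_attempts => {
                attempt += 1;
                // Simple linear backoff between attempts.
                tokio::time::sleep(Duration::from_millis(500 * attempt as u64)).await;
            }
            Err(e) => return Err(e),
        }
    }
}
```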
Weny Xu
13ac4d5048 fix: only consider the datanode that reports the failure (#6004)
* fix: only consider the datanode that reports the failure

* chore: fix clippy
2025-04-28 06:08:02 +00:00
dennis zhuang
c6448a6ccc feat: remove own greatest fn (#5994) 2025-04-28 05:27:34 +00:00
Yingwen
86aae6733d fix: prune primary key with multiple columns may use default value as statistics (#5996)
* test: incorrect test result when filtering pk with multiple columns

* fix: prune non first tag correctly

Distinguish no column and no stats and only use default value when no
column

* test: update test result

* refactor: rename test file

* test: add test for null filter

* fix: use StatValues for null counts

* test: drop table

* test: fix unstable flow test
2025-04-28 04:53:30 +00:00
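The pruning fix above distinguishes a column that is absent from a file (where the default value is a valid statistic) from a column that is present but lacks statistics (where falling back to the default could wrongly prune matching rows). A simplified sketch of that distinction, using a made-up `Stats` type rather than the actual mito2 structures:

```rust
// Sketch only: the type and function names are illustrative.
#[derive(Clone)]
struct Stats {
    min: i64,
    max: i64,
}

fn stats_for_pruning(
    column_in_file: bool,
    file_stats: Option<Stats>,
    default_stats: Stats,
) -> Option<Stats> {
    if !column_in_file {
        // The column does not exist in this file, so every row implicitly holds
        // the default value; pruning with the default's min/max is safe.
        Some(default_stats)
    } else {
        // The column exists but carries no usable statistics: return None and
        // keep the file, instead of pruning it with default-value statistics.
        file_stats
    }
}
```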
liyang
ed1ce8438f ci: update dev-builder image version to 2025-04-15-1a517ec8-202504280… (#6003)
ci: update dev-builder image version to 2025-04-15-1a517ec8-20250428023155
2025-04-28 03:34:31 +00:00
fys
4b921b8425 chore: make txn_helper pub (#6002)
chore: make txn_helper from pub(crate) to pub
2025-04-28 02:52:39 +00:00
Lei, HUANG
1a517ec8ac fix: check if memtable is empty by stats (#5989)
fix/checking-memtable-empty-and-stats:
 - **Refactor timestamp updates**: Simplified timestamp range updates in `PartitionTreeMemtable` and `TimeSeriesMemtable` by replacing `update_timestamp_range` with `fetch_max` and `fetch_min` methods for `max_timestamp` and `min_timestamp`.
   - Affected files: `partition_tree.rs`, `time_series.rs`

 - **Remove unused code**: Deleted the `update_timestamp_range` method from `WriteMetrics` and removed unnecessary imports.
   - Affected file: `stats.rs`

 - **Optimize memtable filtering**: Streamlined the check for empty memtables in `ScanRegion` by directly using `time_range`.
   - Affected file: `scan_region.rs`
2025-04-28 01:57:17 +00:00
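The memtable change above replaces a dedicated `update_timestamp_range` helper with atomic `fetch_max`/`fetch_min` updates. A minimal sketch of that approach with hypothetical field names (the real stats live in `partition_tree.rs` and `time_series.rs`):

```rust
// Sketch only: tracks a timestamp range with lock-free min/max updates.
use std::sync::atomic::{AtomicI64, Ordering};

struct TimestampRange {
    min_timestamp: AtomicI64,
    max_timestamp: AtomicI64,
}

impl TimestampRange {
    fn new() -> Self {
        Self {
            min_timestamp: AtomicI64::new(i64::MAX),
            max_timestamp: AtomicI64::new(i64::MIN),
        }
    }

    /// Records one written timestamp; safe to call from concurrent writers.
    fn update(&self, ts: i64) {
        self.min_timestamp.fetch_min(ts, Ordering::Relaxed);
        self.max_timestamp.fetch_max(ts, Ordering::Relaxed);
    }

    /// An untouched range (min > max) means no rows were written, which is
    /// what lets the scan path check emptiness from stats alone.
    fn is_empty(&self) -> bool {
        self.min_timestamp.load(Ordering::Relaxed) > self.max_timestamp.load(Ordering::Relaxed)
    }
}
```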
discord9
21044c7339 feat: uddsketch_merge udaf (#5992) 2025-04-27 12:43:21 +00:00
Ning Sun
8e1ec2a201 chore: update nix for new toolchain (#5991) 2025-04-27 11:40:44 +00:00
Weny Xu
5ed0a095b6 feat: introduce RegionStatAwareSelector trait (#5990)
* feat: introduce `RegionStatAwareSelector`

* feat: exclude all failed datanodes

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2025-04-27 11:22:39 +00:00
shuiyisong
3c943be189 chore: update rust toolchain (#5818)
* chore: update nightly version

* chore: sort lint lines

* chore: minor fix

* chore: update nix

* chore: update toolchain to 2024-04-14

* chore: update toolchain to 2024-04-15

* chore: remove unnecessory test

* chore: do not assert oid in sqlness test

* chore: fix margin issue

* chore: fix cr issues

* chore: fix cr issues

---------

Co-authored-by: Ning Sun <sunning@greptime.com>
2025-04-27 09:02:36 +00:00
Ning Sun
eeba466717 ci: read next release version from toml by default (#5986)
* ci: read next release version from toml by default

* ci: send error message to stderr

* ci: take the first version only
2025-04-27 04:43:44 +00:00
Zhenchi
2ff54486d3 chore: bump main branch version to 0.15 (#5984)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-04-27 01:39:44 +00:00
152 changed files with 2506 additions and 3421 deletions

View File

@@ -10,17 +10,17 @@ set -e
function create_version() {
# Read from envrionment variables.
if [ -z "$GITHUB_EVENT_NAME" ]; then
echo "GITHUB_EVENT_NAME is empty"
echo "GITHUB_EVENT_NAME is empty" >&2
exit 1
fi
if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty"
exit 1
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
echo "NIGHTLY_RELEASE_PREFIX is empty"
echo "NIGHTLY_RELEASE_PREFIX is empty" >&2
exit 1
fi
@@ -35,7 +35,7 @@ function create_version() {
# It will be like 'dev-2023080819-f0e7216c'.
if [ "$NEXT_RELEASE_VERSION" = dev ]; then
if [ -z "$COMMIT_SHA" ]; then
echo "COMMIT_SHA is empty in dev build"
echo "COMMIT_SHA is empty in dev build" >&2
exit 1
fi
echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"
@@ -45,7 +45,7 @@ function create_version() {
# Note: Only output 'version=xxx' to stdout when everything is ok, so that it can be used in GitHub Actions Outputs.
if [ "$GITHUB_EVENT_NAME" = push ]; then
if [ -z "$GITHUB_REF_NAME" ]; then
echo "GITHUB_REF_NAME is empty in push event"
echo "GITHUB_REF_NAME is empty in push event" >&2
exit 1
fi
echo "$GITHUB_REF_NAME"
@@ -54,7 +54,7 @@ function create_version() {
elif [ "$GITHUB_EVENT_NAME" = schedule ]; then
echo "$NEXT_RELEASE_VERSION-$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")"
else
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME"
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME" >&2
exit 1
fi
}

View File

@@ -90,8 +90,6 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.14.0
jobs:
allocate-runners:
@@ -135,7 +133,6 @@ jobs:
env:
GITHUB_EVENT_NAME: ${{ github.event_name }}
GITHUB_REF_NAME: ${{ github.ref_name }}
NEXT_RELEASE_VERSION: ${{ env.NEXT_RELEASE_VERSION }}
NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}
- name: Allocate linux-amd64 runner

Cargo.lock (generated) — 827 changed lines

File diff suppressed because it is too large.

View File

@@ -68,15 +68,16 @@ members = [
resolver = "2"
[workspace.package]
version = "0.14.4"
version = "0.15.0"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
clippy.print_stdout = "warn"
clippy.print_stderr = "warn"
clippy.dbg_macro = "warn"
clippy.implicit_clone = "warn"
clippy.result_large_err = "allow"
clippy.large_enum_variant = "allow"
clippy.doc_overindented_list_items = "allow"
rust.unknown_lints = "deny"
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
@@ -129,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -161,9 +162,7 @@ parquet = { version = "54.2", default-features = false, features = ["arrow", "as
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "0410e8b459dda7cb222ce9596f8bf3971bd07bd2", features = [
"ser",
] }
promql-parser = { version = "0.5.1", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2024-12-25-a71b93dd-20250305072908
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu

flake.lock (generated) — 18 changed lines
View File

@@ -8,11 +8,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1737613896,
"narHash": "sha256-ldqXIglq74C7yKMFUzrS9xMT/EVs26vZpOD68Sh7OcU=",
"lastModified": 1745735608,
"narHash": "sha256-L0jzm815XBFfF2wCFmR+M1CF+beIEFj6SxlqVKF59Ec=",
"owner": "nix-community",
"repo": "fenix",
"rev": "303a062fdd8e89f233db05868468975d17855d80",
"rev": "c39a78eba6ed2a022cc3218db90d485077101496",
"type": "github"
},
"original": {
@@ -41,11 +41,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1737569578,
"narHash": "sha256-6qY0pk2QmUtBT9Mywdvif0i/CLVgpCjMUn6g9vB+f3M=",
"lastModified": 1745487689,
"narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "47addd76727f42d351590c905d9d1905ca895b82",
"rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
"type": "github"
},
"original": {
@@ -65,11 +65,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1737581772,
"narHash": "sha256-t1P2Pe3FAX9TlJsCZbmJ3wn+C4qr6aSMypAOu8WNsN0=",
"lastModified": 1745694049,
"narHash": "sha256-fxvRYH/tS7hGQeg9zCVh5RBcSWT+JGJet7RA8Ss+rC0=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "582af7ee9c8d84f5d534272fc7de9f292bd849be",
"rev": "d8887c0758bbd2d5f752d5bd405d4491e90e7ed6",
"type": "github"
},
"original": {

View File

@@ -21,7 +21,7 @@
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4=";
sha256 = "sha256-arzEYlWLGGYeOhECHpBxQd2joZ4rPKV3qLNnZ+eql6A=";
};
in
{

View File

@@ -84,12 +84,6 @@ mod tests {
let key1 = "3178510";
let key2 = "4215648";
// have collision
assert_eq!(
oid_map.hasher.hash_one(key1) as u32,
oid_map.hasher.hash_one(key2) as u32
);
// insert them into oid_map
let oid1 = oid_map.get_oid(key1);
let oid2 = oid_map.get_oid(key2);

View File

@@ -19,4 +19,4 @@ mod uddsketch_state;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -31,23 +31,28 @@ use datafusion::physical_plan::expressions::Literal;
use datafusion::prelude::create_udaf;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use serde::{Deserialize, Serialize};
use uddsketch::{SketchHashKey, UDDSketch};
pub const UDDSKETCH_STATE_NAME: &str = "uddsketch_state";
#[derive(Debug)]
pub const UDDSKETCH_MERGE_NAME: &str = "uddsketch_merge";
#[derive(Debug, Serialize, Deserialize)]
pub struct UddSketchState {
uddsketch: UDDSketch,
error_rate: f64,
}
impl UddSketchState {
pub fn new(bucket_size: u64, error_rate: f64) -> Self {
Self {
uddsketch: UDDSketch::new(bucket_size, error_rate),
error_rate,
}
}
pub fn udf_impl() -> AggregateUDF {
pub fn state_udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_STATE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Float64],
@@ -61,18 +66,55 @@ impl UddSketchState {
)
}
/// Create a UDF for the `uddsketch_merge` function.
///
/// `uddsketch_merge` accepts bucket size, error rate, and a binary column of states generated by `uddsketch_state`
/// and merges them into a single state.
///
/// The bucket size and error rate must be the same as the original state.
pub fn merge_udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_MERGE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Binary],
Arc::new(DataType::Binary),
Volatility::Immutable,
Arc::new(|args| {
let (bucket_size, error_rate) = downcast_accumulator_args(args)?;
Ok(Box::new(UddSketchState::new(bucket_size, error_rate)))
}),
Arc::new(vec![DataType::Binary]),
)
}
fn update(&mut self, value: f64) {
self.uddsketch.add_value(value);
}
fn merge(&mut self, raw: &[u8]) {
if let Ok(uddsketch) = bincode::deserialize::<UDDSketch>(raw) {
if uddsketch.count() != 0 {
self.uddsketch.merge_sketch(&uddsketch);
fn merge(&mut self, raw: &[u8]) -> DfResult<()> {
if let Ok(uddsketch) = bincode::deserialize::<Self>(raw) {
if uddsketch.uddsketch.count() != 0 {
if self.uddsketch.max_allowed_buckets() != uddsketch.uddsketch.max_allowed_buckets()
|| (self.error_rate - uddsketch.error_rate).abs() >= 1e-9
{
return Err(DataFusionError::Plan(format!(
"Merging UDDSketch with different parameters: arguments={:?} vs actual input={:?}",
(
self.uddsketch.max_allowed_buckets(),
self.error_rate
),
(uddsketch.uddsketch.max_allowed_buckets(), uddsketch.error_rate)
)));
}
self.uddsketch.merge_sketch(&uddsketch.uddsketch);
}
} else {
trace!("Warning: Failed to deserialize UDDSketch from {:?}", raw);
return Err(DataFusionError::Plan(
"Failed to deserialize UDDSketch from binary".to_string(),
));
}
Ok(())
}
}
@@ -113,9 +155,21 @@ fn downcast_accumulator_args(args: AccumulatorArgs) -> DfResult<(u64, f64)> {
impl DfAccumulator for UddSketchState {
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
let array = &values[2]; // the third column is data value
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
match array.data_type() {
DataType::Float64 => {
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
}
}
// meaning instantiate as `uddsketch_merge`
DataType::Binary => self.merge_batch(&[array.clone()])?,
_ => {
return not_impl_err!(
"UDDSketch functions do not support data type: {}",
array.data_type()
)
}
}
Ok(())
@@ -123,7 +177,7 @@ impl DfAccumulator for UddSketchState {
fn evaluate(&mut self) -> DfResult<ScalarValue> {
Ok(ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
bincode::serialize(&self).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
)))
@@ -150,7 +204,7 @@ impl DfAccumulator for UddSketchState {
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
bincode::serialize(&self).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
))])
@@ -160,7 +214,7 @@ impl DfAccumulator for UddSketchState {
let array = &states[0];
let binary_array = as_binary_array(array)?;
for v in binary_array.iter().flatten() {
self.merge(v);
self.merge(v)?;
}
Ok(())
@@ -182,8 +236,8 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 3);
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.uddsketch.count(), 3);
} else {
panic!("Expected binary scalar value");
}
@@ -201,13 +255,15 @@ mod tests {
// Create new state and merge the serialized data
let mut new_state = UddSketchState::new(10, 0.01);
if let ScalarValue::Binary(Some(bytes)) = &serialized {
new_state.merge(bytes);
new_state.merge(bytes).unwrap();
// Verify the merged state matches original by comparing deserialized values
let original_sketch: UDDSketch = bincode::deserialize(bytes).unwrap();
let original_sketch: UddSketchState = bincode::deserialize(bytes).unwrap();
let original_sketch = original_sketch.uddsketch;
let new_result = new_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(new_bytes)) = new_result {
let new_sketch: UDDSketch = bincode::deserialize(&new_bytes).unwrap();
let new_sketch: UddSketchState = bincode::deserialize(&new_bytes).unwrap();
let new_sketch = new_sketch.uddsketch;
assert_eq!(original_sketch.count(), new_sketch.count());
assert_eq!(original_sketch.sum(), new_sketch.sum());
assert_eq!(original_sketch.mean(), new_sketch.mean());
@@ -244,7 +300,8 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
assert_eq!(deserialized.count(), 3);
} else {
panic!("Expected binary scalar value");
@@ -273,7 +330,8 @@ mod tests {
let result = merged_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
assert_eq!(deserialized.count(), 2);
} else {
panic!("Expected binary scalar value");
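For context on how the two UDAFs in this file fit together, here is a registration sketch. It assumes `UddSketchState` is in scope from the module above and that the caller uses a plain DataFusion `SessionContext`; GreptimeDB's own registration path may differ.

```rust
// Sketch only: wires both aggregate UDFs into a DataFusion context so the
// two-stage pattern (uddsketch_state -> binary state, uddsketch_merge ->
// combined state) from the doc comment above can be exercised.
use datafusion::prelude::SessionContext;

fn register_uddsketch_udafs(ctx: &SessionContext) {
    // Stage 1: uddsketch_state(bucket_size, error_rate, value) -> binary state.
    ctx.register_udaf(UddSketchState::state_udf_impl());
    // Stage 2: uddsketch_merge(bucket_size, error_rate, state) -> merged state.
    // Bucket size and error rate must match the ones used to build the inputs,
    // otherwise merge() rejects the state with a plan error.
    ctx.register_udaf(UddSketchState::merge_udf_impl());
}
```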

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::iter::repeat_n;
use std::sync::Arc;
use std::{fmt, iter};
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
@@ -126,9 +127,10 @@ impl Function for MatchesTermFunction {
let term = term_column.get_ref(0).as_string().unwrap();
match term {
None => {
return Ok(Arc::new(BooleanVector::from_iter(
iter::repeat(None).take(text_column.len()),
)));
return Ok(Arc::new(BooleanVector::from_iter(repeat_n(
None,
text_column.len(),
))));
}
Some(term) => Some(MatchesTermFinder::new(term)),
}
@@ -217,7 +219,7 @@ impl MatchesTermFinder {
}
let mut pos = 0;
while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
while let Some(found_pos) = self.finder.find(&text.as_bytes()[pos..]) {
let actual_pos = pos + found_pos;
let prev_ok = self.starts_with_non_alnum
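The one-character change in the loop above swaps `text[pos..].as_bytes()` for `&text.as_bytes()[pos..]`. The difference matters because `pos` is a byte offset: slicing a `&str` at a non-char boundary panics, while slicing the byte slice does not. A small illustration of this assumed rationale (not taken from the PR description):

```rust
// Sketch only: shows why byte-slice indexing is safer for byte offsets.
fn main() {
    let text = "café!"; // 'é' occupies two bytes in UTF-8
    let pos = 4;        // a byte offset that lands inside 'é'

    // Slicing the byte slice is always valid for in-range byte offsets.
    let rest = &text.as_bytes()[pos..];
    assert_eq!(rest.len(), text.len() - pos);

    // Slicing the &str at the same offset would panic:
    // let _ = &text[pos..]; // byte index 4 is not a char boundary
}
```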

View File

@@ -18,5 +18,4 @@ pub mod flight;
pub mod precision;
pub mod select;
pub use arrow_flight::FlightData;
pub use error::Error;

View File

@@ -24,39 +24,21 @@ use crate::cache::{CacheContainer, Initializer};
use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
use crate::FlownodeId;
/// Flow id&flow partition key
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowIdent {
pub flow_id: FlowId,
pub partition_id: FlowPartitionId,
}
impl FlowIdent {
pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
Self {
flow_id,
partition_id,
}
}
}
/// cache for TableFlowManager, the table_id part is in the outer cache
/// include flownode_id, flow_id, partition_id mapping to Peer
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
/// Constructs a [TableFlownodeSetCache].
pub fn new_table_flownode_set_cache(
name: String,
cache: Cache<TableId, FlownodeFlowSet>,
cache: Cache<TableId, FlownodeSet>,
kv_backend: KvBackendRef,
) -> TableFlownodeSetCache {
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
@@ -65,7 +47,7 @@ pub fn new_table_flownode_set_cache(
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
Arc::new(move |&table_id| {
let table_flow_manager = table_flow_manager.clone();
Box::pin(async move {
@@ -75,12 +57,7 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
.map(|flows| {
flows
.into_iter()
.map(|(key, value)| {
(
FlowIdent::new(key.flow_id(), key.partition_id()),
value.peer,
)
})
.map(|(key, value)| (key.flownode_id(), value.peer))
.collect::<HashMap<_, _>>()
})
// We must cache the `HashSet` even if it's empty,
@@ -94,33 +71,26 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
}
async fn handle_create_flow(
cache: &Cache<TableId, FlownodeFlowSet>,
cache: &Cache<TableId, FlownodeSet>,
CreateFlow {
flow_id,
source_table_ids,
partition_to_peer_mapping: flow_part2nodes,
flownodes: flownode_peers,
}: &CreateFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
Some(entry) => {
let mut map = entry.into_value().as_ref().clone();
map.extend(
flow_part2nodes.iter().map(|(part, peer)| {
(FlowIdent::new(*flow_id, *part), peer.clone())
}),
);
map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
Op::Put(Arc::new(map))
}
None => {
Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
|(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
))))
}
None => Op::Put(Arc::new(HashMap::from_iter(
flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
))),
},
)
.await;
@@ -128,23 +98,21 @@ async fn handle_create_flow(
}
async fn handle_drop_flow(
cache: &Cache<TableId, FlownodeFlowSet>,
cache: &Cache<TableId, FlownodeSet>,
DropFlow {
flow_id,
source_table_ids,
flow_part2node_id,
flownode_ids,
}: &DropFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
Some(entry) => {
let mut set = entry.into_value().as_ref().clone();
for (part, _node) in flow_part2node_id {
let key = FlowIdent::new(*flow_id, *part);
set.remove(&key);
for flownode_id in flownode_ids {
set.remove(flownode_id);
}
Op::Put(Arc::new(set))
@@ -160,7 +128,7 @@ async fn handle_drop_flow(
}
fn invalidator<'a>(
cache: &'a Cache<TableId, FlownodeFlowSet>,
cache: &'a Cache<TableId, FlownodeSet>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
@@ -186,7 +154,7 @@ mod tests {
use moka::future::CacheBuilder;
use table::table_name::TableName;
use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
@@ -246,16 +214,12 @@ mod tests {
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter(
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
)
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter(
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
)
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
);
let result = cache.get(1026).await.unwrap().unwrap();
assert_eq!(result.len(), 0);
@@ -267,9 +231,8 @@ mod tests {
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
flownodes: (1..=5).map(Peer::empty).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
@@ -278,54 +241,6 @@ mod tests {
assert_eq!(set.len(), 5);
}
#[tokio::test]
async fn test_replace_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
let drop_then_create_flow = vec![
CacheIdent::DropFlow(DropFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1026, 1027],
partition_to_peer_mapping: (11..=15)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
CacheIdent::FlowId(2001),
];
cache.invalidate(&drop_then_create_flow).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert!(set.is_empty());
let expected = HashMap::from_iter(
(11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
);
let set = cache.get(1026).await.unwrap().unwrap();
assert_eq!(set.as_ref().clone(), expected);
let set = cache.get(1027).await.unwrap().unwrap();
assert_eq!(set.as_ref().clone(), expected);
}
#[tokio::test]
async fn test_drop_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
@@ -333,57 +248,34 @@ mod tests {
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
flownodes: (1..=5).map(Peer::empty).collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2002,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (11..=12)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
// same flownode that hold multiple flows
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2003,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
flownodes: (11..=12).map(Peer::empty).collect(),
}),
];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 12);
assert_eq!(set.len(), 7);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 12);
assert_eq!(set.len(), 7);
let ident = vec![CacheIdent::DropFlow(DropFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
flownode_ids: vec![1, 2, 3, 4, 5],
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter(
(11..=12)
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
)
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter(
(11..=12)
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
)
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
);
}
}

View File

@@ -16,12 +16,9 @@ use std::sync::Arc;
use crate::error::Result;
use crate::flow_name::FlowName;
use crate::instruction::{CacheIdent, DropFlow};
use crate::instruction::CacheIdent;
use crate::key::flow::flow_info::FlowInfoKey;
use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
@@ -92,40 +89,9 @@ where
let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::CreateFlow(_) => {
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
// Do nothing
}
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids,
flow_part2node_id,
}) => {
// invalidate flow route/flownode flow/table flow
let mut keys = Vec::with_capacity(
source_table_ids.len() * flow_part2node_id.len()
+ flow_part2node_id.len() * 2,
);
for table_id in source_table_ids {
for (partition_id, node_id) in flow_part2node_id {
let key =
TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
.to_bytes();
keys.push(key);
}
}
for (partition_id, node_id) in flow_part2node_id {
let key =
FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
keys.push(key);
let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
keys.push(key);
}
for key in keys {
self.invalidate_key(&key).await;
}
}
CacheIdent::FlowName(FlowName {
catalog_name,
flow_name,

View File

@@ -39,7 +39,7 @@ use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
@@ -70,7 +70,6 @@ impl CreateFlowProcedure {
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
did_replace: false,
flow_type: None,
},
}
@@ -225,7 +224,6 @@ impl CreateFlowProcedure {
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
self.data.did_replace = true;
} else {
self.context
.flow_metadata_manager
@@ -242,43 +240,22 @@ impl CreateFlowProcedure {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let did_replace = self.data.did_replace;
let ctx = Context {
subject: Some("Invalidate flow cache by creating flow".to_string()),
};
let mut caches = vec![];
// if did replaced, invalidate the flow cache with drop the old flow
if did_replace {
let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
// only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
caches.extend([CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids: old_flow_info.source_table_ids.clone(),
flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
})]);
}
let (_flow_info, flow_routes) = (&self.data).into();
let flow_part2peers = flow_routes
.into_iter()
.map(|(part_id, route)| (part_id, route.peer))
.collect();
caches.extend([
CacheIdent::CreateFlow(CreateFlow {
flow_id,
source_table_ids: self.data.source_table_ids.clone(),
partition_to_peer_mapping: flow_part2peers,
}),
CacheIdent::FlowId(flow_id),
]);
self.context
.cache_invalidator
.invalidate(&ctx, &caches)
.invalidate(
&ctx,
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.await?;
Ok(Status::done_with_output(flow_id))
@@ -400,10 +377,6 @@ pub struct CreateFlowData {
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
/// Only set to true when replace actually happened.
/// This is used to determine whether to invalidate the cache.
#[serde(default)]
pub(crate) did_replace: bool,
pub(crate) flow_type: Option<FlowType>,
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
mod metadata;
use api::v1::flow::{flow_request, DropRequest, FlowRequest};
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
@@ -154,12 +153,6 @@ impl DropFlowProcedure {
};
let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
let flow_part2nodes = flow_info_value
.flownode_ids()
.clone()
.into_iter()
.collect::<Vec<_>>();
self.context
.cache_invalidator
.invalidate(
@@ -171,9 +164,8 @@ impl DropFlowProcedure {
flow_name: flow_info_value.flow_name.to_string(),
}),
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids: flow_info_value.source_table_ids.clone(),
flow_part2node_id: flow_part2nodes,
flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
}),
],
)

View File

@@ -514,25 +514,11 @@ pub enum Error {
},
#[snafu(display(
"Failed to get a Kafka partition client, topic: {}, partition: {}",
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
partition
))]
KafkaPartitionClient {
topic: String,
partition: i32,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display(
"Failed to get offset from Kafka, topic: {}, partition: {}",
topic,
partition
))]
KafkaGetOffset {
BuildKafkaPartitionClient {
topic: String,
partition: i32,
#[snafu(implicit)]
@@ -857,7 +843,7 @@ impl ErrorExt for Error {
| EncodeWalOptions { .. }
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| KafkaPartitionClient { .. }
| BuildKafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
@@ -866,8 +852,7 @@ impl ErrorExt for Error {
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. }
| ParseWalOptions { .. }
| KafkaGetOffset { .. } => StatusCode::Unexpected,
| ParseWalOptions { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,

View File

@@ -24,7 +24,7 @@ use table::table_name::TableName;
use crate::flow_name::FlowName;
use crate::key::schema_name::SchemaName;
use crate::key::{FlowId, FlowPartitionId};
use crate::key::FlowId;
use crate::peer::Peer;
use crate::{DatanodeId, FlownodeId};
@@ -184,19 +184,14 @@ pub enum CacheIdent {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
/// The unique identifier for the flow.
pub flow_id: FlowId,
pub source_table_ids: Vec<TableId>,
/// Mapping of flow partition to peer information
pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
pub flownodes: Vec<Peer>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
pub flow_id: FlowId,
pub source_table_ids: Vec<TableId>,
/// Mapping of flow partition to flownode id
pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
pub flownode_ids: Vec<FlownodeId>,
}
/// Flushes a batch of regions.
@@ -222,7 +217,9 @@ pub enum Instruction {
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegion(FlushRegions),
FlushRegions(FlushRegions),
/// Flushes a single region.
FlushRegion(RegionId),
}
/// The reply of [UpgradeRegion].
@@ -253,6 +250,7 @@ pub enum InstructionReply {
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegion(SimpleReply),
}
impl Display for InstructionReply {
@@ -264,6 +262,7 @@ impl Display for InstructionReply {
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
}
}
}

View File

@@ -112,7 +112,7 @@ pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub mod topic_region;
pub(crate) mod txn_helper;
pub mod txn_helper;
pub mod view_info;
use std::collections::{BTreeMap, HashMap, HashSet};

View File

@@ -256,11 +256,6 @@ impl DatanodeTableManager {
})?
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
.region_info;
// If the region options are the same, we don't need to update it.
if region_info.region_options == new_region_options {
return Ok(Txn::new());
}
// substitute region options only.
region_info.region_options = new_region_options;

View File

@@ -45,7 +45,7 @@ use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
/// The key of `__flow/` scope.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, PartialEq)]
pub struct FlowScoped<T> {
inner: T,
}
@@ -246,32 +246,27 @@ impl FlowMetadataManager {
new_flow_info: &FlowInfoValue,
flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
) -> Result<()> {
let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
self.flow_name_manager.build_update_txn(
&new_flow_info.catalog_name,
&new_flow_info.flow_name,
flow_id,
)?;
let (update_flow_txn, on_create_flow_failure) =
let (create_flow_txn, on_create_flow_failure) =
self.flow_info_manager
.build_update_txn(flow_id, current_flow_info, new_flow_info)?;
let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
flow_id,
current_flow_info,
flow_routes.clone(),
)?;
let create_flow_routes_txn = self
.flow_route_manager
.build_create_txn(flow_id, flow_routes.clone())?;
let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
flow_id,
current_flow_info,
new_flow_info.flownode_ids().clone(),
);
let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, new_flow_info.flownode_ids().clone());
let update_table_flow_txn = self.table_flow_manager.build_update_txn(
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
flow_id,
current_flow_info,
flow_routes
.into_iter()
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
@@ -280,11 +275,11 @@ impl FlowMetadataManager {
)?;
let txn = Txn::merge_all(vec![
update_flow_flow_name_txn,
update_flow_txn,
update_flow_routes_txn,
update_flownode_flow_txn,
update_table_flow_txn,
create_flow_flow_name_txn,
create_flow_txn,
create_flow_routes_txn,
create_flownode_flow_txn,
create_table_flow_txn,
]);
info!(
"Creating flow {}.{}({}), with {} txn operations",
@@ -788,141 +783,6 @@ mod tests {
}
}
#[tokio::test]
async fn test_update_flow_metadata_diff_flownode() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let flow_value = test_flow_info_value(
"flow",
[(0u32, 1u64), (1u32, 2u64)].into(),
vec![1024, 1025, 1026],
);
let flow_routes = vec![
(
0u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
1,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
let new_flow_value = {
let mut tmp = flow_value.clone();
tmp.raw_sql = "new".to_string();
// move to different flownodes
tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
tmp
};
let new_flow_routes = vec![
(
0u32,
FlowRouteValue {
peer: Peer::empty(3),
},
),
(
1,
FlowRouteValue {
peer: Peer::empty(4),
},
),
];
// Update flow instead
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&new_flow_value,
new_flow_routes.clone(),
)
.await
.unwrap();
let got = flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.await
.unwrap();
assert_eq!(
routes,
vec![
(
FlowRouteKey::new(flow_id, 0),
FlowRouteValue {
peer: Peer::empty(3),
},
),
(
FlowRouteKey::new(flow_id, 1),
FlowRouteValue {
peer: Peer::empty(4),
},
),
]
);
assert_eq!(got, new_flow_value);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
// should moved to different flownode
assert_eq!(flows, vec![]);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(3)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(flows, vec![(flow_id, 0)]);
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.await
.unwrap();
assert_eq!(
nodes,
vec![
(
TableFlowKey::new(table_id, 3, flow_id, 0),
TableFlowValue {
peer: Peer::empty(3)
}
),
(
TableFlowKey::new(table_id, 4, flow_id, 1),
TableFlowValue {
peer: Peer::empty(4)
}
)
]
);
}
}
#[tokio::test]
async fn test_update_flow_metadata_flow_replace_diff_id_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());

View File

@@ -153,15 +153,6 @@ impl FlowInfoValue {
&self.flownode_ids
}
/// Insert a new flownode id for a partition.
pub fn insert_flownode_id(
&mut self,
partition: FlowPartitionId,
node: FlownodeId,
) -> Option<FlownodeId> {
self.flownode_ids.insert(partition, node)
}
/// Returns the `source_table`.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
@@ -281,11 +272,10 @@ impl FlowInfoManager {
let raw_value = new_flow_value.try_as_raw_value()?;
let prev_value = current_flow_value.get_raw_bytes();
let txn = Txn::new()
.when(vec![Compare::new(
key.clone(),
CompareOp::Equal,
Some(prev_value),
)])
.when(vec![
Compare::new(key.clone(), CompareOp::NotEqual, None),
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
])
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
.or_else(vec![TxnOp::Get(key.clone())]);

View File

@@ -19,12 +19,9 @@ use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
@@ -42,7 +39,7 @@ lazy_static! {
/// The key stores the route info of the flow.
///
/// The layout: `__flow/route/{flow_id}/{partition_id}`.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, PartialEq)]
pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
impl FlowRouteKey {
@@ -145,12 +142,6 @@ pub struct FlowRouteValue {
pub(crate) peer: Peer,
}
impl From<Peer> for FlowRouteValue {
fn from(peer: Peer) -> Self {
Self { peer }
}
}
impl FlowRouteValue {
/// Returns the `peer`.
pub fn peer(&self) -> &Peer {
@@ -213,33 +204,6 @@ impl FlowRouteManager {
Ok(Txn::new().and_then(txns))
}
/// Builds a update flow routes transaction.
///
/// Puts `__flow/route/{flow_id}/{partition_id}` keys.
/// Also removes `__flow/route/{flow_id}/{old_partition_id}` keys.
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
flow_routes: I,
) -> Result<Txn> {
let del_txns = current_flow_info
.flownode_ids()
.iter()
.map(|(partition_id, _)| {
let key = FlowRouteKey::new(flow_id, *partition_id).to_bytes();
Ok(TxnOp::Delete(key))
});
let put_txns = flow_routes.into_iter().map(|(partition_id, route)| {
let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
Ok(TxnOp::Put(key, route.try_as_raw_value()?))
});
let txns = del_txns.chain(put_txns).collect::<Result<Vec<_>>>()?;
Ok(Txn::new().and_then(txns))
}
async fn remap_flow_route_addresses(
&self,
flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],

View File

@@ -19,9 +19,8 @@ use regex::Regex;
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::FlowScoped;
use crate::key::{BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
@@ -166,17 +165,6 @@ impl FlownodeFlowManager {
Self { kv_backend }
}
/// Whether given flow exist on this flownode.
pub async fn exists(
&self,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> Result<bool> {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
Ok(self.kv_backend.get(&key).await?.is_some())
}
/// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
pub fn flows(
&self,
@@ -214,33 +202,6 @@ impl FlownodeFlowManager {
Txn::new().and_then(txns)
}
/// Builds a update flownode flow transaction.
///
/// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
/// Remove the old `__flownode_flow/{old_flownode_id}/{flow_id}/{old_partition_id}` keys.
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
flownode_ids: I,
) -> Txn {
let del_txns =
current_flow_info
.flownode_ids()
.iter()
.map(|(partition_id, flownode_id)| {
let key = FlownodeFlowKey::new(*flownode_id, flow_id, *partition_id).to_bytes();
TxnOp::Delete(key)
});
let put_txns = flownode_ids.into_iter().map(|(partition_id, flownode_id)| {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
TxnOp::Put(key, vec![])
});
let txns = del_txns.chain(put_txns).collect::<Vec<_>>();
Txn::new().and_then(txns)
}
}
#[cfg(test)]

View File

@@ -22,12 +22,9 @@ use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
@@ -218,7 +215,7 @@ impl TableFlowManager {
/// Builds a create table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys.
/// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
pub fn build_create_txn(
&self,
flow_id: FlowId,
@@ -242,44 +239,6 @@ impl TableFlowManager {
Ok(Txn::new().and_then(txns))
}
/// Builds a update table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys,
/// Also remove previous
/// `__flow/source_table/{table_id}/{old_node_id}/{flow_id}/{partition_id}` keys.
pub fn build_update_txn(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
source_table_ids: &[TableId],
) -> Result<Txn> {
let mut txns = Vec::with_capacity(2 * source_table_ids.len() * table_flow_values.len());
// first remove the old keys
for (part_id, node_id) in current_flow_info.flownode_ids() {
for source_table_id in current_flow_info.source_table_ids() {
txns.push(TxnOp::Delete(
TableFlowKey::new(*source_table_id, *node_id, flow_id, *part_id).to_bytes(),
));
}
}
for (partition_id, table_flow_value) in table_flow_values {
let flownode_id = table_flow_value.peer.id;
let value = table_flow_value.try_as_raw_value()?;
for source_table_id in source_table_ids {
txns.push(TxnOp::Put(
TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
.to_bytes(),
value.clone(),
));
}
}
Ok(Txn::new().and_then(txns))
}
async fn remap_table_flow_addresses(
&self,
table_flows: &mut [(TableFlowKey, TableFlowValue)],

View File

@@ -25,7 +25,7 @@ pub struct TxnOpGetResponseSet(Vec<KeyValue>);
impl TxnOpGetResponseSet {
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
pub fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
move |set| {
let pos = set.0.iter().position(|kv| kv.key == key);
match pos {
@@ -36,7 +36,7 @@ impl TxnOpGetResponseSet {
}
/// Returns a decoder to decode bytes to `DeserializedValueWithBytes<T>`.
pub(crate) fn decode_with<F, T>(
pub fn decode_with<F, T>(
mut f: F,
) -> impl FnMut(&mut TxnOpGetResponseSet) -> Result<Option<DeserializedValueWithBytes<T>>>
where

View File

@@ -15,8 +15,6 @@
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(let_chains)]
#![feature(extract_if)]
#![feature(hash_extract_if)]
pub mod cache;
pub mod cache_invalidator;

View File

@@ -176,15 +176,12 @@ impl TableRoute {
})?
.into();
let leader_peer = peers
.get(region_route.leader_peer_index as usize)
.cloned()
.map(Into::into);
let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
let follower_peers = region_route
.follower_peer_indexes
.into_iter()
.filter_map(|x| peers.get(x as usize).cloned().map(Into::into))
.filter_map(|x| peers.get(x as usize).cloned())
.collect::<Vec<_>>();
region_routes.push(RegionRoute {

View File

@@ -20,8 +20,6 @@ use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::ddl::flow_meta::FlowMetadataAllocator;
@@ -39,8 +37,7 @@ use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::{DatanodeId, FlownodeId};
#[async_trait::async_trait]
@@ -202,34 +199,3 @@ impl PeerLookupService for NoopPeerLookupService {
Ok(Some(Peer::empty(id)))
}
}
/// Create a kafka topic pool for testing.
pub async fn test_kafka_topic_pool(
broker_endpoints: Vec<String>,
num_topics: usize,
auto_create_topics: bool,
topic_name_prefix: Option<&str>,
) -> KafkaTopicPool {
let mut config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic: KafkaTopicConfig {
num_topics,
..Default::default()
},
auto_create_topics,
..Default::default()
};
if let Some(prefix) = topic_name_prefix {
config.kafka_topic.topic_name_prefix = prefix.to_string();
}
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
.await
.unwrap();
KafkaTopicPool::new(&config, kv_backend, topic_creator)
}

View File

@@ -112,9 +112,7 @@ pub async fn build_wal_options_allocator(
NAME_PATTERN_REGEX.is_match(prefix),
InvalidTopicNamePrefixSnafu { prefix }
);
let topic_creator =
build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
.await?;
let topic_creator = build_kafka_topic_creator(kafka_config).await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
}
@@ -153,16 +151,13 @@ pub fn prepare_wal_options(
mod tests {
use std::assert_matches::assert_matches;
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
#[tokio::test]
@@ -202,42 +197,55 @@ mod tests {
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
}
// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka_allocate_wal_options() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
let mut topic_pool = test_kafka_topic_pool(
get_kafka_endpoints(),
num_topics,
true,
Some("test_allocator_with_kafka"),
)
.await;
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
let topics = topic_pool.topics.clone();
// clean up the topics before test
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
async fn test_allocator_with_kafka() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let topics = (0..256)
.map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
topic_pool.topics.clone_from(&topics);
topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default());
let num_regions = 3;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
})
.await;
}
#[tokio::test]

View File

@@ -12,21 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
};
use common_telemetry::{error, info};
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use snafu::ResultExt;
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
Result, TlsConfigSnafu,
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
TlsConfigSnafu,
};
// Each topic only has one partition for now.
@@ -71,47 +70,21 @@ impl KafkaTopicCreator {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!(e; "Failed to create a topic {}", topic);
error!("Failed to create a topic {}, error {:?}", topic, e);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}
async fn prepare_topic(&self, topic: &String) -> Result<()> {
let partition_client = self.partition_client(topic).await?;
self.append_noop_record(topic, &partition_client).await?;
Ok(())
}
/// Creates a [PartitionClient] for the given topic.
async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
self.client
async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(KafkaPartitionClientSnafu {
.context(BuildKafkaPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})
}
/// Appends a noop record to the topic.
/// It only appends a noop record if the topic is empty.
async fn append_noop_record(
&self,
topic: &String,
partition_client: &PartitionClient,
) -> Result<()> {
let end_offset = partition_client
.get_offset(OffsetAt::Latest)
.await
.context(KafkaGetOffsetSnafu {
topic: topic.to_string(),
partition: DEFAULT_PARTITION,
})?;
if end_offset > 0 {
return Ok(());
}
partition_client
.produce(
@@ -125,28 +98,22 @@ impl KafkaTopicCreator {
)
.await
.context(ProduceRecordSnafu { topic })?;
debug!("Appended a noop record to topic {}", topic);
Ok(())
}
/// Creates topics in Kafka.
pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
let tasks = topics
.iter()
.map(|topic| async { self.create_topic(topic, &self.client).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
/// Prepares topics in Kafka.
///
/// It appends a noop record to each topic if the topic is empty.
pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
/// 1. Creates missing topics.
/// 2. Appends a noop record to each topic.
pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> {
// Try to create missing topics.
let tasks = topics
.iter()
.map(|topic| async { self.prepare_topic(topic).await })
.map(|topic| async {
self.create_topic(topic, &self.client).await?;
self.append_noop_record(topic, &self.client).await?;
Ok(())
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
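As a toy illustration of the warm-up step exercised by the tests further down (a noop record is appended only when a topic is still empty, so the end offset is at least 1 afterwards), here is a minimal self-contained sketch; the function name and the plain integer offset are hypothetical stand-ins for the real partition client:

```rust
/// Hypothetical model of the noop warm-up: append only when the topic is still
/// empty, so the end offset is at least 1 afterwards.
fn append_noop_if_empty(end_offset: &mut u64) {
    if *end_offset == 0 {
        *end_offset += 1; // the noop record
    }
}

fn main() {
    let mut empty_topic = 0;
    append_noop_if_empty(&mut empty_topic);
    assert_eq!(empty_topic, 1); // matches test_append_noop_record_to_empty_topic

    let mut non_empty_topic = 2;
    append_noop_if_empty(&mut non_empty_topic);
    assert_eq!(non_empty_topic, 2); // matches test_append_noop_record_to_non_empty_topic
}
```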
@@ -162,244 +129,34 @@ impl KafkaTopicCreator {
}
}
#[cfg(test)]
impl KafkaTopicCreator {
pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
let tasks = topics
.iter()
.map(|topic| async { self.delete_topic(topic, &self.client).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
match controller.delete_topic(topic, 10).await {
Ok(_) => {
info!("Successfully deleted topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_unknown_topic_err(&e) {
info!("The topic {} does not exist", topic);
Ok(())
} else {
panic!("Failed to delete a topic {}, error: {}", topic, e);
}
}
}
}
fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
..
}
)
}
pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
self.partition_client(topic).await.unwrap()
}
}
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result<Client> {
// Builds a Kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
if let Some(sasl) = &connection.sasl {
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &connection.tls {
if let Some(tls) = &config.connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: connection.broker_endpoints.clone(),
broker_endpoints: config.connection.broker_endpoints.clone(),
})
}
/// Builds a [KafkaTopicCreator].
pub async fn build_kafka_topic_creator(
connection: &KafkaConnectionConfig,
kafka_topic: &KafkaTopicConfig,
) -> Result<KafkaTopicCreator> {
let client = build_kafka_client(connection).await?;
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
let client = build_kafka_client(config).await?;
Ok(KafkaTopicCreator {
client,
num_partitions: kafka_topic.num_partitions,
replication_factor: kafka_topic.replication_factor,
create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
})
}
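For orientation, a minimal sketch of the `MetasrvKafkaConfig`-based call site, mirroring the test code elsewhere in this diff; the broker endpoint and topic name are illustrative only, and the snippet assumes the crate context where these types and functions are in scope:

```rust
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;

async fn example() {
    // The non-default fields mirror the tests in this diff.
    let config = MetasrvKafkaConfig {
        connection: KafkaConnectionConfig {
            broker_endpoints: vec!["localhost:9092".to_string()],
            ..Default::default()
        },
        kafka_topic: KafkaTopicConfig::default(),
        ..Default::default()
    };
    let creator = build_kafka_topic_creator(&config).await.unwrap();
    creator.create_topics(&["my_topic".to_string()]).await.unwrap();
}
```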
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use super::*;
async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
let connection = KafkaConnectionConfig {
broker_endpoints,
..Default::default()
};
let kafka_topic = KafkaTopicConfig::default();
build_kafka_topic_creator(&connection, &kafka_topic)
.await
.unwrap()
}
async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
for i in 0..num_records {
partition_client
.produce(
vec![Record {
key: Some(b"test".to_vec()),
value: Some(format!("test {}", i).as_bytes().to_vec()),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::Lz4,
)
.await
.unwrap();
}
Ok(())
}
#[tokio::test]
async fn test_append_noop_record_to_empty_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "append_noop_record_to_empty_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 0);
// The topic is empty, so a noop record is appended.
creator
.append_noop_record(&topic, &partition_client)
.await
.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 1);
}
#[tokio::test]
async fn test_append_noop_record_to_non_empty_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "append_noop_record_to_non_empty_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
append_records(&partition_client, 2).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 2);
// The topic is not empty, so no noop record is appended.
creator
.append_noop_record(&topic, &partition_client)
.await
.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 2);
}
#[tokio::test]
async fn test_create_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "create_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
// Should be ok
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 0);
}
#[tokio::test]
async fn test_prepare_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "prepare_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
creator.prepare_topic(&topic).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let start_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.unwrap();
assert_eq!(start_offset, 0);
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 1);
}
#[tokio::test]
async fn test_prepare_topic_with_stale_records_without_pruning() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "prepare_topic_with_stale_records_without_pruning";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
append_records(&partition_client, 10).await.unwrap();
creator.prepare_topic(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 10);
let start_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.unwrap();
assert_eq!(start_offset, 0);
}
}

View File

@@ -40,21 +40,24 @@ impl KafkaTopicManager {
Ok(topics)
}
/// Returns the topics that are not prepared.
pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
/// Restores topics from the key-value backend and returns the topics that are not stored in the kv backend.
pub async fn get_topics_to_create<'a>(
&self,
all_topics: &'a [String],
) -> Result<Vec<&'a String>> {
let existing_topics = self.restore_topics().await?;
let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
let mut topics_to_create = Vec::with_capacity(all_topics.len());
for topic in all_topics {
if !existing_topic_set.contains(topic) {
topics_to_create.push(topic.to_string());
topics_to_create.push(topic);
}
}
Ok(topics_to_create)
}
/// Persists prepared topics into the key-value backend.
pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
/// Persists topics into the key-value backend.
pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
self.topic_name_manager
.batch_put(
topics
@@ -67,14 +70,6 @@ impl KafkaTopicManager {
}
}
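The diffing step above boils down to a set difference between the configured topics and the ones already persisted. A self-contained sketch (names are hypothetical):

```rust
use std::collections::HashSet;

/// Given all configured topics and the ones already persisted in the kv
/// backend, return the ones that still need to be created.
fn topics_to_create<'a>(all: &'a [String], persisted: &[String]) -> Vec<&'a String> {
    let persisted: HashSet<&String> = persisted.iter().collect();
    all.iter().filter(|t| !persisted.contains(t)).collect()
}

fn main() {
    let all = vec!["t0".to_string(), "t1".to_string(), "t2".to_string()];
    let persisted = vec!["t1".to_string()];
    assert_eq!(topics_to_create(&all, &persisted), vec![&all[0], &all[2]]);
}
```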
#[cfg(test)]
impl KafkaTopicManager {
/// Lists all topics in the key-value backend.
pub async fn list_topics(&self) -> Result<Vec<String>> {
self.topic_name_manager.range().await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -95,11 +90,11 @@ mod tests {
// No legacy topics.
let mut topics_to_be_created = topic_kvbackend_manager
.unprepare_topics(&all_topics)
.get_topics_to_create(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.clone();
let mut expected = all_topics.iter().collect::<Vec<_>>();
expected.sort();
assert_eq!(expected, topics_to_be_created);
@@ -114,7 +109,7 @@ mod tests {
assert!(res.prev_kv.is_none());
let topics_to_be_created = topic_kvbackend_manager
.unprepare_topics(&all_topics)
.get_topics_to_create(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());
@@ -149,21 +144,21 @@ mod tests {
let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
let mut topics_to_be_created = topic_kvbackend_manager
.unprepare_topics(&all_topics)
.get_topics_to_create(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.clone();
let mut expected = all_topics.iter().collect::<Vec<_>>();
expected.sort();
assert_eq!(expected, topics_to_be_created);
// Persists topics to kv backend.
topic_kvbackend_manager
.persist_prepared_topics(&all_topics)
.persist_topics(&all_topics)
.await
.unwrap();
let topics_to_be_created = topic_kvbackend_manager
.unprepare_topics(&all_topics)
.get_topics_to_create(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());

View File

@@ -15,7 +15,6 @@
use std::fmt::{self, Formatter};
use std::sync::Arc;
use common_telemetry::info;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::TopicSelectorType;
use snafu::ensure;
@@ -78,35 +77,27 @@ impl KafkaTopicPool {
}
/// Tries to activate the topic manager when metasrv becomes the leader.
///
/// First tries to restore persisted topics from the kv backend.
/// If there are unprepared topics (topics that exist in the configuration but not in the kv backend),
/// it will create these topics in Kafka if `auto_create_topics` is enabled.
///
/// Then it prepares all unprepared topics by appending a noop record if the topic is empty,
/// and persists them in the kv backend for future use.
/// If not enough topics are retrieved, it will contact the Kafka cluster and request creation of the missing topics.
pub async fn activate(&self) -> Result<()> {
if !self.auto_create_topics {
return Ok(());
}
let num_topics = self.topics.len();
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
let unprepared_topics = self.topic_manager.unprepare_topics(&self.topics).await?;
let topics_to_be_created = self
.topic_manager
.get_topics_to_create(&self.topics)
.await?;
if !unprepared_topics.is_empty() {
if self.auto_create_topics {
info!("Creating {} topics", unprepared_topics.len());
self.topic_creator.create_topics(&unprepared_topics).await?;
} else {
info!("Auto create topics is disabled, skipping topic creation.");
}
if !topics_to_be_created.is_empty() {
self.topic_creator
.prepare_topics(&unprepared_topics)
.await?;
self.topic_manager
.persist_prepared_topics(&unprepared_topics)
.prepare_topics(&topics_to_be_created)
.await?;
self.topic_manager.persist_topics(&self.topics).await?;
}
info!("Activated topic pool with {} topics", self.topics.len());
Ok(())
}
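Condensed into a toy, synchronous model, the activation flow above is: require at least one configured topic, diff against the persisted set, create and warm up only the missing topics, then persist the whole configured set (not only the newly created topics). All names and the plain `Vec`s below are hypothetical stand-ins for the kv backend and the Kafka cluster:

```rust
fn activate(configured: &[String], persisted: &mut Vec<String>, kafka: &mut Vec<String>) {
    // Mirrors the InvalidNumTopics check.
    assert!(!configured.is_empty(), "at least one topic must be configured");
    // Topics known to the configuration but not yet persisted in the kv backend.
    let missing: Vec<&String> = configured
        .iter()
        .filter(|t| !persisted.contains(*t))
        .collect();
    for t in &missing {
        if !kafka.contains(*t) {
            kafka.push((*t).clone()); // create_topic + append_noop_record
        }
    }
    // Persist the full configured set.
    *persisted = configured.to_vec();
}

fn main() {
    let configured = vec!["t0".to_string(), "t1".to_string()];
    let mut persisted = vec!["t0".to_string()];
    let mut kafka = vec!["t0".to_string()];
    activate(&configured, &mut persisted, &mut kafka);
    assert_eq!(kafka, configured);
    assert_eq!(persisted, configured);
}
```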
@@ -123,147 +114,77 @@ impl KafkaTopicPool {
}
}
#[cfg(test)]
impl KafkaTopicPool {
pub(crate) fn topic_manager(&self) -> &KafkaTopicManager {
&self.topic_manager
}
pub(crate) fn topic_creator(&self) -> &KafkaTopicCreator {
&self.topic_creator
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
use crate::error::Error;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
#[tokio::test]
async fn test_pool_invalid_number_topics_err() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let endpoints = get_kafka_endpoints();
let pool = test_kafka_topic_pool(endpoints.clone(), 0, false, None).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::InvalidNumTopics { .. });
let pool = test_kafka_topic_pool(endpoints, 0, true, None).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::InvalidNumTopics { .. });
}
#[tokio::test]
async fn test_pool_activate_unknown_topics_err() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let pool =
test_kafka_topic_pool(get_kafka_endpoints(), 1, false, Some("unknown_topic")).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::KafkaPartitionClient { .. });
}
#[tokio::test]
async fn test_pool_activate() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let pool =
test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some("pool_activate")).await;
// clean up the topics before test
let topic_creator = pool.topic_creator();
topic_creator.delete_topics(&pool.topics).await.unwrap();
let topic_manager = pool.topic_manager();
pool.activate().await.unwrap();
let topics = topic_manager.list_topics().await.unwrap();
assert_eq!(topics.len(), 2);
}
#[tokio::test]
async fn test_pool_activate_with_existing_topics() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "pool_activate_with_existing_topics";
let pool = test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some(prefix)).await;
let topic_creator = pool.topic_creator();
topic_creator.delete_topics(&pool.topics).await.unwrap();
let topic_manager = pool.topic_manager();
// Persists one topic's info; pool.activate() will then create the new topics that are not yet persisted.
topic_manager
.persist_prepared_topics(&pool.topics[0..1])
.await
.unwrap();
pool.activate().await.unwrap();
let topics = topic_manager.list_topics().await.unwrap();
assert_eq!(topics.len(), 2);
let client = pool.topic_creator().client();
let topics = client
.list_topics()
.await
.unwrap()
.into_iter()
.filter(|t| t.name.starts_with(prefix))
.collect::<Vec<_>>();
assert_eq!(topics.len(), 1);
}
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_alloc_topics() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
let mut topic_pool = test_kafka_topic_pool(
get_kafka_endpoints(),
num_topics,
true,
Some("test_allocator_with_kafka"),
)
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
// Constructs topics that should be created.
let topics = (0..256)
.map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
// Replaces the default topic pool's topics with the constructed topics.
topic_pool.topics.clone_from(&topics);
// Replaces the default selector with a round-robin selector without shuffling.
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
topic_pool.activate().await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| topic_pool.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = topic_pool
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = topic_pool
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
})
})
.await;
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
let topics = topic_pool.topics.clone();
// clean up the topics before test
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| topic_pool.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = topic_pool
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = topic_pool
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
}
}

View File

@@ -24,7 +24,7 @@ use datatypes::prelude::*;
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
use snafu::ResultExt;
use crate::error::{self, Error, FromScalarValueSnafu, IntoVectorSnafu, Result};
use crate::error::{self, FromScalarValueSnafu, IntoVectorSnafu, Result};
use crate::prelude::*;
pub type AggregateFunctionCreatorRef = Arc<dyn AggregateFunctionCreator>;
@@ -166,8 +166,7 @@ impl DfAccumulator for DfAccumulatorAdaptor {
let output_type = self.creator.output_type()?;
let scalar_value = value
.try_to_scalar_value(&output_type)
.context(error::ToScalarValueSnafu)
.map_err(Error::from)?;
.context(error::ToScalarValueSnafu)?;
Ok(scalar_value)
}

View File

@@ -23,16 +23,11 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
/// The default backoff config for the Kafka client.
///
/// If the operation fails, the client will retry 3 times.
/// The backoff time is 100ms, 300ms, 900ms.
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(1),
base: 3.0,
// The deadline shouldn't be too long,
// otherwise the client will block the worker loop for a long time.
deadline: Some(Duration::from_secs(3)),
max_backoff: Duration::from_secs(10),
base: 2.0,
deadline: Some(Duration::from_secs(120)),
};
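To make the numbers above concrete, here is a minimal sketch of how such parameters translate into a delay sequence, assuming the usual "multiply by `base`, cap at `max_backoff`, stop once the `deadline` budget is spent" semantics; rskafka's exact jitter and accounting may differ:

```rust
use std::time::Duration;

/// Sketch of how backoff parameters translate into a delay sequence, under the
/// assumptions stated above.
fn backoff_delays(init: Duration, base: f64, max: Duration, deadline: Duration) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut next = init;
    let mut total = Duration::ZERO;
    while total + next <= deadline {
        delays.push(next);
        total += next;
        next = next.mul_f64(base).min(max);
    }
    delays
}

fn main() {
    // With init=100ms, base=2.0, max=10s, deadline=120s the first delays are
    // 100ms, 200ms, 400ms, 800ms, ... capped at 10s.
    let d = backoff_delays(
        Duration::from_millis(100),
        2.0,
        Duration::from_secs(10),
        Duration::from_secs(120),
    );
    assert_eq!(d[0], Duration::from_millis(100));
    assert_eq!(d[3], Duration::from_millis(800));
}
```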
/// Default interval for auto WAL pruning.

View File

@@ -31,33 +31,3 @@ where
test(endpoints).await
}
/// Get the kafka endpoints from the environment variable `GT_KAFKA_ENDPOINTS`.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
pub fn get_kafka_endpoints() -> Vec<String> {
let endpoints = std::env::var("GT_KAFKA_ENDPOINTS").unwrap();
endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>()
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_kafka_integration_test {
() => {
if std::env::var("GT_KAFKA_ENDPOINTS").is_err() {
common_telemetry::warn!("The endpoints are empty, skipping the test");
return;
}
};
}
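A typical use in an integration test, matching the pattern of the Kafka tests elsewhere in this diff (the snippet assumes the same imports those tests use; the test name and shell line are illustrative):

```rust
#[tokio::test]
async fn test_something_with_kafka() {
    common_telemetry::init_default_ut_logging();
    // Skipped silently unless the env var is set, e.g.:
    //   GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093 cargo test
    maybe_skip_kafka_integration_test!();
    let endpoints = get_kafka_endpoints();
    assert!(!endpoints.is_empty());
}
```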

View File

@@ -398,46 +398,45 @@ impl DatanodeBuilder {
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<Vec<RegionEngineRef>> {
let mut metric_engine_config = metric_engine::config::EngineConfig::default();
let mut mito_engine_config = MitoConfig::default();
let mut file_engine_config = file_engine::config::EngineConfig::default();
let mut engines = vec![];
let mut metric_engine_config = opts.region_engine.iter().find_map(|c| match c {
RegionEngineConfig::Metric(config) => Some(config.clone()),
_ => None,
});
for engine in &opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
mito_engine_config = config.clone();
let mito_engine = Self::build_mito_engine(
opts,
object_store_manager.clone(),
config.clone(),
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
let metric_engine = MetricEngine::try_new(
mito_engine.clone(),
metric_engine_config.take().unwrap_or_default(),
)
.context(BuildMetricEngineSnafu)?;
engines.push(Arc::new(mito_engine) as _);
engines.push(Arc::new(metric_engine) as _);
}
RegionEngineConfig::File(config) => {
file_engine_config = config.clone();
let engine = FileRegionEngine::new(
config.clone(),
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
);
engines.push(Arc::new(engine) as _);
}
RegionEngineConfig::Metric(metric_config) => {
metric_engine_config = metric_config.clone();
RegionEngineConfig::Metric(_) => {
// Already handled above when building the mito engine.
}
}
}
let mito_engine = Self::build_mito_engine(
opts,
object_store_manager.clone(),
mito_engine_config,
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
.context(BuildMetricEngineSnafu)?;
let file_engine = FileRegionEngine::new(
file_engine_config,
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
);
Ok(vec![
Arc::new(mito_engine) as _,
Arc::new(metric_engine) as _,
Arc::new(file_engine) as _,
])
Ok(engines)
}
/// Builds [MitoEngine] according to options.

View File

@@ -39,6 +39,7 @@ pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
/// Handler of the instruction.
@@ -50,6 +51,7 @@ pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
impl HandlerContext {
@@ -63,6 +65,7 @@ impl HandlerContext {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
}
@@ -74,6 +77,7 @@ impl RegionHeartbeatResponseHandler {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
@@ -95,8 +99,11 @@ impl RegionHeartbeatResponseHandler {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_regions)
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_regions_instruction(flush_regions)
})),
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_region)
})),
}
}
@@ -111,6 +118,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
)
}
@@ -124,12 +132,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
flush_tasks,
})
.await;

View File

@@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{FlushRegions, InstructionReply};
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_flush_region_instruction(
pub(crate) fn handle_flush_regions_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
@@ -49,6 +50,59 @@ impl HandlerContext {
None
})
}
pub(crate) fn handle_flush_region_instruction(
self,
region_id: RegionId,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not leader".to_string()),
}));
};
if !writable {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not writable".to_string()),
}));
}
let region_server_moved = self.region_server.clone();
let register_result = self
.flush_tasks
.try_register(
region_id,
Box::pin(async move {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
region_server_moved
.handle_request(region_id, request)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
match result {
Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
result: true,
error: None,
})),
Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
})),
}
})
}
}
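The `try_register` / `wait_until_finish` pair above guarantees at most one in-flight flush per region, with later requests waiting on the existing task instead of starting a new one. A toy, synchronous model of just that deduplication (region ids are plain `u64`s here; the real tracker is async):

```rust
use std::collections::HashSet;

/// A toy model of the per-region deduplication provided by
/// `flush_tasks.try_register`: at most one flush runs per region id.
struct FlushTracker {
    in_flight: HashSet<u64>, // region ids
}

impl FlushTracker {
    /// Returns false ("busy") if a flush for this region is already running.
    fn try_register(&mut self, region_id: u64) -> bool {
        self.in_flight.insert(region_id)
    }

    fn finish(&mut self, region_id: u64) {
        self.in_flight.remove(&region_id);
    }
}

fn main() {
    let mut tracker = FlushTracker { in_flight: HashSet::new() };
    assert!(tracker.try_register(1)); // first flush for region 1 starts
    assert!(!tracker.try_register(1)); // a second request is "busy" and waits instead
    tracker.finish(1);
    assert!(tracker.try_register(1)); // a new flush can start afterwards
}
```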
#[cfg(test)]
@@ -84,7 +138,7 @@ mod tests {
let reply = handler_context
.clone()
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.await;
@@ -94,7 +148,7 @@ mod tests {
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let reply = handler_context
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.await;

View File

@@ -144,6 +144,11 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
}
}
/// Waits for the task tracked by the given [TaskWatcher] to finish and returns its result.
pub(crate) async fn wait_until_finish(&self, watcher: &mut TaskWatcher<T>) -> Result<T> {
wait(watcher).await
}
/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
pub(crate) async fn try_register(
&self,

View File

@@ -25,7 +25,6 @@ use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{info, warn};
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
@@ -169,13 +168,9 @@ async fn build_cache_layer(
if let Some(path) = cache_path.as_ref()
&& !path.trim().is_empty()
{
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
clean_temp_dir(&old_atomic_temp_dir)?;
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)

View File

@@ -15,7 +15,6 @@
use std::{fs, path};
use common_telemetry::info;
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
@@ -34,13 +33,9 @@ pub async fn new_fs_object_store(
.context(error::CreateDirSnafu { dir: data_home })?;
info!("The file storage home is: {}", data_home);
let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
let atomic_write_dir = join_dir(data_home, ".tmp/");
store::clean_temp_dir(&atomic_write_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
store::clean_temp_dir(&old_atomic_temp_dir)?;
let builder = Fs::default()
.root(data_home)
.atomic_write_dir(&atomic_write_dir);

View File

@@ -253,9 +253,10 @@ fn create_current_timestamp_vector(
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<VectorRef> {
let current_timestamp_vector = TimestampMillisecondVector::from_values(
std::iter::repeat(util::current_time_millis()).take(num_rows),
);
let current_timestamp_vector = TimestampMillisecondVector::from_values(std::iter::repeat_n(
util::current_time_millis(),
num_rows,
));
if data_type.is_timestamp() {
current_timestamp_vector.cast(data_type)
} else {

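Several hunks in this diff swap `std::iter::repeat(x).take(n)` for `std::iter::repeat_n(x, n)`; the two produce the same sequence, as this small standalone check shows:

```rust
fn main() {
    // repeat_n(x, n) is the direct replacement for repeat(x).take(n):
    // both yield exactly n clones of x.
    let a: Vec<i64> = std::iter::repeat(42).take(3).collect();
    let b: Vec<i64> = std::iter::repeat_n(42, 3).collect();
    assert_eq!(a, b);
}
```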
View File

@@ -198,8 +198,7 @@ impl fmt::Debug for ConstantVector {
impl Serializable for ConstantVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
std::iter::repeat(self.get(0))
.take(self.len())
std::iter::repeat_n(self.get(0), self.len())
.map(serde_json::Value::try_from)
.collect::<serde_json::Result<_>>()
.context(SerializeSnafu)

View File

@@ -412,7 +412,7 @@ pub(crate) fn replicate_decimal128(
// Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
builder
.mutable_array
.append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
.append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
}
}
None => {

View File

@@ -120,9 +120,7 @@ impl fmt::Debug for NullVector {
impl Serializable for NullVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
Ok(std::iter::repeat(serde_json::Value::Null)
.take(self.len())
.collect())
Ok(std::iter::repeat_n(serde_json::Value::Null, self.len()).collect())
}
}

View File

@@ -388,7 +388,7 @@ pub(crate) fn replicate_primitive<T: LogicalPrimitiveType>(
// Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
builder
.mutable_array
.append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
.append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
}
}
None => {

View File

@@ -16,7 +16,6 @@ async-trait.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true
@@ -40,13 +39,16 @@ datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
dfir_rs = { version = "0.13.0", default-features = false }
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"
futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow exists simply to keep our dependency in our org and to pin the version;
# otherwise it is the same as the upstream repo
chrono.workspace = true
http.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
@@ -58,7 +60,6 @@ partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true

View File

@@ -14,7 +14,6 @@
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use api::v1::flow::{
@@ -41,9 +40,9 @@ use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -64,7 +63,6 @@ pub struct FlowDualEngine {
flow_metadata_manager: Arc<FlowMetadataManager>,
catalog_manager: Arc<dyn CatalogManager>,
check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
done_recovering: AtomicBool,
}
impl FlowDualEngine {
@@ -81,55 +79,9 @@ impl FlowDualEngine {
flow_metadata_manager,
catalog_manager,
check_task: Mutex::new(None),
done_recovering: AtomicBool::new(false),
}
}
/// Sets `done_recovering` to true to indicate that we are ready to handle requests
pub fn set_done_recovering(&self) {
info!("FlowDualEngine done recovering");
self.done_recovering
.store(true, std::sync::atomic::Ordering::Release);
}
/// Check if `done_recovering` is true
pub fn is_recover_done(&self) -> bool {
self.done_recovering
.load(std::sync::atomic::Ordering::Acquire)
}
/// Waits for recovery to finish; this only happens when the flownode has just started
async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
if self.is_recover_done() {
return Ok(());
}
warn!(
"FlowDualEngine is not done recovering, {} insert request waiting for recovery",
waiting_req_cnt
);
// wait 3 seconds, check every 1 second
// TODO(discord9): make this configurable
let mut retry = 0;
let max_retry = 3;
while retry < max_retry && !self.is_recover_done() {
warn!(
"FlowDualEngine is not done recovering, retry {} in 1s",
retry
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
retry += 1;
}
if retry == max_retry {
return FlowNotRecoveredSnafu.fail();
} else {
info!("FlowDualEngine is done recovering");
}
// TODO(discord9): also put this into centralized logging for flow once it is implemented
Ok(())
}
/// Determine if the engine is in distributed mode
pub fn is_distributed(&self) -> bool {
self.streaming_engine.node_id.is_some()
@@ -283,7 +235,7 @@ impl FlowDualEngine {
to_be_created
);
let mut errors = vec![];
for flow_id in to_be_created.clone() {
for flow_id in to_be_created {
let flow_id = *flow_id;
let info = self
.flow_metadata_manager
@@ -342,16 +294,12 @@ impl FlowDualEngine {
errors.push((flow_id, err));
}
}
if errors.is_empty() {
info!("Recover flows successfully, flows: {:?}", to_be_created);
}
for (flow_id, err) in errors {
warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
}
} else {
warn!(
"Flows do not exist in flownode for node {:?}, flow_ids={:?}",
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
nodeid, to_be_created
);
}
@@ -371,7 +319,7 @@ impl FlowDualEngine {
}
} else {
warn!(
"Flows do not exist in metadata for node {:?}, flow_ids={:?}",
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
nodeid, to_be_dropped
);
}
@@ -454,8 +402,6 @@ impl ConsistentCheckTask {
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
engine.set_done_recovering();
// then do check flows, with configurable allow_create and allow_drop
let (mut allow_create, mut allow_drop) = (false, false);
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
@@ -675,14 +621,11 @@ impl FlowEngine for FlowDualEngine {
&self,
request: api::v1::region::InsertRequests,
) -> Result<(), Error> {
self.wait_for_all_flow_recover(request.requests.len())
.await?;
// TODO(discord9): clone as little as possible
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests;
{
// not locking this, or recover flows will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await;
to_batch_engine.retain(|req| {
let region_id = RegionId::from(req.region_id);
@@ -818,17 +761,9 @@ fn to_meta_err(
location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
move |err: crate::error::Error| -> common_meta::error::Error {
match err {
crate::error::Error::FlowNotFound { id, .. } => {
common_meta::error::Error::FlowNotFound {
flow_name: format!("flow_id={id}"),
location,
}
}
_ => common_meta::error::Error::External {
location,
source: BoxedError::new(err),
},
common_meta::error::Error::External {
location,
source: BoxedError::new(err),
}
}
}

View File

@@ -19,8 +19,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_telemetry::info;
use dfir_rs::scheduled::graph::Dfir;
use enum_as_inner::EnumAsInner;
use hydroflow::scheduled::graph::Hydroflow;
use snafu::ensure;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
@@ -49,9 +49,9 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
(worker_handle, worker)
}
/// ActiveDataflowState is a wrapper around `Dfir` and `DataflowState`
/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
df: Dfir<'subgraph>,
df: Hydroflow<'subgraph>,
state: DataflowState,
err_collector: ErrCollector,
}
@@ -59,7 +59,7 @@ pub(crate) struct ActiveDataflowState<'subgraph> {
impl std::fmt::Debug for ActiveDataflowState<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActiveDataflowState")
.field("df", &"<Dfir>")
.field("df", &"<Hydroflow>")
.field("state", &self.state)
.field("err_collector", &self.err_collector)
.finish()
@@ -69,7 +69,7 @@ impl std::fmt::Debug for ActiveDataflowState<'_> {
impl Default for ActiveDataflowState<'_> {
fn default() -> Self {
ActiveDataflowState {
df: Dfir::new(),
df: Hydroflow::new(),
state: DataflowState::default(),
err_collector: ErrCollector::default(),
}

View File

@@ -39,8 +39,7 @@ use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -304,7 +303,7 @@ impl BatchingEngine {
})
.transpose()?;
debug!(
info!(
"Flow id={}, found time window expr={}",
flow_id,
phy_expr
@@ -331,7 +330,7 @@ impl BatchingEngine {
let frontend = self.frontend_client.clone();
// check execute once first to detect any error early
task.check_or_create_sink_table(&engine, &frontend).await?;
task.check_execute(&engine, &frontend).await?;
// TODO(discord9): use a time wheel or similar for better scheduling
let handle = common_runtime::spawn_global(async move {
@@ -350,8 +349,7 @@ impl BatchingEngine {
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks");
FlowNotFoundSnafu { id: flow_id }.fail()?;
warn!("Flow {flow_id} not found in tasks")
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
@@ -368,7 +366,9 @@ impl BatchingEngine {
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
debug!("Try flush flow {flow_id}");
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
let task = task.with_context(|| UnexpectedSnafu {
reason: format!("Can't found task for flow {flow_id}"),
})?;
task.mark_all_windows_as_dirty()?;

View File

@@ -27,9 +27,8 @@ use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use itertools::Itertools;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
@@ -178,9 +177,8 @@ impl FrontendClient {
Ok(res)
}
/// Get the frontend with a recent enough (less than 1 minute old) `last_activity_ts`
/// that is able to process queries
async fn get_random_active_frontend(
/// Get the database with the maximum `last_activity_ts` that is able to process queries
async fn get_latest_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -199,17 +197,17 @@ impl FrontendClient {
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
interval.tick().await;
for retry in 0..GRPC_MAX_RETRIES {
let mut frontends = self.scan_for_frontend().await?;
let frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always picking the same one
frontends.shuffle(&mut rng());
// find the node with the maximum last_activity_ts
for (_, node_info) in frontends
.iter()
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
.rev()
// filter out frontends that have been down for more than 1 min
.filter(|(_, node_info)| {
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
@@ -269,7 +267,7 @@ impl FrontendClient {
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
let db = self.get_latest_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),

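Distilled, the selection rule in this hunk is: drop frontends whose `last_activity_ts` is older than the activity timeout, then take the most recently active one. A self-contained sketch under that assumption (struct, addresses, and timeout value are hypothetical):

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
struct NodeInfo {
    addr: &'static str,
    last_activity_ts: i64, // milliseconds
}

/// Pick the most recently active frontend among those still within the timeout.
fn pick_frontend(nodes: &[NodeInfo], now_ms: i64, timeout: Duration) -> Option<&NodeInfo> {
    nodes
        .iter()
        .filter(|n| n.last_activity_ts + timeout.as_millis() as i64 >= now_ms)
        .max_by_key(|n| n.last_activity_ts)
}

fn main() {
    let nodes = [
        NodeInfo { addr: "fe-a:4001", last_activity_ts: 1_000 },
        NodeInfo { addr: "fe-b:4001", last_activity_ts: 90_000 },
        NodeInfo { addr: "fe-c:4001", last_activity_ts: 50_000 },
    ];
    // With now = 100_000 ms and a 60 s timeout, fe-a is filtered out and fe-b wins.
    let picked = pick_frontend(&nodes, 100_000, Duration::from_secs(60));
    assert_eq!(picked.map(|n| n.addr), Some("fe-b:4001"));
}
```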
View File

@@ -71,33 +71,18 @@ impl TaskState {
self.last_update_time = Instant::now();
}
/// Compute the next query delay based on the time window size or the last query duration.
/// This aims to avoid overly frequent queries without introducing too long a delay.
/// The delay is computed as follows:
/// - If `time_window_size` is set, the delay is half the time window size, constrained to be
/// at least `last_query_duration` and at most `max_timeout`.
/// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained
/// to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`.
/// Wait for at least `last_query_duration` and at most `max_timeout` before starting the next query.
///
/// If there are dirty time windows, the function returns an immediate execution time to clean them.
/// TODO: Make this behavior configurable.
/// If there are more dirty time windows, execute the next query immediately.
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let last_duration = max_timeout
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);
let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);
.min(self.last_query_duration);
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
// if there are dirty time windows, execute immediately to clean them
if self.dirty_time_windows.windows.is_empty() {

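A standalone sketch of the delay rule in the variant of `get_next_start_query_time` that consults the time window (the `MIN_REFRESH_DURATION` value below is an assumed placeholder; dirty windows trigger immediate execution):

```rust
use std::time::{Duration, Instant};

// Assumed placeholder; the real constant lives in the batching mode module.
const MIN_REFRESH_DURATION: Duration = Duration::from_secs(5);

fn next_start(
    last_query_duration: Duration,
    time_window_size: Option<Duration>,
    max_timeout: Option<Duration>,
    has_dirty_windows: bool,
) -> Instant {
    if has_dirty_windows {
        return Instant::now(); // clean dirty windows right away
    }
    // The last query duration, capped by max_timeout and floored at MIN_REFRESH_DURATION.
    let last = max_timeout
        .unwrap_or(last_query_duration)
        .min(last_query_duration)
        .max(MIN_REFRESH_DURATION);
    // Half the time window, but never shorter than `last`.
    let delay = time_window_size.map(|w| (w / 2).max(last)).unwrap_or(last);
    Instant::now() + delay
}

fn main() {
    let t = next_start(
        Duration::from_secs(2),
        Some(Duration::from_secs(60)),
        Some(Duration::from_secs(600)),
        false,
    );
    // Half of a 60 s window (30 s) dominates the clamped 5 s base delay.
    assert!(t >= Instant::now() + Duration::from_secs(29));
}
```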
View File

@@ -142,12 +142,26 @@ impl BatchingTask {
Ok(())
}
/// Create sink table if not exists
pub async fn check_or_create_sink_table(
/// Test-execute once, to check the syntax and the like
pub async fn check_execute(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
// use the current time to test getting a dirty time window, which should be safe
let start = SystemTime::now();
let ts = Timestamp::new_second(
start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as _,
);
self.state
.write()
.unwrap()
.dirty_time_windows
.add_lower_bounds(vec![ts].into_iter());
if !self.is_table_exist(&self.config.sink_table_name).await? {
let create_table = self.gen_create_table_expr(engine.clone()).await?;
info!(
@@ -160,8 +174,7 @@ impl BatchingTask {
self.config.sink_table_name.join(".")
);
}
Ok(None)
self.gen_exec_once(engine, frontend_client).await
}
async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
@@ -179,7 +192,7 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? {
debug!("Generate new query: {}", new_query);
debug!("Generate new query: {:#?}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await
} else {
debug!("Generate no query");
@@ -380,23 +393,6 @@ impl BatchingTask {
frontend_client: Arc<FrontendClient>,
) {
loop {
// first check if shutdown signal is received
// if so, break the loop
{
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
}
let mut new_query = None;
let mut gen_and_exec = async || {
new_query = self.gen_insert_plan(&engine).await?;
@@ -410,15 +406,20 @@ impl BatchingTask {
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
let sleep_until = {
let state = self.state.write().unwrap();
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
state.get_next_start_query_time(
self.config.flow_id,
&self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size()),
Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
)
};

View File

@@ -55,9 +55,6 @@ use crate::error::{
use crate::expr::error::DataTypeSnafu;
use crate::Error;
/// Represents a test timestamp in seconds since the Unix epoch.
const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000);
/// Time window expr like `date_bin(INTERVAL '1' MINUTE, ts)`, this type help with
/// evaluating the expr using given timestamp
///
@@ -73,7 +70,6 @@ pub struct TimeWindowExpr {
pub column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
eval_time_window_size: Option<std::time::Duration>,
}
impl std::fmt::Display for TimeWindowExpr {
@@ -88,11 +84,6 @@ impl std::fmt::Display for TimeWindowExpr {
}
impl TimeWindowExpr {
/// The time window size of the expr, obtained by calling `eval` with a test timestamp
pub fn time_window_size(&self) -> &Option<std::time::Duration> {
&self.eval_time_window_size
}
pub fn from_expr(
expr: &Expr,
column_name: &str,
@@ -100,28 +91,12 @@ impl TimeWindowExpr {
session: &SessionState,
) -> Result<Self, Error> {
let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
let mut zelf = Self {
Ok(Self {
phy_expr,
column_name: column_name.to_string(),
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
eval_time_window_size: None,
};
let test_ts = DEFAULT_TEST_TIMESTAMP;
let (l, u) = zelf.eval(test_ts)?;
let time_window_size = match (l, u) {
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
UnexpectedSnafu {
reason: format!(
"Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}"
),
}
.build()
})?,
_ => None,
};
zelf.eval_time_window_size = time_window_size;
Ok(zelf)
})
}
pub fn eval(

View File

@@ -138,12 +138,9 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
if let LogicalPlan::Aggregate(aggregate) = node {
self.group_exprs = Some(aggregate.group_expr.iter().cloned().collect());
debug!(
"FindGroupByFinalName: Get Group by exprs from Aggregate: {:?}",
self.group_exprs
);
debug!("Group by exprs: {:?}", self.group_exprs);
} else if let LogicalPlan::Distinct(distinct) = node {
debug!("FindGroupByFinalName: Distinct: {}", node);
debug!("Distinct: {:#?}", distinct);
match distinct {
Distinct::All(input) => {
if let LogicalPlan::TableScan(table_scan) = &**input {
@@ -165,10 +162,7 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
self.group_exprs = Some(distinct_on.on_expr.iter().cloned().collect())
}
}
debug!(
"FindGroupByFinalName: Get Group by exprs from Distinct: {:?}",
self.group_exprs
);
debug!("Group by exprs: {:?}", self.group_exprs);
}
Ok(TreeNodeRecursion::Continue)

View File

@@ -18,9 +18,9 @@
use std::collections::BTreeMap;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -38,7 +38,7 @@ mod src_sink;
/// The context for building an operator with an id of `GlobalId`
pub struct Context<'referred, 'df> {
pub id: GlobalId,
pub df: &'referred mut Dfir<'df>,
pub df: &'referred mut Hydroflow<'df>,
pub compute_state: &'referred mut DataflowState,
/// a list of all collections being used in the operator
///
@@ -361,16 +361,16 @@ mod test {
use std::cell::RefCell;
use std::rc::Rc;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::handoff::VecHandoff;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use pretty_assertions::assert_eq;
use super::*;
use crate::repr::Row;
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Dfir,
df: &mut Hydroflow,
time_range: std::ops::Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
@@ -416,7 +416,7 @@ mod test {
}
pub fn harness_test_ctx<'r, 'h>(
df: &'r mut Dfir<'h>,
df: &'r mut Hydroflow<'h>,
state: &'r mut DataflowState,
) -> Context<'r, 'h> {
let err_collector = state.get_err_collector();
@@ -436,7 +436,7 @@ mod test {
/// that is, it only emits once, not multiple times
#[test]
fn test_render_constant() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -473,7 +473,7 @@ mod test {
/// a simple example to show how to use source and sink
#[test]
fn example_source_sink() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");
df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {
@@ -498,8 +498,8 @@ mod test {
#[test]
fn test_tee_auto_schedule() {
use dfir_rs::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Dfir::new();
use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Hydroflow::new();
let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {

View File

@@ -14,8 +14,8 @@
use std::collections::BTreeMap;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -256,7 +256,7 @@ fn eval_mfp_core(
mod test {
use datatypes::data_type::ConcreteDataType;
use dfir_rs::scheduled::graph::Dfir;
use hydroflow::scheduled::graph::Hydroflow;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -269,7 +269,7 @@ mod test {
/// namely: if mfp operator can schedule a delete at the correct time
#[test]
fn test_render_mfp_with_temporal() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -348,7 +348,7 @@ mod test {
/// that is it filter the rows correctly
#[test]
fn test_render_mfp() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -388,7 +388,7 @@ mod test {
/// test if mfp operator can run multiple times within same tick
#[test]
fn test_render_mfp_multiple_times() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -22,7 +22,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::{BooleanVector, NullVector};
use dfir_rs::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
@@ -1212,7 +1212,7 @@ mod test {
use common_time::Timestamp;
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
use dfir_rs::scheduled::graph::Dfir;
use hydroflow::scheduled::graph::Hydroflow;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -1228,7 +1228,7 @@ mod test {
/// expected: sum(number), window_start, window_end
#[test]
fn test_tumble_group_by() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
const START: i64 = 1625097600000;
@@ -1389,7 +1389,7 @@ mod test {
/// select avg(number) from number;
#[test]
fn test_avg_eval() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1500,7 +1500,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_distinct() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1556,7 +1556,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_batch_reduce_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let now = state.current_time_ref();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1662,7 +1662,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_reduce_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1739,7 +1739,7 @@ mod test {
/// this test include even more insert/delete case to cover all case for eval_distinct_core
#[test]
fn test_delete_reduce_distinct_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1818,7 +1818,7 @@ mod test {
/// this test include insert and delete which should cover all case for eval_distinct_core
#[test]
fn test_basic_reduce_distinct_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1896,7 +1896,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_composite_reduce_distinct_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -17,7 +17,7 @@
use std::collections::BTreeMap;
use common_telemetry::{debug, trace};
use dfir_rs::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;

View File

@@ -16,16 +16,16 @@ use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::SubgraphId;
use get_size2::GetSize;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::SubgraphId;
use crate::compute::types::ErrCollector;
use crate::repr::{self, Timestamp};
use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manage the input/output/schedule of one `Dfir`
/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
#[derive(Debug, Default)]
pub struct DataflowState {
/// it is important to use a deque to maintain the order of subgraph here
@@ -38,7 +38,7 @@ pub struct DataflowState {
/// Which means it's also the current time in temporal filter to get current correct result
as_of: Rc<RefCell<Timestamp>>,
/// error collector local to this `ComputeState`,
/// useful for distinguishing errors from different `Dfir`
/// useful for distinguishing errors from different `Hydroflow`
err_collector: ErrCollector,
/// save all used arrange in this dataflow, since usually there is no delete operation
/// we can just keep track of all used arrange and schedule subgraph when they need to be updated
@@ -65,7 +65,7 @@ impl DataflowState {
/// schedule all subgraph that need to run with time <= `as_of` and run_available()
///
/// return true if any subgraph actually executed
pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
// first split keys <= as_of into another map
let mut before = self
.schedule_subgraph

View File

@@ -18,10 +18,10 @@ use std::rc::Rc;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::handoff::TeeingHandoff;
use dfir_rs::scheduled::port::RecvPort;
use dfir_rs::scheduled::SubgraphId;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use itertools::Itertools;
use tokio::sync::Mutex;
@@ -46,7 +46,7 @@ impl<T: 'static + Clone> Collection<T> {
/// clone a collection, require a mutable reference to the hydroflow instance
///
/// Note: need to be the same hydroflow instance that this collection is created from
pub fn clone(&self, df: &mut Dfir) -> Self {
pub fn clone(&self, df: &mut Hydroflow) -> Self {
Collection {
stream: self.stream.tee(df),
}
@@ -151,7 +151,7 @@ impl<T: 'static> CollectionBundle<T> {
}
impl<T: 'static + Clone> CollectionBundle<T> {
pub fn clone(&self, df: &mut Dfir) -> Self {
pub fn clone(&self, df: &mut Hydroflow) -> Self {
Self {
collection: self.collection.clone(df),
arranged: self

View File

@@ -46,12 +46,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Flow engine is still recovering"))]
FlowNotRecovered {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error encountered while creating flow: {sql}"))]
CreateFlow {
sql: String,
@@ -313,13 +307,12 @@ impl ErrorExt for Error {
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. }
| Self::NoAvailableFrontend { .. }
| Self::FlowNotRecovered { .. } => StatusCode::Internal,
| Self::NoAvailableFrontend { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery

View File

@@ -21,7 +21,7 @@ use common_error::ext::BoxedError;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
use dfir_rs::lattices::cc_traits::Iter;
use hydroflow::lattices::cc_traits::Iter;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};

View File

@@ -60,7 +60,7 @@ pub enum GenericFn {
Mul,
Div,
Mod,
// variadic func
// varadic func
And,
Or,
// unmaterized func

View File

@@ -43,7 +43,7 @@ use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use session::context::QueryContextRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, oneshot, Mutex};
@@ -54,18 +54,18 @@ use tonic::{Request, Response, Status};
use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{create_worker, FlowStreamingEngineRef};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, ListFlowsSnafu, ParseAddrSnafu,
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlownodeOptions, FrontendClient, StreamingEngine};
use crate::{CreateFlowArgs, Error, FlownodeOptions, FrontendClient, StreamingEngine};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
#[derive(Clone)]
pub struct FlowService {
@@ -397,6 +397,109 @@ impl FlownodeBuilder {
Ok(instance)
}
/// Recovers all flow tasks in this flownode in distributed mode (node id is `Some(<num>)`),
///
/// or recovers all existing flow tasks in standalone mode (node id is `None`).
///
/// TODO(discord9): persistent flow tasks with internal state
async fn recover_flows(&self, manager: &FlowDualEngine) -> Result<usize, Error> {
let nodeid = self.opts.node_id;
let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
let to_be_recover = self
.flow_metadata_manager
.flownode_flow_manager()
.flows(nodeid)
.try_collect::<Vec<_>>()
.await
.context(ListFlowsSnafu { id: Some(nodeid) })?;
to_be_recover.into_iter().map(|(id, _)| id).collect()
} else {
let all_catalogs = self
.catalog_manager
.catalog_names()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut all_flow_ids = vec![];
for catalog in all_catalogs {
let flows = self
.flow_metadata_manager
.flow_name_manager()
.flow_names(&catalog)
.await
.try_collect::<Vec<_>>()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
}
all_flow_ids
};
let cnt = to_be_recovered.len();
// TODO(discord9): recover in parallel
info!("Recovering {} flows: {:?}", cnt, to_be_recovered);
for flow_id in to_be_recovered {
let info = self
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.context(FlowNotFoundSnafu { id: flow_id })?;
let sink_table_name = [
info.sink_table_name().catalog_name.clone(),
info.sink_table_name().schema_name.clone(),
info.sink_table_name().table_name.clone(),
];
let args = CreateFlowArgs {
flow_id: flow_id as _,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// Because recovery only happens on restart, `create_if_not_exists` and `or_replace` could be arbitrary values (the flow doesn't exist yet),
// but for consistency, and to make sure the flow recovery actually happens, we set both to true.
// (This is also fine since the check that disallows both being true lives on metasrv, and we have already passed it.)
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
query_ctx: info
.query_context()
.clone()
.map(|ctx| {
ctx.try_into()
.map_err(BoxedError::new)
.context(ExternalSnafu)
})
.transpose()?
// or use default QueryContext with catalog_name from info
// to keep compatibility with old version
.or_else(|| {
Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().to_string())
.build(),
)
}),
};
manager
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu {
sql: info.raw_sql().clone(),
})?;
}
Ok(cnt)
}
/// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
/// nor does it actually start running the worker.
async fn build_manager(
@@ -581,7 +684,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
.handle_row_inserts(requests, ctx, &self.statement_executor)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)

View File

@@ -72,10 +72,7 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
self.handle_row_inserts(requests, ctx.clone(), false, false)
.await?
}
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {
@@ -410,17 +407,9 @@ impl Instance {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
.await
.context(TableOperationSnafu)
}
@@ -432,14 +421,7 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
true,
// Influx protocol may writes multiple fields (values).
false,
)
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
.await
.context(TableOperationSnafu)
}

View File

@@ -52,9 +52,8 @@ impl OpentsdbProtocolHandler for Instance {
None
};
// OpenTSDB is single value.
let output = self
.handle_row_inserts(requests, ctx, true, true)
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;

View File

@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
self.handle_row_inserts(requests, ctx, false, false)
self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(request, ctx.clone(), true, true)
self.handle_row_inserts(request, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?

View File

@@ -481,7 +481,7 @@ mod tests {
let mock_values = dic_values
.iter()
.flat_map(|(value, size)| iter::repeat(value.clone()).take(*size))
.flat_map(|(value, size)| std::iter::repeat_n(value.clone(), *size))
.collect::<Vec<_>>();
let sorted_result = sorted_result(&mock_values, segment_row_count);

View File

@@ -182,14 +182,6 @@ impl ClientManager {
}
}
#[cfg(test)]
impl ClientManager {
/// Returns the controller client.
pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
self.client.controller_client().unwrap()
}
}
#[cfg(test)]
mod tests {
use common_wal::test_util::run_test_with_kafka_wal;

View File

@@ -552,14 +552,6 @@ mod tests {
.collect()
}
async fn prepare_topic(logstore: &KafkaLogStore, topic_name: &str) {
let controller_client = logstore.client_manager.controller_client();
controller_client
.create_topic(topic_name.to_string(), 1, 1, 5000)
.await
.unwrap();
}
#[tokio::test]
async fn test_append_batch_basic() {
common_telemetry::init_default_ut_logging();
@@ -581,9 +573,7 @@ mod tests {
};
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
prepare_topic(&logstore, &topic_name).await;
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)
.map(|i| {
let region_id = RegionId::new(1, i);
@@ -657,7 +647,6 @@ mod tests {
};
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
prepare_topic(&logstore, &topic_name).await;
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)
.map(|i| {

View File

@@ -14,7 +14,6 @@
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(extract_if)]
#![feature(hash_set_entry)]
pub mod bootstrap;

View File

@@ -48,6 +48,7 @@ use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;
@@ -65,7 +66,7 @@ use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{Selector, SelectorType};
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};
@@ -386,6 +387,8 @@ pub struct SelectorContext {
}
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
pub struct MetaStateHandler {

View File

@@ -40,7 +40,7 @@ use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use common_telemetry::warn;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use crate::cache_invalidator::MetasrvCacheInvalidator;
@@ -54,16 +54,16 @@ use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, Regi
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectTarget, SelectorContext, SelectorRef,
FLOW_ID_SEQ, TABLE_ID_SEQ,
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
};
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
DEFAULT_TICK_INTERVAL,
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
@@ -320,13 +320,24 @@ impl MetasrvBuilder {
),
));
region_migration_manager.try_start()?;
let region_supervisor_selector = plugins
.as_ref()
.and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
let supervisor_selector = match region_supervisor_selector {
Some(selector) => {
info!("Using region stat aware selector");
RegionSupervisorSelector::RegionStatAwareSelector(selector)
}
None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
};
let region_failover_handler = if options.enable_region_failover {
let region_supervisor = RegionSupervisor::new(
rx,
options.failure_detector,
selector_ctx.clone(),
selector.clone(),
supervisor_selector,
region_migration_manager.clone(),
maintenance_mode_manager.clone(),
peer_lookup_service.clone(),
@@ -365,7 +376,7 @@ impl MetasrvBuilder {
let (tx, rx) = WalPruneManager::channel();
// Safety: Must be remote WAL.
let remote_wal_options = options.wal.remote_wal_options().unwrap();
let kafka_client = build_kafka_client(&remote_wal_options.connection)
let kafka_client = build_kafka_client(remote_wal_options)
.await
.context(error::BuildKafkaClientSnafu)?;
let wal_prune_context = WalPruneContext {

View File

@@ -141,10 +141,7 @@ pub async fn mock(
if let Some(client) = client {
Ok(TokioIo::new(client))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
Err(std::io::Error::other("Client already taken"))
}
}
}),

View File

@@ -14,6 +14,7 @@
pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod flush_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
@@ -111,6 +112,8 @@ impl PersistentContext {
pub struct Metrics {
/// Elapsed time of downgrading region and upgrading region.
operations_elapsed: Duration,
/// Elapsed time of flushing leader region.
flush_leader_region_elapsed: Duration,
/// Elapsed time of downgrading leader region.
downgrade_leader_region_elapsed: Duration,
/// Elapsed time of open candidate region.
@@ -121,10 +124,15 @@ pub struct Metrics {
impl Display for Metrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
write!(
f,
"operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
self.operations_elapsed,
"total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
total,
self.flush_leader_region_elapsed,
self.downgrade_leader_region_elapsed,
self.open_candidate_region_elapsed,
self.upgrade_candidate_region_elapsed
@@ -138,6 +146,11 @@ impl Metrics {
self.operations_elapsed += elapsed;
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
self.flush_leader_region_elapsed += elapsed;
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
self.downgrade_leader_region_elapsed += elapsed;
@@ -156,10 +169,18 @@ impl Metrics {
impl Drop for Metrics {
fn drop(&mut self) {
if !self.operations_elapsed.is_zero() {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(total.as_secs_f64());
if !self.flush_leader_region_elapsed.is_zero() {
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["operations"])
.observe(self.operations_elapsed.as_secs_f64());
.with_label_values(&["flush_leader_region"])
.observe(self.flush_leader_region_elapsed.as_secs_f64());
}
if !self.downgrade_leader_region_elapsed.is_zero() {
@@ -320,6 +341,13 @@ impl Context {
.update_operations_elapsed(instant.elapsed());
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
.metrics
.update_flush_leader_region_elapsed(instant.elapsed());
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
@@ -700,7 +728,8 @@ mod tests {
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::test_util::*;
use crate::procedure::test_util::{
new_downgrade_region_reply, new_open_region_reply, new_upgrade_region_reply,
new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply,
new_upgrade_region_reply,
};
use crate::service::mailbox::Channel;
@@ -1208,6 +1237,15 @@ mod tests {
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
)),
Assertion::simple(assert_flush_leader_region, assert_no_persist),
),
// Flush Leader Region
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_flush_region_reply(id, true, None))),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),
// UpdateMetadata::Downgrade

View File

@@ -170,7 +170,7 @@ impl DowngradeLeaderRegion {
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to downgrade the region {} on Datanode {:?}, error: {:?}, elapsed: {:?}",
"Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id, leader, error, now.elapsed()
),
}
@@ -179,13 +179,14 @@ impl DowngradeLeaderRegion {
if !exists {
warn!(
"Trying to downgrade the region {} on Datanode {}, but region doesn't exist!, elapsed: {:?}",
"Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
region_id, leader, now.elapsed()
);
} else {
info!(
"Region {} leader is downgraded, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
"Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
region_id,
leader,
last_entry_id,
metadata_last_entry_id,
now.elapsed()

View File

@@ -0,0 +1,285 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Flushes the leader region before downgrading it.
///
/// This can minimize the time window where the region is not writable.
#[derive(Debug, Serialize, Deserialize)]
pub struct PreFlushRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for PreFlushRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
self.flush_region(ctx).await?;
ctx.update_flush_leader_region_elapsed(timer);
// We intentionally don't update `operations_elapsed` here to prevent
// the `next_operation_timeout` from being reduced by the flush operation.
// This ensures sufficient time for subsequent critical operations.
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl PreFlushRegion {
/// Builds flush leader region instruction.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::FlushRegion(region_id)
}
/// Tries to flush a leader region.
///
/// Ignore:
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
/// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
/// - Failed to flush region on the Datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_id = ctx.persistent_ctx.region_id;
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {}", region_id),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let now = Instant::now();
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
reply,
region_id,
now.elapsed()
);
let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
};
if error.is_some() {
warn!(
"Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
region_id, leader, error
);
} else if result {
info!(
"The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
region_id,
leader,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader region",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_id,
leader
);
Ok(())
}
Err(err) => Err(err),
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use store_api::storage::RegionId;
use super::*;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply, send_mock_reply,
};
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
// Should be ok; if the leader region is unreachable, the flush operation is skipped.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends an incorrect reply.
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends a timeout error.
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::ExceededDeadline { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_flush_region_failed() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Ok(new_flush_region_reply(
id,
false,
Some("test mocked".to_string()),
))
});
// Should be ok; if flushing the leader region failed, the flush operation is skipped.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
common_telemetry::init_default_ut_logging();
let mut state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
}
}

View File

@@ -28,7 +28,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -47,10 +47,7 @@ impl State for OpenCandidateRegion {
self.open_candidate_region(ctx, instruction).await?;
ctx.update_open_candidate_region_elapsed(now);
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
Ok((Box::new(PreFlushRegion), Status::executing(false)))
}
fn as_any(&self) -> &dyn Any {
@@ -399,7 +396,7 @@ mod tests {
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
async fn test_next_flush_leader_region_state() {
let mut state = Box::new(OpenCandidateRegion);
// from_peer: 1
// to_peer: 2
@@ -445,8 +442,7 @@ mod tests {
(to_peer_id, region_id)
);
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
assert_matches!(flush_leader_region, PreFlushRegion);
}
}

View File

@@ -44,6 +44,7 @@ use crate::error::{self, Error, Result};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
@@ -415,6 +416,11 @@ pub(crate) fn assert_open_candidate_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
/// Asserts the [State] should be [FlushLeaderRegion].
pub(crate) fn assert_flush_leader_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
}
/// Asserts the [State] should be [UpdateMetadata::Downgrade].
pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

View File

@@ -101,6 +101,24 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> Ma
}
}
/// Generates a [InstructionReply::FlushRegion] reply.
pub fn new_flush_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::FlushRegion(SimpleReply {
result,
error,
}))
.unwrap(),
)),
}
}
/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {

View File

@@ -52,7 +52,7 @@ use crate::Result;
pub type KafkaClientRef = Arc<Client>;
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1);
/// The state of WAL pruning.
#[derive(Debug, Serialize, Deserialize)]
@@ -181,7 +181,7 @@ impl WalPruneProcedure {
let peer_and_instructions = peer_region_ids_map
.into_iter()
.map(|(peer, region_ids)| {
let flush_instruction = Instruction::FlushRegion(FlushRegions { region_ids });
let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
(peer.clone(), flush_instruction)
})
.collect();
@@ -536,7 +536,7 @@ mod tests {
let msg = resp.mailbox_message.unwrap();
let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
let mut flush_requested_region_ids = match flush_instruction {
Instruction::FlushRegion(FlushRegions { region_ids, .. }) => region_ids,
Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
_ => unreachable!(),
};
let sorted_region_ids = region_ids
@@ -558,7 +558,6 @@ mod tests {
topic_name = format!("test_procedure_execution-{}", topic_name);
let mut env = TestEnv::new();
let context = env.build_wal_prune_context(broker_endpoints).await;
TestEnv::prepare_topic(&context.client, &topic_name).await;
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
// Before any data in kvbackend is mocked, should return a retryable error.

View File

@@ -78,7 +78,7 @@ impl TestEnv {
kafka_topic,
..Default::default()
};
Arc::new(build_kafka_client(&config.connection).await.unwrap())
Arc::new(build_kafka_client(&config).await.unwrap())
}
pub async fn build_wal_prune_context(&self, broker_endpoints: Vec<String>) -> WalPruneContext {
@@ -91,12 +91,4 @@ impl TestEnv {
mailbox: self.mailbox.mailbox().clone(),
}
}
pub async fn prepare_topic(client: &Arc<Client>, topic_name: &str) {
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(topic_name.to_string(), 1, 1, 5000)
.await
.unwrap();
}
}

View File

@@ -22,20 +22,20 @@ use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::{
RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
@@ -203,6 +203,12 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;
/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
NaiveSelector(SelectorRef),
RegionStatAwareSelector(RegionStatAwareSelectorRef),
}
/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
@@ -215,7 +221,7 @@ pub struct RegionSupervisor {
/// The context of [`SelectorRef`]
selector_context: SelectorContext,
/// Candidate node selector.
selector: SelectorRef,
selector: RegionSupervisorSelector,
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
/// The maintenance mode manager.
@@ -288,7 +294,7 @@ impl RegionSupervisor {
event_receiver: Receiver<Event>,
options: PhiAccrualFailureDetectorOptions,
selector_context: SelectorContext,
selector: SelectorRef,
selector: RegionSupervisorSelector,
region_migration_manager: RegionMigrationManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
peer_lookup: PeerLookupServiceRef,
@@ -362,6 +368,7 @@ impl RegionSupervisor {
}
}
// Extracts regions that are already migrating (failing over), i.e., failover has already been triggered for them.
let migrating_regions = regions
.extract_if(.., |(_, region_id)| {
self.region_migration_manager.tracker().contains(*region_id)
@@ -374,10 +381,43 @@ impl RegionSupervisor {
);
}
warn!("Detects region failures: {:?}", regions);
if regions.is_empty() {
// If all detected regions are failover or migrating, just return.
return;
}
let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
HashMap::with_capacity(regions.len());
for (datanode_id, region_id) in regions {
if let Err(err) = self.do_failover(datanode_id, region_id).await {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
grouped_regions
.entry(datanode_id)
.or_default()
.push(region_id);
}
for (datanode_id, regions) in grouped_regions {
warn!(
"Detects region failures on datanode: {}, regions: {:?}",
datanode_id, regions
);
// We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
// because there may be false positives in failure detection on the datanode.
// So we only consider the datanode that reports the failure.
let failed_datanodes = [datanode_id];
match self
.generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
.await
{
Ok(tasks) => {
for (task, count) in tasks {
let region_id = task.region_id;
let datanode_id = task.from_peer.id;
if let Err(err) = self.do_failover(task, count).await {
error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
}
}
}
Err(err) => error!(err; "Failed to generate failover tasks"),
}
}
}
@@ -389,49 +429,107 @@ impl RegionSupervisor {
.context(error::MaintenanceModeManagerSnafu)
}
async fn do_failover(&mut self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> {
let count = *self
.failover_counts
.entry((datanode_id, region_id))
.and_modify(|count| *count += 1)
.or_insert(1);
async fn select_peers(
&self,
from_peer_id: DatanodeId,
regions: &[RegionId],
failure_datanodes: &[DatanodeId],
) -> Result<Vec<(RegionId, Peer)>> {
let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
match &self.selector {
RegionSupervisorSelector::NaiveSelector(selector) => {
let opt = SelectorOptions {
min_required_items: regions.len(),
allow_duplication: true,
exclude_peer_ids,
};
let peers = selector.select(&self.selector_context, opt).await?;
ensure!(
peers.len() == regions.len(),
error::NoEnoughAvailableNodeSnafu {
required: regions.len(),
available: peers.len(),
select_target: SelectTarget::Datanode,
}
);
let region_peers = regions
.iter()
.zip(peers)
.map(|(region_id, peer)| (*region_id, peer))
.collect::<Vec<_>>();
Ok(region_peers)
}
RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
let peers = selector
.select(
&self.selector_context,
from_peer_id,
regions,
exclude_peer_ids,
)
.await?;
ensure!(
peers.len() == regions.len(),
error::NoEnoughAvailableNodeSnafu {
required: regions.len(),
available: peers.len(),
select_target: SelectTarget::Datanode,
}
);
Ok(peers)
}
}
}
async fn generate_failover_tasks(
&mut self,
from_peer_id: DatanodeId,
regions: &[RegionId],
failed_datanodes: &[DatanodeId],
) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
let mut tasks = Vec::with_capacity(regions.len());
let from_peer = self
.peer_lookup
.datanode(datanode_id)
.datanode(from_peer_id)
.await
.context(error::LookupPeerSnafu {
peer_id: datanode_id,
peer_id: from_peer_id,
})?
.context(error::PeerUnavailableSnafu {
peer_id: datanode_id,
peer_id: from_peer_id,
})?;
let mut peers = self
.selector
.select(
&self.selector_context,
SelectorOptions {
min_required_items: 1,
allow_duplication: false,
exclude_peer_ids: HashSet::from([from_peer.id]),
},
)
let region_peers = self
.select_peers(from_peer_id, regions, failed_datanodes)
.await?;
let to_peer = peers.remove(0);
if to_peer.id == from_peer.id {
warn!(
"Skip failover for region: {region_id}, from_peer: {from_peer}, trying to failover to the same peer."
);
return Ok(());
for (region_id, peer) in region_peers {
let count = *self
.failover_counts
.entry((from_peer_id, region_id))
.and_modify(|count| *count += 1)
.or_insert(1);
let task = RegionMigrationProcedureTask {
region_id,
from_peer: from_peer.clone(),
to_peer: peer,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
};
tasks.push((task, count));
}
Ok(tasks)
}
async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
let from_peer_id = task.from_peer.id;
let region_id = task.region_id;
info!(
"Failover for region: {region_id}, from_peer: {from_peer}, to_peer: {to_peer}, tries: {count}"
"Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
task.region_id, task.from_peer, task.to_peer, task.timeout, count
);
let task = RegionMigrationProcedureTask {
region_id,
from_peer,
to_peer,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
};
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
return match err {
@@ -439,25 +537,25 @@ impl RegionSupervisor {
MigrationRunning { .. } => {
info!(
"Another region migration is running, skip failover for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
TableRouteNotFound { .. } => {
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
LeaderPeerChanged { .. } => {
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
@@ -521,6 +619,7 @@ pub(crate) mod tests {
use tokio::sync::oneshot;
use tokio::time::sleep;
use super::RegionSupervisorSelector;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::region::supervisor::{
@@ -548,7 +647,7 @@ pub(crate) mod tests {
rx,
Default::default(),
selector_context,
selector,
RegionSupervisorSelector::NaiveSelector(selector),
region_migration_manager,
maintenance_mode_manager,
peer_lookup,

View File

@@ -23,6 +23,7 @@ pub mod weighted_choose;
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use strum::AsRefStr;
use crate::error;
@@ -36,6 +37,24 @@ pub trait Selector: Send + Sync {
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output>;
}
/// A selector that is aware of region statistics.
///
/// It selects the best destination peer for a list of regions.
/// The selection is based on the region statistics, such as the region leader's write throughput.
#[async_trait::async_trait]
pub trait RegionStatAwareSelector: Send + Sync {
type Context;
type Output;
async fn select(
&self,
ctx: &Self::Context,
from_peer_id: u64,
region_ids: &[RegionId],
exclude_peer_ids: HashSet<u64>,
) -> Result<Self::Output>;
}
#[derive(Debug)]
pub struct SelectorOptions {
/// Minimum number of selected results.

View File

@@ -278,7 +278,7 @@ impl KvBackend for LeaderCachedKvBackend {
let remote_res = self.store.batch_get(remote_req).await?;
let put_req = BatchPutRequest {
kvs: remote_res.kvs.clone().into_iter().map(Into::into).collect(),
kvs: remote_res.kvs.clone().into_iter().collect(),
..Default::default()
};
let _ = self.cache.batch_put(put_req).await?;

View File

@@ -206,9 +206,7 @@ impl DataRegion {
) -> Result<AffectedRows> {
match request.kind {
AlterKind::SetRegionOptions { options: _ }
| AlterKind::UnsetRegionOptions { keys: _ }
| AlterKind::SetIndex { options: _ }
| AlterKind::UnsetIndex { options: _ } => {
| AlterKind::UnsetRegionOptions { keys: _ } => {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Alter(request))

View File

@@ -42,11 +42,11 @@ pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SyncManifestResponse,
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use crate::config::EngineConfig;
@@ -131,6 +131,17 @@ impl RegionEngine for MetricEngine {
METRIC_ENGINE_NAME
}
async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses, BoxedError> {
self.inner
.handle_batch_open_requests(parallelism, requests)
.await
.map_err(BoxedError::new)
}
async fn handle_batch_ddl_requests(
&self,
batch_request: BatchRegionDdlRequest,

View File

@@ -14,24 +14,80 @@
//! Open a metric region.
use std::collections::HashSet;
use common_telemetry::info;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{BatchResponses, RegionEngine};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::create::region_options_for_metadata_region;
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
use crate::error::{
BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;
impl MetricEngineInner {
pub async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses> {
// We need to open metadata region and data region for each request.
let mut all_requests = Vec::with_capacity(requests.len() * 2);
let mut physical_region_ids = Vec::with_capacity(requests.len());
let mut data_region_ids = HashSet::with_capacity(requests.len());
for (region_id, request) in requests {
if !request.is_physical_table() {
continue;
}
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let (open_metadata_region_request, open_data_region_request) =
self.transform_open_physical_region_request(request);
all_requests.push((metadata_region_id, open_metadata_region_request));
all_requests.push((data_region_id, open_data_region_request));
physical_region_ids.push((region_id, physical_region_options));
data_region_ids.insert(data_region_id);
}
let results = self
.mito
.handle_batch_open_requests(parallelism, all_requests)
.await
.context(BatchOpenMitoRegionSnafu {})?
.into_iter()
.filter(|(region_id, _)| data_region_ids.contains(region_id))
.collect::<Vec<_>>();
for (physical_region_id, physical_region_options) in physical_region_ids {
let primary_key_encoding = self
.mito
.get_primary_key_encoding(physical_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?;
self.recover_states(
physical_region_id,
primary_key_encoding,
physical_region_options,
)
.await?;
}
Ok(results)
}
/// Open a metric region.
///
/// Only open requests to a physical region matter. Those to logical regions are
@@ -69,12 +125,15 @@ impl MetricEngineInner {
}
}
/// Invokes mito engine to open physical regions (data and metadata).
async fn open_physical_region(
/// Transforms the open request into requests that open the metadata region and the data region.
///
/// Returns:
/// - The open request for metadata region.
/// - The open request for data region.
fn transform_open_physical_region_request(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
) -> (RegionOpenRequest, RegionOpenRequest) {
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
@@ -98,8 +157,19 @@ impl MetricEngineInner {
skip_wal_replay: request.skip_wal_replay,
};
(open_metadata_region_request, open_data_region_request)
}
/// Invokes mito engine to open physical regions (data and metadata).
async fn open_physical_region(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let (open_metadata_region_request, open_data_region_request) =
self.transform_open_physical_region_request(request);
self.mito
.handle_request(

View File

@@ -42,6 +42,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to batch open mito region"))]
BatchOpenMitoRegion {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to close mito region, region id: {}", region_id))]
CloseMitoRegion {
region_id: RegionId,
@@ -337,7 +344,8 @@ impl ErrorExt for Error {
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoDeleteOperation { source, .. }
| MitoSyncOperation { source, .. } => source.status_code(),
| MitoSyncOperation { source, .. }
| BatchOpenMitoRegion { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
use object_store::{ErrorKind, ObjectStore};
use object_store::ObjectStore;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
@@ -42,10 +42,6 @@ pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
pub const ATOMIC_WRITE_DIR: &str = "tmp/";
/// For compatibility. Remove this after a major version release.
pub const OLD_ATOMIC_WRITE_DIR: &str = ".tmp/";
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
region_dir: String,
@@ -164,18 +160,13 @@ impl AccessLayer {
fulltext_index_config: request.fulltext_index_config,
bloom_filter_index_config: request.bloom_filter_index_config,
};
// We disable write cache on file system but we still use atomic write.
// TODO(yingwen): If we support other non-fs stores without the write cache, then
// we may have find a way to check whether we need the cleaner.
let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
let mut writer = ParquetWriter::new_with_object_store(
self.object_store.clone(),
request.metadata,
indexer_builder,
path_provider,
)
.await
.with_file_cleaner(cleaner);
.await;
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
@@ -222,85 +213,10 @@ pub struct SstWriteRequest {
pub bloom_filter_index_config: BloomFilterConfig,
}
/// Cleaner to remove temp files on the atomic write dir.
pub(crate) struct TempFileCleaner {
region_id: RegionId,
object_store: ObjectStore,
}
impl TempFileCleaner {
/// Constructs the cleaner for the region and store.
pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
Self {
region_id,
object_store,
}
}
/// Removes the SST and index file from the local atomic dir by the file id.
pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
}
/// Removes the files from the local atomic dir by their names.
pub(crate) async fn clean_atomic_dir_files(
local_store: &ObjectStore,
names_to_remove: &[&str],
) {
// We don't know the actual suffix of the file under the atomic dir, so we have
// to list the dir. The cost should be acceptable as there won't be too many files.
let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
if e.kind() != ErrorKind::NotFound {
common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
}
}) else {
return;
};
// In our case, we can ensure the file id is unique so it is safe to remove all files
// with the same file id under the atomic write dir.
let actual_files: Vec<_> = entries
.into_iter()
.filter_map(|entry| {
if entry.metadata().is_dir() {
return None;
}
// Select entries whose names match `names_to_remove`.
let should_remove = names_to_remove
.iter()
.any(|file| entry.name().starts_with(file));
if should_remove {
Some(entry.path().to_string())
} else {
None
}
})
.collect();
common_telemetry::warn!(
"Clean files {:?} under atomic write dir for {:?}",
actual_files,
names_to_remove
);
if let Err(e) = local_store.delete_iter(actual_files).await {
common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
}
}
}
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
let atomic_write_dir = join_dir(root, ".tmp/");
clean_dir(&atomic_write_dir).await?;
// Compatibility code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
clean_dir(&old_atomic_temp_dir).await?;
let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();

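The removed `TempFileCleaner` works by listing the atomic write dir and deleting every non-directory entry whose name starts with one of the target keys, since the exact suffix a writer appended is unknown. A simplified sketch of that prefix-matching cleanup against the local filesystem (std::fs instead of the real `ObjectStore`; the file names are made up):

use std::fs;
use std::path::Path;

/// Removes files under `atomic_dir` whose names start with any of `names_to_remove`.
/// We don't know the exact suffix a writer appended, so we list the directory
/// and match by prefix, mirroring the cleaner removed in the diff above.
fn clean_atomic_dir_files(atomic_dir: &Path, names_to_remove: &[&str]) -> std::io::Result<()> {
    // A missing directory simply means there is nothing to clean.
    let entries = match fs::read_dir(atomic_dir) {
        Ok(entries) => entries,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };
    for entry in entries {
        let entry = entry?;
        if entry.file_type()?.is_dir() {
            continue;
        }
        let name = entry.file_name();
        let name = name.to_string_lossy();
        if names_to_remove.iter().any(|key| name.starts_with(key)) {
            fs::remove_file(entry.path())?;
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    // Hypothetical layout: a temp dir standing in for "<root>/tmp/".
    let atomic_dir = std::env::temp_dir().join("example_atomic_dir");
    fs::create_dir_all(&atomic_dir)?;
    fs::write(atomic_dir.join("1234.abcd.parquet.partial"), b"")?;
    clean_atomic_dir_files(&atomic_dir, &["1234.abcd.parquet"])?;
    Ok(())
}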
View File

@@ -14,7 +14,6 @@
//! A cache for files.
use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -340,18 +339,6 @@ impl IndexKey {
}
}
impl fmt::Display for IndexKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}.{}.{}",
self.region_id.as_u64(),
self.file_id,
self.file_type.as_str()
)
}
}
/// Type of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
@@ -393,7 +380,15 @@ pub(crate) struct IndexValue {
///
/// The file name format is `{region_id}.{file_id}.{file_type}`
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
join_path(cache_file_dir, &key.to_string())
join_path(
cache_file_dir,
&format!(
"{}.{}.{}",
key.region_id.as_u64(),
key.file_id,
key.file_type.as_str()
),
)
}
/// Parse index key from the file name.

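Both sides of the `cache_file_path` hunk build the same `{region_id}.{file_id}.{file_type}` name; the diff only moves the formatting from a `Display` impl on `IndexKey` to the single call site. A sketch with hypothetical key fields showing the two equivalent forms:

use std::fmt;

/// Hypothetical simplified key: the real `IndexKey` holds a `RegionId`,
/// a `FileId` and a `FileType`.
#[derive(Clone, Copy)]
struct IndexKey {
    region_id: u64,
    file_id: u32,
    file_type: &'static str,
}

// Form 1 (removed in the diff): a `Display` impl lets any caller write
// `key.to_string()` wherever the cache file name is needed.
impl fmt::Display for IndexKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}.{}", self.region_id, self.file_id, self.file_type)
    }
}

// Form 2 (kept in the diff): build the name inline at the single call site.
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
    format!(
        "{}/{}.{}.{}",
        cache_file_dir.trim_end_matches('/'),
        key.region_id,
        key.file_id,
        key.file_type
    )
}

fn main() {
    let key = IndexKey { region_id: 42, file_id: 7, file_type: "parquet" };
    assert_eq!(cache_file_path("cache", key), format!("cache/{key}"));
    println!("{}", cache_file_path("cache", key));
}

Inlining is reasonable here because the other user of `IndexKey::to_string`, the temp-file cleaner in the write cache, is removed in the same diff.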
View File

@@ -26,7 +26,7 @@ use store_api::storage::RegionId;
use crate::access_layer::{
new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
TempFileCleaner, WriteCachePathProvider,
WriteCachePathProvider,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
@@ -122,7 +122,7 @@ impl WriteCache {
row_group_size: write_opts.row_group_size,
puffin_manager: self
.puffin_manager_factory
.build(store.clone(), path_provider.clone()),
.build(store, path_provider.clone()),
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
@@ -130,16 +130,14 @@ impl WriteCache {
bloom_filter_index_config: write_request.bloom_filter_index_config,
};
let cleaner = TempFileCleaner::new(region_id, store.clone());
// Write to FileCache.
let mut writer = ParquetWriter::new_with_object_store(
store.clone(),
self.file_cache.local_store(),
write_request.metadata,
indexer,
path_provider.clone(),
path_provider,
)
.await
.with_file_cleaner(cleaner);
.await;
let sst_info = writer
.write_all(write_request.source, write_request.max_sequence, write_opts)
@@ -203,26 +201,6 @@ impl WriteCache {
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
) -> Result<()> {
if let Err(e) = self
.download_without_cleaning(index_key, remote_path, remote_store, file_size)
.await
{
let filename = index_key.to_string();
TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
.await;
return Err(e);
}
Ok(())
}
async fn download_without_cleaning(
&self,
index_key: IndexKey,
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
) -> Result<()> {
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
@@ -432,11 +410,9 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use super::*;
use crate::access_layer::{OperationType, ATOMIC_WRITE_DIR};
use crate::access_layer::OperationType;
use crate::cache::test_util::new_fs_store;
use crate::cache::{CacheManager, CacheStrategy};
use crate::error::InvalidBatchSnafu;
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util::sst_util::{
@@ -602,82 +578,4 @@ mod tests {
// Check parquet metadata
assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
}
#[tokio::test]
async fn test_write_cache_clean_tmp_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
let write_cache_dir = create_temp_dir("");
let write_cache_path = write_cache_dir.path().to_str().unwrap();
let write_cache = env
.create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
.await;
// Create a cache manager using only write cache
let cache_manager = Arc::new(
CacheManager::builder()
.write_cache(Some(write_cache.clone()))
.build(),
);
// Create source
let metadata = Arc::new(sst_region_metadata());
// Creates a source that can return an error to abort the writer.
let source = Source::Iter(Box::new(
[
Ok(new_batch_by_range(&["a", "d"], 0, 60)),
InvalidBatchSnafu {
reason: "Abort the writer",
}
.fail(),
]
.into_iter(),
));
// Write to local cache and upload sst to mock remote store
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata,
source,
storage: None,
max_sequence: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,
..Default::default()
};
let upload_request = SstUploadRequest {
dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
remote_store: mock_store.clone(),
};
write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts)
.await
.unwrap_err();
let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
let mut has_files = false;
while let Some(entry) = entries.next_entry().await.unwrap() {
if entry.file_type().await.unwrap().is_dir() {
continue;
}
has_files = true;
common_telemetry::warn!(
"Found remaining temporary file in atomic dir: {}",
entry.path().display()
);
}
assert!(!has_files);
}
}

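The removed `download` wrapper is the usual clean-up-on-error shape: call the fallible inner operation, and if it fails, best-effort delete the partial artifacts before returning the original error. A generic sketch of that pattern with hypothetical file names (not the mito API):

use std::fs;
use std::path::{Path, PathBuf};

/// Inner operation: pretend to download into `dest`, failing halfway through
/// so a partial file is left behind.
fn download_without_cleaning(dest: &Path) -> std::io::Result<()> {
    fs::write(dest, b"partial contents")?;
    Err(std::io::Error::new(std::io::ErrorKind::Other, "remote read failed"))
}

/// Wrapper (the shape of the removed `WriteCache::download`): on error,
/// remove the partial file, then return the original error.
fn download(dest: &Path) -> std::io::Result<()> {
    if let Err(e) = download_without_cleaning(dest) {
        // Best-effort cleanup; ignore a secondary failure so the first error wins.
        let _ = fs::remove_file(dest);
        return Err(e);
    }
    Ok(())
}

fn main() {
    let dest: PathBuf = std::env::temp_dir().join("example_download.parquet");
    let err = download(&dest).unwrap_err();
    println!("download failed as expected: {err}");
    assert!(!dest.exists(), "partial file should have been cleaned up");
}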
View File

@@ -57,7 +57,7 @@ use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 4;
const INITIAL_BUILDER_CAPACITY: usize = 16;
/// Vector builder capacity.
const BUILDER_CAPACITY: usize = 512;
@@ -645,19 +645,15 @@ struct Series {
}
impl Series {
pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
fn new(region_metadata: &RegionMetadataRef) -> Self {
Self {
pk_cache: None,
active: ValueBuilder::new(region_metadata, builder_cap),
active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
frozen: vec![],
region_metadata: region_metadata.clone(),
}
}
pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
}
/// Pushes a row of values into Series. Return the size of values.
fn push<'a>(
&mut self,

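This hunk replaces the `with_capacity`/`new` constructor pair with a single `new` that always uses `INITIAL_BUILDER_CAPACITY`. The removed pair is the common delegate-to-`with_capacity` idiom; a tiny sketch with a hypothetical builder:

/// Hypothetical value builder; the real one allocates vector builders per column.
struct ValueBuilder {
    values: Vec<i64>,
}

impl ValueBuilder {
    fn new(capacity: usize) -> Self {
        Self { values: Vec::with_capacity(capacity) }
    }
}

const INITIAL_BUILDER_CAPACITY: usize = 16;

struct Series {
    active: ValueBuilder,
    frozen: Vec<ValueBuilder>,
}

impl Series {
    /// Explicit-capacity constructor (the variant removed by this diff).
    fn with_capacity(builder_cap: usize) -> Self {
        Self {
            active: ValueBuilder::new(builder_cap),
            frozen: vec![],
        }
    }

    /// Default constructor delegating to `with_capacity`.
    fn new() -> Self {
        Self::with_capacity(INITIAL_BUILDER_CAPACITY)
    }
}

fn main() {
    let series = Series::new();
    assert!(series.active.values.capacity() >= INITIAL_BUILDER_CAPACITY);
    assert!(series.frozen.is_empty());
}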
View File

@@ -363,9 +363,9 @@ mod tests {
builder
.push_field_array(
*column_id,
Arc::new(Int64Array::from_iter_values(
std::iter::repeat(*field).take(num_rows),
)),
Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
*field, num_rows,
))),
)
.unwrap();
}

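This and several later test hunks swap `iter::repeat(x).take(n)` for the more direct `std::iter::repeat_n(x, n)`, available on recent stable Rust. The two produce the same sequence for these fixed-length columns; a minimal check:

fn main() {
    let num_rows = 4;

    // Old form used in these tests: repeat + take.
    let with_take: Vec<i64> = std::iter::repeat(7).take(num_rows).collect();

    // New form: repeat_n yields exactly `num_rows` items, implements
    // ExactSizeIterator, and reads more directly.
    let with_repeat_n: Vec<i64> = std::iter::repeat_n(7, num_rows).collect();

    assert_eq!(with_take, with_repeat_n);
    assert_eq!(std::iter::repeat_n(7, num_rows).len(), num_rows);
    println!("{with_repeat_n:?}");
}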
View File

@@ -346,7 +346,6 @@ impl BloomFilterIndexer {
#[cfg(test)]
pub(crate) mod tests {
use std::iter;
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
@@ -461,15 +460,15 @@ pub(crate) mod tests {
Batch::new(
primary_key,
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt8Vector::from_iter_values(
iter::repeat(1).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
1, num_rows,
))),
vec![u64_field],
)
.unwrap()

View File

@@ -489,12 +489,12 @@ mod tests {
Arc::new(UInt64Vector::from_iter_values(
(0..num_rows).map(|n| n as u64),
)),
Arc::new(UInt64Vector::from_iter_values(
std::iter::repeat(0).take(num_rows),
)),
Arc::new(UInt8Vector::from_iter_values(
std::iter::repeat(1).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
1, num_rows,
))),
vec![
BatchColumn {
column_id: 1,

View File

@@ -326,7 +326,6 @@ impl InvertedIndexer {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::iter;
use api::v1::SemanticType;
use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
@@ -424,15 +423,15 @@ mod tests {
Batch::new(
primary_key,
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(
iter::repeat(0).take(num_rows),
)),
Arc::new(UInt8Vector::from_iter_values(
iter::repeat(1).take(num_rows),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
0, num_rows,
))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
1, num_rows,
))),
vec![u64_field],
)
.unwrap()

Some files were not shown because too many files have changed in this diff.