Compare commits

..

1 Commits

Author SHA1 Message Date
luofucong
1e37847f48 x 2025-01-02 15:21:29 +08:00
69 changed files with 526 additions and 1841 deletions

39
Cargo.lock generated
View File

@@ -2192,8 +2192,6 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datatypes",
"deadpool",
"deadpool-postgres",
"derive_builder 0.12.0",
"etcd-client",
"futures",
@@ -3315,39 +3313,6 @@ dependencies = [
"sqlparser_derive 0.1.1",
]
[[package]]
name = "deadpool"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-postgres"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa"
dependencies = [
"deadpool",
"tokio",
"tokio-postgres",
"tracing",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
dependencies = [
"tokio",
]
[[package]]
name = "debugid"
version = "0.8.0"
@@ -4593,7 +4558,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=43ddd8dea69f4df0fe2e8b5cdc0044d2cfa35908#43ddd8dea69f4df0fe2e8b5cdc0044d2cfa35908"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a875e976441188028353f7274a46a7e6e065c5d4#a875e976441188028353f7274a46a7e6e065c5d4"
dependencies = [
"prost 0.12.6",
"serde",
@@ -6566,8 +6531,6 @@ dependencies = [
"common-wal",
"dashmap",
"datatypes",
"deadpool",
"deadpool-postgres",
"derive_builder 0.12.0",
"etcd-client",
"futures",

View File

@@ -118,15 +118,13 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
deadpool = "0.10"
deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "43ddd8dea69f4df0fe2e8b5cdc0044d2cfa35908" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
hex = "0.4"
http = "0.2"
humantime = "2.1"

View File

@@ -94,7 +94,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
@@ -132,10 +132,10 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
@@ -151,7 +151,7 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
@@ -466,10 +466,10 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
@@ -485,7 +485,7 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |

View File

@@ -475,18 +475,18 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}`.
write_cache_path = ""
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
write_cache_size = "5GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
write_cache_ttl = "8h"
experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
@@ -550,7 +550,7 @@ metadata_cache_size = "64MiB"
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "64KiB"
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]

View File

@@ -337,7 +337,7 @@ data_home = "/tmp/greptimedb/"
type = "File"
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""
@@ -518,18 +518,18 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}`.
write_cache_path = ""
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
write_cache_size = "5GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
write_cache_ttl = "8h"
experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
@@ -593,7 +593,7 @@ metadata_cache_size = "64MiB"
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "64KiB"
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]

View File

@@ -20,3 +20,31 @@ Sample at 49 Hertz, for 10 seconds, output report in text format.
```bash
curl -X POST -s '0:4000/debug/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt
```
## Using `perf`
First find the pid of GreptimeDB:
Using `perf record` to profile GreptimeDB, at the sampling frequency of 99 hertz, and a duration of 60 seconds:
```bash
perf record -p <pid> --call-graph dwarf -F 99 -- sleep 60
```
The result will be saved to file `perf.data`.
Then
```bash
perf script --no-inline > perf.out
```
Produce a flame graph out of it:
```bash
git clone https://github.com/brendangregg/FlameGraph
FlameGraph/stackcollapse-perf.pl perf.out > perf.folded
FlameGraph/flamegraph.pl perf.folded > perf.svg
```

View File

@@ -14,7 +14,6 @@
import os
import re
from multiprocessing import Pool
def find_rust_files(directory):
@@ -34,11 +33,13 @@ def extract_branch_names(file_content):
return pattern.findall(file_content)
def check_snafu_in_files(branch_name, rust_files_content):
def check_snafu_in_files(branch_name, rust_files):
branch_name_snafu = f"{branch_name}Snafu"
for content in rust_files_content.values():
if branch_name_snafu in content:
return True
for rust_file in rust_files:
with open(rust_file, "r") as file:
content = file.read()
if branch_name_snafu in content:
return True
return False
@@ -48,24 +49,21 @@ def main():
for error_file in error_files:
with open(error_file, "r") as file:
branch_names.extend(extract_branch_names(file.read()))
content = file.read()
branch_names.extend(extract_branch_names(content))
# Read all rust files into memory once
rust_files_content = {}
for rust_file in other_rust_files:
with open(rust_file, "r") as file:
rust_files_content[rust_file] = file.read()
with Pool() as pool:
results = pool.starmap(
check_snafu_in_files, [(bn, rust_files_content) for bn in branch_names]
)
unused_snafu = [bn for bn, found in zip(branch_names, results) if not found]
unused_snafu = [
branch_name
for branch_name in branch_names
if not check_snafu_in_files(branch_name, other_rust_files)
]
if unused_snafu:
print("Unused error variants:")
for name in unused_snafu:
print(name)
if unused_snafu:
raise SystemExit(1)

View File

@@ -1,5 +1,5 @@
let
nixpkgs = fetchTarball "https://github.com/NixOS/nixpkgs/tarball/nixos-24.11";
nixpkgs = fetchTarball "https://github.com/NixOS/nixpkgs/tarball/nixos-unstable";
fenix = import (fetchTarball "https://github.com/nix-community/fenix/archive/main.tar.gz") {};
pkgs = import nixpkgs { config = {}; overlays = []; };
in
@@ -17,12 +17,10 @@ pkgs.mkShell rec {
})
cargo-nextest
taplo
curl
];
buildInputs = with pkgs; [
libgit2
libz
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;

View File

@@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, ErrorExt};
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
@@ -37,6 +37,7 @@ use snafu::{OptionExt, ResultExt};
use crate::metrics::{
METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
METRIC_META_CLIENT_GET,
};
const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
@@ -292,7 +293,7 @@ impl KvBackend for CachedKvBackend {
}
.map_err(|e| {
GetKvCacheSnafu {
err_msg: e.to_string(),
err_msg: e.output_msg(),
}
.build()
});
@@ -445,6 +446,8 @@ impl KvBackend for MetaKvBackend {
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let _timer = METRIC_META_CLIENT_GET.start_timer();
let mut response = self
.client
.range(RangeRequest::new().with_key(key))

View File

@@ -34,4 +34,6 @@ lazy_static! {
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
pub static ref METRIC_META_CLIENT_GET: Histogram =
register_histogram!("greptime_meta_client_get", "meta client get").unwrap();
}

View File

@@ -63,7 +63,9 @@ impl Instance {
&self.datanode
}
/// allow customizing datanode for downstream projects
/// Get mutable Datanode instance for changing some internal state, before starting it.
// Useful for wrapping Datanode instance. Please do not remove this method even if you find
// nowhere it is called.
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}

View File

@@ -66,11 +66,6 @@ impl Instance {
pub fn flownode(&self) -> &FlownodeInstance {
&self.flownode
}
/// allow customizing flownode for downstream projects
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
}
#[async_trait::async_trait]

View File

@@ -69,7 +69,7 @@ fn test_load_datanode_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),
@@ -203,7 +203,7 @@ fn test_load_standalone_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),

View File

@@ -35,23 +35,10 @@ data = {
"bigint_other": [5, -5, 1, 5, 5],
"utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
"timestamp_simple": [
datetime.datetime(2023, 4, 1, 20, 15, 30, 2000),
datetime.datetime.fromtimestamp(int("1629617204525777000") / 1000000000),
datetime.datetime(2023, 1, 1),
datetime.datetime(2023, 2, 1),
datetime.datetime(2023, 3, 1),
],
"date_simple": [
datetime.date(2023, 4, 1),
datetime.date(2023, 3, 1),
datetime.date(2023, 1, 1),
datetime.date(2023, 2, 1),
datetime.date(2023, 3, 1),
],
"timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
}
def infer_schema(data):
schema = "struct<"
for key, value in data.items():
@@ -69,7 +56,7 @@ def infer_schema(data):
elif key.startswith("date"):
dt = "date"
else:
print(key, value, dt)
print(key,value,dt)
raise NotImplementedError
if key.startswith("double"):
dt = "double"
@@ -81,6 +68,7 @@ def infer_schema(data):
return schema
def _write(
schema: str,
data,

View File

@@ -60,7 +60,6 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
column_schema: schema,
is_key: column_def.semantic_type == SemanticType::Tag as i32,
location: parse_location(ac.location)?,
add_if_not_exists: ac.add_if_not_exists,
})
})
.collect::<Result<Vec<_>>>()?;
@@ -221,7 +220,6 @@ mod tests {
..Default::default()
}),
location: None,
add_if_not_exists: true,
}],
})),
};
@@ -242,7 +240,6 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(None, add_column.location);
assert!(add_column.add_if_not_exists);
}
#[test]
@@ -268,7 +265,6 @@ mod tests {
location_type: LocationType::First.into(),
after_column_name: String::default(),
}),
add_if_not_exists: false,
},
AddColumn {
column_def: Some(ColumnDef {
@@ -284,7 +280,6 @@ mod tests {
location_type: LocationType::After.into(),
after_column_name: "ts".to_string(),
}),
add_if_not_exists: true,
},
],
})),
@@ -313,7 +308,6 @@ mod tests {
}),
add_column.location
);
assert!(add_column.add_if_not_exists);
let add_column = add_columns.pop().unwrap();
assert!(!add_column.is_key);
@@ -323,7 +317,6 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(Some(AddColumnLocation::First), add_column.location);
assert!(!add_column.add_if_not_exists);
}
#[test]

View File

@@ -299,7 +299,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let memory_column = &add_columns.add_columns[1];
assert_eq!(
@@ -312,7 +311,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let time_column = &add_columns.add_columns[2];
assert_eq!(
@@ -325,7 +323,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let interval_column = &add_columns.add_columns[3];
assert_eq!(
@@ -338,7 +335,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let decimal_column = &add_columns.add_columns[4];
assert_eq!(
@@ -356,7 +352,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
}
#[test]

View File

@@ -192,9 +192,6 @@ pub fn build_create_table_expr(
Ok(expr)
}
/// Find columns that are not present in the schema and return them as `AddColumns`
/// for adding columns automatically.
/// It always sets `add_if_not_exists` to `true` for now.
pub fn extract_new_columns(
schema: &Schema,
column_exprs: Vec<ColumnExpr>,
@@ -216,7 +213,6 @@ pub fn extract_new_columns(
AddColumn {
column_def,
location: None,
add_if_not_exists: true,
}
})
.collect::<Vec<_>>();

View File

@@ -35,8 +35,6 @@ common-wal.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true

View File

@@ -105,7 +105,7 @@ impl AlterLogicalTablesProcedure {
.context(ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.builder_with_alter_kind(table_ref.table, &request.alter_kind, true)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {

View File

@@ -28,13 +28,13 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
};
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
@@ -51,14 +51,10 @@ use crate::{metrics, ClusterId};
/// The alter table procedure
pub struct AlterTableProcedure {
/// The runtime context.
// The runtime context.
context: DdlContext,
/// The serialized data.
// The serialized data.
data: AlterTableData,
/// Cached new table metadata in the prepare step.
/// If we recover the procedure from json, then the table info value is not cached.
/// But we already validated it in the prepare step.
new_table_info: Option<TableInfo>,
}
impl AlterTableProcedure {
@@ -74,31 +70,18 @@ impl AlterTableProcedure {
Ok(Self {
context,
data: AlterTableData::new(task, table_id, cluster_id),
new_table_info: None,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(AlterTableProcedure {
context,
data,
new_table_info: None,
})
Ok(AlterTableProcedure { context, data })
}
// Checks whether the table exists.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
self.check_alter().await?;
self.fill_table_info().await?;
// Validates the request and builds the new table info.
// We need to build the new table info here because we should ensure the alteration
// is valid in `UpdateMeta` state as we already altered the region.
// Safety: `fill_table_info()` already set it.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if matches!(alter_kind, Kind::RenameTable { .. }) {
@@ -123,14 +106,6 @@ impl AlterTableProcedure {
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
let alter_kind = self.make_region_alter_kind()?;
info!(
"Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
self.data.table_ref(),
table_id,
alter_kind,
);
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
@@ -138,7 +113,7 @@ impl AlterTableProcedure {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
let request = self.make_alter_region_request(region_id)?;
debug!("Submitting {request:?} to {datanode}");
let datanode = datanode.clone();
@@ -175,15 +150,7 @@ impl AlterTableProcedure {
let table_ref = self.data.table_ref();
// Safety: checked before.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
// Gets the table info from the cache or builds it.
let new_info = match &self.new_table_info {
Some(cached) => cached.clone(),
None => self.build_new_table_info(&table_info_value.table_info)
.inspect_err(|e| {
// We already check the table info in the prepare step so this should not happen.
error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
})?,
};
let new_info = self.build_new_table_info(&table_info_value.table_info)?;
debug!(
"Starting update table: {} metadata, new table info {:?}",
@@ -207,7 +174,7 @@ impl AlterTableProcedure {
.await?;
}
info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
info!("Updated table metadata for table {table_ref}, table_id: {table_id}");
self.data.state = AlterTableState::InvalidateTableCache;
Ok(Status::executing(true))
}

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{
@@ -29,15 +27,13 @@ use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{InvalidProtoMsgSnafu, Result};
impl AlterTableProcedure {
/// Makes alter region request from existing an alter kind.
/// Region alter request always add columns if not exist.
pub(crate) fn make_alter_region_request(
&self,
region_id: RegionId,
kind: Option<alter_request::Kind>,
) -> Result<RegionRequest> {
/// Makes alter region request.
pub(crate) fn make_alter_region_request(&self, region_id: RegionId) -> Result<RegionRequest> {
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
// Safety: checked
let table_info = self.data.table_info().unwrap();
let kind = create_proto_alter_kind(table_info, alter_kind)?;
Ok(RegionRequest {
header: Some(RegionRequestHeader {
@@ -51,66 +47,45 @@ impl AlterTableProcedure {
})),
})
}
/// Makes alter kind proto that all regions can reuse.
/// Region alter request always add columns if not exist.
pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
// Safety: checked
let table_info = self.data.table_info().unwrap();
let kind = create_proto_alter_kind(table_info, alter_kind)?;
Ok(kind)
}
}
/// Creates region proto alter kind from `table_info` and `alter_kind`.
///
/// It always adds column if not exists and drops column if exists.
/// It skips the column if it already exists in the table.
/// Returns the kind and next column id if it adds new columns.
fn create_proto_alter_kind(
table_info: &RawTableInfo,
alter_kind: &Kind,
) -> Result<Option<alter_request::Kind>> {
match alter_kind {
Kind::AddColumns(x) => {
// Construct a set of existing columns in the table.
let existing_columns: HashSet<_> = table_info
.meta
.schema
.column_schemas
.iter()
.map(|col| &col.name)
.collect();
let mut next_column_id = table_info.meta.next_column_id;
let mut add_columns = Vec::with_capacity(x.add_columns.len());
for add_column in &x.add_columns {
let column_def = add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;
let add_columns = x
.add_columns
.iter()
.map(|add_column| {
let column_def =
add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;
// Skips existing columns.
if existing_columns.contains(&column_def.name) {
continue;
}
let column_id = next_column_id;
next_column_id += 1;
let column_id = next_column_id;
next_column_id += 1;
let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};
let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};
add_columns.push(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
});
}
Ok(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(alter_request::Kind::AddColumns(AddColumns {
add_columns,
@@ -168,7 +143,6 @@ mod tests {
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
/// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
async fn prepare_ddl_context() -> (DdlContext, u64, TableId, RegionId, String) {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
@@ -197,7 +171,6 @@ mod tests {
.name("cpu")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.is_nullable(true)
.build()
.unwrap()
.into(),
@@ -252,16 +225,15 @@ mod tests {
name: "my_tag3".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Vec::new(),
default_constraint: b"hello".to_vec(),
semantic_type: SemanticType::Tag as i32,
comment: String::new(),
..Default::default()
}),
location: Some(AddColumnLocation {
location_type: LocationType::After as i32,
after_column_name: "host".to_string(),
after_column_name: "my_tag2".to_string(),
}),
add_if_not_exists: false,
}],
})),
},
@@ -270,11 +242,8 @@ mod tests {
let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
let Some(Body::Alter(alter_region_request)) =
procedure.make_alter_region_request(region_id).unwrap().body
else {
unreachable!()
};
@@ -290,7 +259,7 @@ mod tests {
name: "my_tag3".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Vec::new(),
default_constraint: b"hello".to_vec(),
semantic_type: SemanticType::Tag as i32,
comment: String::new(),
..Default::default()
@@ -299,7 +268,7 @@ mod tests {
}),
location: Some(AddColumnLocation {
location_type: LocationType::After as i32,
after_column_name: "host".to_string(),
after_column_name: "my_tag2".to_string(),
}),
}]
}
@@ -330,11 +299,8 @@ mod tests {
let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
let Some(Body::Alter(alter_region_request)) =
procedure.make_alter_region_request(region_id).unwrap().body
else {
unreachable!()
};

View File

@@ -23,9 +23,7 @@ use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
impl AlterTableProcedure {
/// Builds new table info after alteration.
/// It bumps the column id of the table by the number of the add column requests.
/// So there may be holes in the column id sequence.
/// Builds new_meta
pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result<TableInfo> {
let table_info =
TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
@@ -36,7 +34,7 @@ impl AlterTableProcedure {
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.builder_with_alter_kind(table_ref.table, &request.alter_kind, false)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
@@ -48,9 +46,6 @@ impl AlterTableProcedure {
new_info.ident.version = table_info.ident.version + 1;
match request.alter_kind {
AlterKind::AddColumns { columns } => {
// Bumps the column id for the new columns.
// It may bump more than the actual number of columns added if there are
// existing columns, but it's fine.
new_info.meta.next_column_id += columns.len() as u32;
}
AlterKind::RenameTable { new_table_name } => {

View File

@@ -30,8 +30,6 @@ pub struct TestAlterTableExpr {
add_columns: Vec<ColumnDef>,
#[builder(setter(into, strip_option))]
new_table_name: Option<String>,
#[builder(setter)]
add_if_not_exists: bool,
}
impl From<TestAlterTableExpr> for AlterTableExpr {
@@ -55,7 +53,6 @@ impl From<TestAlterTableExpr> for AlterTableExpr {
.map(|col| AddColumn {
column_def: Some(col),
location: None,
add_if_not_exists: value.add_if_not_exists,
})
.collect(),
})),

View File

@@ -56,7 +56,6 @@ fn make_alter_logical_table_add_column_task(
let alter_table = alter_table
.table_name(table.to_string())
.add_columns(add_columns)
.add_if_not_exists(true)
.build()
.unwrap();

View File

@@ -139,7 +139,7 @@ async fn test_on_submit_alter_request() {
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
name: "my_field_column".to_string(),
}],
})),
},
@@ -225,7 +225,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
name: "my_field_column".to_string(),
}],
})),
},
@@ -330,7 +330,6 @@ async fn test_on_update_metadata_add_columns() {
..Default::default()
}),
location: None,
add_if_not_exists: false,
}],
})),
},

View File

@@ -667,18 +667,10 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to create connection pool for Postgres"))]
CreatePostgresPool {
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(source)]
error: deadpool_postgres::CreatePoolError,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
GetPostgresConnection {
reason: String,
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
},
@@ -794,9 +786,9 @@ impl ErrorExt for Error {
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
#[cfg(feature = "pg_kvbackend")]
PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => {
StatusCode::Internal
}
PostgresExecution { .. } => StatusCode::Internal,
#[cfg(feature = "pg_kvbackend")]
ConnectPostgres { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}

View File

@@ -29,6 +29,7 @@ use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
use crate::key::{MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::metrics::METRIC_META_SCHEMA_INFO_GET;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
@@ -209,6 +210,8 @@ impl SchemaManager {
&self,
schema: SchemaNameKey<'_>,
) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
let _timer = METRIC_META_SCHEMA_INFO_GET.start_timer();
let raw_key = schema.to_bytes();
self.kv_backend
.get(&raw_key)

View File

@@ -29,6 +29,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_INFO_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::metrics::METRIC_META_TABLE_INFO_GET;
use crate::rpc::store::BatchGetRequest;
/// The key stores the metadata of the table.
@@ -194,6 +195,8 @@ impl TableInfoManager {
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>> {
let _timer = METRIC_META_TABLE_INFO_GET.start_timer();
let key = TableInfoKey::new(table_id);
let raw_key = key.to_bytes();
self.kv_backend

View File

@@ -16,17 +16,15 @@ use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;
use deadpool_postgres::{Config, Pool, Runtime};
use common_telemetry::error;
use snafu::ResultExt;
use tokio_postgres::types::ToSql;
use tokio_postgres::NoTls;
use tokio_postgres::{Client, NoTls};
use crate::error::{
CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, Result,
StrFromUtf8Snafu,
};
use super::{KvBackend, TxnService};
use crate::error::{ConnectPostgresSnafu, Error, PostgresExecutionSnafu, Result, StrFromUtf8Snafu};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
@@ -36,7 +34,8 @@ use crate::rpc::KeyValue;
/// Posgres backend store for metasrv
pub struct PgStore {
pool: Pool,
// TODO: Consider using sqlx crate.
client: Client,
}
const EMPTY: &[u8] = &[0];
@@ -95,49 +94,33 @@ SELECT k, v FROM prev;"#;
impl PgStore {
/// Create pgstore impl of KvBackendRef from url.
pub async fn with_url(url: &str) -> Result<KvBackendRef> {
let mut cfg = Config::new();
cfg.url = Some(url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)?;
Self::with_pg_pool(pool).await
// TODO: support tls.
let (client, conn) = tokio_postgres::connect(url, NoTls)
.await
.context(ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = conn.await {
error!(e; "connection error");
}
});
Self::with_pg_client(client).await
}
/// Create pgstore impl of KvBackendRef from tokio-postgres client.
pub async fn with_pg_pool(pool: Pool) -> Result<KvBackendRef> {
pub async fn with_pg_client(client: Client) -> Result<KvBackendRef> {
// This step ensures the postgres metadata backend is ready to use.
// We check if greptime_metakv table exists, and we will create a new table
// if it does not exist.
let client = match pool.get().await {
Ok(client) => client,
Err(e) => {
return GetPostgresConnectionSnafu {
reason: e.to_string(),
}
.fail();
}
};
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)?;
Ok(Arc::new(Self { pool }))
}
async fn get_client(&self) -> Result<deadpool::managed::Object<deadpool_postgres::Manager>> {
match self.pool.get().await {
Ok(client) => Ok(client),
Err(e) => GetPostgresConnectionSnafu {
reason: e.to_string(),
}
.fail(),
}
Ok(Arc::new(Self { client }))
}
async fn put_if_not_exists(&self, key: &str, value: &str) -> Result<bool> {
let res = self
.get_client()
.await?
.client
.query(PUT_IF_NOT_EXISTS, &[&key, &value])
.await
.context(PostgresExecutionSnafu)?;
@@ -276,8 +259,7 @@ impl KvBackend for PgStore {
})
.collect();
let res = self
.get_client()
.await?
.client
.query(&template, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -345,10 +327,8 @@ impl KvBackend for PgStore {
in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
let query = generate_batch_upsert_query(req.kvs.len());
let res = self
.get_client()
.await?
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -385,10 +365,8 @@ impl KvBackend for PgStore {
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.get_client()
.await?
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -431,8 +409,7 @@ impl KvBackend for PgStore {
.collect();
let res = self
.get_client()
.await?
.client
.query(template, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -476,10 +453,8 @@ impl KvBackend for PgStore {
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.get_client()
.await?
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -513,8 +488,7 @@ impl KvBackend for PgStore {
let expect = process_bytes(&req.expect, "CASExpect")?;
let res = self
.get_client()
.await?
.client
.query(CAS, &[&key, &value, &expect])
.await
.context(PostgresExecutionSnafu)?;
@@ -586,19 +560,10 @@ mod tests {
return None;
}
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let client = pool.get().await.unwrap();
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)
.unwrap();
Some(PgStore { pool })
let (client, connection) = tokio_postgres::connect(&endpoints, NoTls).await.unwrap();
tokio::spawn(connection);
let _ = client.execute(METADKV_CREATION, &[]).await;
Some(PgStore { client })
}
#[tokio::test]

View File

@@ -108,4 +108,9 @@ lazy_static! {
&["name"]
)
.unwrap();
pub static ref METRIC_META_TABLE_INFO_GET: Histogram =
register_histogram!("greptime_meta_table_info_get", "get table info from kvbackend").unwrap();
pub static ref METRIC_META_SCHEMA_INFO_GET: Histogram =
register_histogram!("greptime_meta_schema_info_get", "get schema info from kvbackend").unwrap();
}

View File

@@ -433,8 +433,8 @@ impl DatanodeBuilder {
) -> Result<MitoEngine> {
if opts.storage.is_object_storage() {
// Enable the write cache when setting object storage
config.enable_write_cache = true;
info!("Configured 'enable_write_cache=true' for mito engine.");
config.enable_experimental_write_cache = true;
info!("Configured 'enable_experimental_write_cache=true' for mito engine.");
}
let mito_engine = match &opts.wal {

View File

@@ -45,12 +45,17 @@ use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::{
relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc,
};
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu,
UnexpectedSnafu,
};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
@@ -64,7 +69,7 @@ mod util;
mod worker;
pub(crate) mod node_context;
pub(crate) mod table_source;
mod table_source;
use crate::error::Error;
use crate::utils::StateReportHandler;
@@ -124,7 +129,7 @@ pub struct FlowWorkerManager {
/// The query engine that will be used to parse the query and convert it to a dataflow plan
pub query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
table_info_source: ManagedTableSource,
table_info_source: TableSource,
frontend_invoker: RwLock<Option<FrontendInvoker>>,
/// contains mapping from table name to global id, and table schema
node_context: RwLock<FlownodeContext>,
@@ -153,11 +158,11 @@ impl FlowWorkerManager {
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) -> Self {
let srv_map = ManagedTableSource::new(
let srv_map = TableSource::new(
table_meta.table_info_manager().clone(),
table_meta.table_name_manager().clone(),
);
let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
let node_context = FlownodeContext::default();
let tick_manager = FlowTickManager::new();
let worker_handles = Vec::new();
FlowWorkerManager {
@@ -404,7 +409,7 @@ impl FlowWorkerManager {
) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
if let Some(table_id) = self
.table_info_source
.get_opt_table_id_from_name(table_name)
.get_table_id_from_name(table_name)
.await?
{
let table_info = self
@@ -724,6 +729,43 @@ impl FlowWorkerManager {
query_ctx,
} = args;
let already_exist = {
let mut flag = false;
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
flag = true;
break;
}
}
flag
};
match (create_if_not_exists, or_replace, already_exist) {
// do replace
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
self.remove_flow(flow_id).await?;
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// do nothing if exists
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// create if not exists
(_, _, false) => (),
}
if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
return Ok(None);
}
}
}
let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in &source_table_ids {
@@ -786,9 +828,27 @@ impl FlowWorkerManager {
.fail()?,
}
}
let table_id = self
.table_info_source
.get_table_id_from_name(&sink_table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table id for table name {:?}", sink_table_name),
})?;
let table_info_value = self
.table_info_source
.get_table_info_value(&table_id)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table info value for table id {:?}", table_id),
})?;
let real_schema = table_info_value_to_relation_desc(table_info_value)?;
node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?;
} else {
// assign inferred schema to sink table
// create sink table
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
let did_create = self
.create_table_from_relation(
&format!("flow-id={flow_id}"),
@@ -837,11 +897,9 @@ impl FlowWorkerManager {
source_ids,
src_recvs: source_receivers,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
};
handle.create_flow(create_request).await?;
info!("Successfully create flow with id={}", flow_id);
Ok(Some(flow_id))

View File

@@ -25,24 +25,20 @@ use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::{debug, trace};
use itertools::Itertools;
use snafu::{IntoError, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use super::util::from_proto_to_data_type;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::{CreateFlowSnafu, InsertIntoFlowSnafu, InternalSnafu};
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
fn to_meta_err(
location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
move |err: crate::error::Error| -> common_meta::error::Error {
common_meta::error::Error::External {
location,
source: BoxedError::new(err),
}
}
fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error {
// TODO(discord9): refactor this
Err::<(), _>(BoxedError::new(err))
.with_context(|_| ExternalSnafu)
.unwrap_err()
}
#[async_trait::async_trait]
@@ -79,16 +75,11 @@ impl Flownode for FlowWorkerManager {
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
sql,
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
let ret = self.create_flow(args).await.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
@@ -103,7 +94,7 @@ impl Flownode for FlowWorkerManager {
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
@@ -121,15 +112,9 @@ impl Flownode for FlowWorkerManager {
.await
.flush_all_sender()
.await
.map_err(to_meta_err(snafu::location!()))?;
let rows_send = self
.run_available(true)
.await
.map_err(to_meta_err(snafu::location!()))?;
let row = self
.send_writeback_requests()
.await
.map_err(to_meta_err(snafu::location!()))?;
.map_err(to_meta_err)?;
let rows_send = self.run_available(true).await.map_err(to_meta_err)?;
let row = self.send_writeback_requests().await.map_err(to_meta_err)?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
@@ -169,23 +154,17 @@ impl Flownode for FlowWorkerManager {
// TODO(discord9): reconsider time assignment mechanism
let now = self.tick_manager.tick();
let (table_types, fetch_order) = {
let fetch_order = {
let ctx = self.node_context.read().await;
// TODO(discord9): also check schema version so that altered table can be reported
let table_schema = ctx
.table_source
.table_from_id(&table_id)
.await
.map_err(to_meta_err(snafu::location!()))?;
let table_types = table_schema
.typ()
.column_types
.clone()
.into_iter()
.map(|t| t.scalar_type)
.collect_vec();
let table_col_names = table_schema.names;
let table_col_names = ctx
.table_repr
.get_by_table_id(&table_id)
.map(|r| r.1)
.and_then(|id| ctx.schema.get(&id))
.map(|desc| &desc.names)
.context(UnexpectedSnafu {
err_msg: format!("Table not found: {}", table_id),
})?;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
@@ -204,19 +183,16 @@ impl Flownode for FlowWorkerManager {
);
let fetch_order: Vec<usize> = table_col_names
.iter()
.map(|col_name| {
name_to_col
.get(col_name)
.copied()
.with_context(|| UnexpectedSnafu {
err_msg: format!("Column not found: {}", col_name),
})
.map(|names| {
name_to_col.get(names).copied().context(UnexpectedSnafu {
err_msg: format!("Column not found: {}", names),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
trace!("Reordering columns: {:?}", fetch_order)
}
(table_types, fetch_order)
fetch_order
};
let rows: Vec<DiffRow> = rows_proto
@@ -231,29 +207,17 @@ impl Flownode for FlowWorkerManager {
})
.map(|r| (r, now, 1))
.collect_vec();
if let Err(err) = self
.handle_write_request(region_id.into(), rows, &table_types)
let batch_datatypes = insert_schema
.iter()
.map(from_proto_to_data_type)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(to_meta_err)?;
self.handle_write_request(region_id.into(), rows, &batch_datatypes)
.await
{
let err = BoxedError::new(err);
let flow_ids = self
.node_context
.read()
.await
.get_flow_ids(table_id)
.into_iter()
.flatten()
.cloned()
.collect_vec();
let err = InsertIntoFlowSnafu {
region_id,
flow_ids,
}
.into_error(err);
common_telemetry::error!(err; "Failed to handle write request");
let err = to_meta_err(snafu::location!())(err);
return Err(err);
}
.map_err(|err| {
common_telemetry::error!(err;"Failed to handle write request");
to_meta_err(err)
})?;
}
Ok(Default::default())
}

View File

@@ -25,8 +25,7 @@ use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, RwLock};
use crate::adapter::table_source::FlowTableSource;
use crate::adapter::{FlowId, ManagedTableSource, TableName};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{Batch, GlobalId};
@@ -34,7 +33,7 @@ use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};
/// A context that holds the information of the dataflow
#[derive(Debug)]
#[derive(Default, Debug)]
pub struct FlownodeContext {
/// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
@@ -51,32 +50,13 @@ pub struct FlownodeContext {
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
pub sink_receiver:
BTreeMap<TableName, (mpsc::UnboundedSender<Batch>, mpsc::UnboundedReceiver<Batch>)>,
/// can query the schema of the table source, from metasrv with local cache
pub table_source: Box<dyn FlowTableSource>,
/// the schema of the table, query from metasrv or inferred from TypedPlan
pub schema: HashMap<GlobalId, RelationDesc>,
/// All the tables that have been registered in the worker
pub table_repr: IdToNameMap,
pub query_context: Option<Arc<QueryContext>>,
}
impl FlownodeContext {
pub fn new(table_source: Box<dyn FlowTableSource>) -> Self {
Self {
source_to_tasks: Default::default(),
flow_to_sink: Default::default(),
sink_to_flow: Default::default(),
source_sender: Default::default(),
sink_receiver: Default::default(),
table_source,
table_repr: Default::default(),
query_context: Default::default(),
}
}
pub fn get_flow_ids(&self, table_id: TableId) -> Option<&BTreeSet<FlowId>> {
self.source_to_tasks.get(&table_id)
}
}
/// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full
/// note that it wouldn't evict old data, so it's possible to block forever if the receiver is slow
///
@@ -304,7 +284,7 @@ impl FlownodeContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub async fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
let id = self
.table_repr
.get_by_name(name)
@@ -312,7 +292,13 @@ impl FlownodeContext {
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self.table_source.table(name).await?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
}
@@ -326,7 +312,7 @@ impl FlownodeContext {
/// merely creating a mapping from table id to global id
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &ManagedTableSource,
srv_map: &TableSource,
mut table_name: Option<TableName>,
table_id: Option<TableId>,
) -> Result<GlobalId, Error> {
@@ -347,8 +333,9 @@ impl FlownodeContext {
// table id is Some meaning db must have created the table
if let Some(table_id) = table_id {
let known_table_name = srv_map.get_table_name(&table_id).await?;
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
table_name = table_name.or(Some(known_table_name));
self.schema.insert(global_id, schema);
} // if we don't have table id, it means database haven't assign one yet or we don't need it
// still update the mapping with new global id
@@ -357,6 +344,26 @@ impl FlownodeContext {
}
}
/// Assign a schema to a table
///
pub fn assign_table_schema(
&mut self,
table_name: &TableName,
schema: RelationDesc,
) -> Result<(), Error> {
let gid = self
.table_repr
.get_by_name(table_name)
.map(|(_, gid)| gid)
.context(TableNotFoundSnafu {
name: format!("Table not found: {:?} in flownode cache", table_name),
})?;
self.schema.insert(gid, schema);
Ok(())
}
/// Get a new global id
pub fn new_global_id(&self) -> GlobalId {
GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)

View File

@@ -27,56 +27,16 @@ use crate::error::{
};
use crate::repr::RelationDesc;
/// Table source but for flow, provide table schema by table name/id
#[async_trait::async_trait]
pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error>;
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;
/// Get the table schema by table name
async fn table(&self, name: &TableName) -> Result<RelationDesc, Error> {
let id = self.table_id_from_name(name).await?;
self.table_from_id(&id).await
}
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error>;
}
/// managed table source information, query from table info manager and table name manager
#[derive(Clone)]
pub struct ManagedTableSource {
/// mapping of table name <-> table id should be query from tableinfo manager
pub struct TableSource {
/// for query `TableId -> TableName` mapping
table_info_manager: TableInfoManager,
table_name_manager: TableNameManager,
}
#[async_trait::async_trait]
impl FlowTableSource for ManagedTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
.with_context(|| TableNotFoundSnafu {
name: format!("TableId = {:?}, Can't found table info", table_id),
})?;
let desc = table_info_value_to_relation_desc(table_info_value)?;
Ok(desc)
}
async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
self.get_table_name(table_id).await
}
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
self.get_opt_table_id_from_name(name)
.await?
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})
}
}
impl ManagedTableSource {
impl TableSource {
pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
ManagedTableSource {
TableSource {
table_info_manager,
table_name_manager,
}
@@ -103,10 +63,7 @@ impl ManagedTableSource {
}
/// If the table haven't been created in database, the tableId returned would be null
pub async fn get_opt_table_id_from_name(
&self,
name: &TableName,
) -> Result<Option<TableId>, Error> {
pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
let ret = self
.table_name_manager
.get(TableNameKey::new(&name[0], &name[1], &name[2]))
@@ -169,117 +126,3 @@ impl ManagedTableSource {
Ok((table_name, desc))
}
}
impl std::fmt::Debug for ManagedTableSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KvBackendTableSource").finish()
}
}
#[cfg(test)]
pub(crate) mod test {
use std::collections::HashMap;
use datatypes::data_type::ConcreteDataType as CDT;
use super::*;
use crate::repr::{ColumnType, RelationType};
pub struct FlowDummyTableSource {
pub id_names_to_desc: Vec<(TableId, TableName, RelationDesc)>,
id_to_idx: HashMap<TableId, usize>,
name_to_idx: HashMap<TableName, usize>,
}
impl Default for FlowDummyTableSource {
fn default() -> Self {
let id_names_to_desc = vec![
(
1024,
[
"greptime".to_string(),
"public".to_string(),
"numbers".to_string(),
],
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
),
(
1025,
[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
),
];
let id_to_idx = id_names_to_desc
.iter()
.enumerate()
.map(|(idx, (id, _name, _desc))| (*id, idx))
.collect();
let name_to_idx = id_names_to_desc
.iter()
.enumerate()
.map(|(idx, (_id, name, _desc))| (name.clone(), idx))
.collect();
Self {
id_names_to_desc,
id_to_idx,
name_to_idx,
}
}
}
#[async_trait::async_trait]
impl FlowTableSource for FlowDummyTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;
let desc = self
.id_names_to_desc
.get(*idx)
.map(|x| x.2.clone())
.context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;
Ok(desc)
}
async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;
self.id_names_to_desc
.get(*idx)
.map(|x| x.1.clone())
.context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})
}
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
for (id, table_name, _desc) in &self.id_names_to_desc {
if name == table_name {
return Ok(*id);
}
}
TableNotFoundSnafu {
name: format!("Table name = {:?}, couldn't found table id", name),
}
.fail()?
}
}
impl std::fmt::Debug for FlowDummyTableSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DummyTableSource").finish()
}
}
}

View File

@@ -247,25 +247,15 @@ impl<'s> Worker<'s> {
src_recvs: Vec<broadcast::Receiver<Batch>>,
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_after: Option<repr::Duration>,
or_replace: bool,
create_if_not_exists: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let already_exist = self.task_states.contains_key(&flow_id);
match (create_if_not_exists, or_replace, already_exist) {
// if replace, ignore that old flow exists
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists, and not replace, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}
let already_exists = self.task_states.contains_key(&flow_id);
match (already_exists, create_if_not_exists) {
(true, true) => return Ok(None),
(true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
(false, _) => (),
};
let mut cur_task_state = ActiveDataflowState::<'s> {
err_collector,
@@ -351,7 +341,6 @@ impl<'s> Worker<'s> {
source_ids,
src_recvs,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
} => {
@@ -363,7 +352,6 @@ impl<'s> Worker<'s> {
&source_ids,
src_recvs,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
);
@@ -410,7 +398,6 @@ pub enum Request {
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<Batch>>,
expire_after: Option<repr::Duration>,
or_replace: bool,
create_if_not_exists: bool,
err_collector: ErrCollector,
},
@@ -560,7 +547,6 @@ mod test {
source_ids: src_ids,
src_recvs: vec![rx],
expire_after: None,
or_replace: false,
create_if_not_exists: true,
err_collector: ErrCollector::default(),
};

View File

@@ -32,27 +32,6 @@ use crate::expr::EvalError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Failed to insert into flow: region_id={}, flow_ids={:?}",
region_id,
flow_ids
))]
InsertIntoFlow {
region_id: u64,
flow_ids: Vec<u64>,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error encountered while creating flow: {sql}"))]
CreateFlow {
sql: String,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -228,17 +207,16 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } => {
StatusCode::Internal
}
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery,
Self::Unexpected { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported

View File

@@ -50,8 +50,8 @@ use tonic::{Request, Response, Status};
use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu,
ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
@@ -392,13 +392,7 @@ impl FlownodeBuilder {
.build(),
),
};
manager
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu {
sql: info.raw_sql().clone(),
})?;
manager.create_flow(args).await?;
}
Ok(cnt)

View File

@@ -173,11 +173,12 @@ mod test {
use super::*;
use crate::adapter::node_context::IdToNameMap;
use crate::adapter::table_source::test::FlowDummyTableSource;
use crate::df_optimizer::apply_df_optimizer;
use crate::expr::GlobalId;
use crate::repr::{ColumnType, RelationType};
pub fn create_test_ctx() -> FlownodeContext {
let mut schemas = HashMap::new();
let mut tri_map = IdToNameMap::new();
{
let gid = GlobalId::User(0);
@@ -186,7 +187,10 @@ mod test {
"public".to_string(),
"numbers".to_string(),
];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
tri_map.insert(Some(name.clone()), Some(1024), gid);
schemas.insert(gid, schema.into_named(vec![Some("number".to_string())]));
}
{
@@ -196,16 +200,23 @@ mod test {
"public".to_string(),
"numbers_with_ts".to_string(),
];
let schema = RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
]);
schemas.insert(
gid,
schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
);
tri_map.insert(Some(name.clone()), Some(1025), gid);
}
let dummy_source = FlowDummyTableSource::default();
let mut ctx = FlownodeContext::new(Box::new(dummy_source));
ctx.table_repr = tri_map;
ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));
ctx
FlownodeContext {
schema: schemas,
table_repr: tri_map,
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
..Default::default()
}
}
pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {

View File

@@ -128,11 +128,7 @@ impl AggregateExpr {
}
if args.len() != 1 {
let fn_name = extensions.get(&f.function_reference).cloned();
return not_impl_err!(
"Aggregated function (name={:?}) with multiple arguments is not supported",
fn_name
);
return not_impl_err!("Aggregated function with multiple arguments is not supported");
}
let arg = if let Some(first) = args.first() {

View File

@@ -176,7 +176,7 @@ impl TypedPlan {
}
.fail()?,
};
let table = ctx.table(&table_reference).await?;
let table = ctx.table(&table_reference)?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),
};

View File

@@ -34,8 +34,6 @@ common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true

View File

@@ -29,8 +29,6 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use etcd_client::Client;
use futures::future;
use servers::configurator::ConfiguratorRef;
@@ -53,6 +51,9 @@ use crate::election::etcd::EtcdElection;
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::CANDIDATE_LEASE_SECS;
#[cfg(feature = "pg_kvbackend")]
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
@@ -85,14 +86,14 @@ impl MetasrvInstance {
let httpsrv = Arc::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
.with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?)
.build(),
);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::InitExportMetricsTaskSnafu)?;
.context(InitExportMetricsTaskSnafu)?;
Ok(MetasrvInstance {
metasrv,
httpsrv,
@@ -107,7 +108,7 @@ impl MetasrvInstance {
self.metasrv.try_start().await?;
if let Some(t) = self.export_metrics_task.as_ref() {
t.start(None).context(error::InitExportMetricsTaskSnafu)?
t.start(None).context(InitExportMetricsTaskSnafu)?
}
let (tx, rx) = mpsc::channel::<()>(1);
@@ -228,11 +229,10 @@ pub async fn metasrv_builder(
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?;
let kv_backend = PgStore::with_pg_pool(pool)
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
@@ -287,12 +287,9 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
#[cfg(feature = "pg_kvbackend")]
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
@@ -304,19 +301,3 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres:
});
Ok(client)
}
#[cfg(feature = "pg_kvbackend")]
async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres::Pool> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let mut cfg = Config::new();
cfg.url = Some(postgres_url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(error::CreatePostgresPoolSnafu)?;
Ok(pool)
}

View File

@@ -704,7 +704,7 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to connect to Postgres"))]
#[snafu(display("Failed to connect to PostgresSQL"))]
ConnectPostgres {
#[snafu(source)]
error: tokio_postgres::Error,
@@ -712,23 +712,6 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to create connection pool for Postgres"))]
CreatePostgresPool {
#[snafu(source)]
error: deadpool_postgres::CreatePoolError,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
GetPostgresConnection {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Handler not found: {}", name))]
HandlerNotFound {
name: String,
@@ -860,10 +843,9 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. }
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::ConnectPostgres { .. } => StatusCode::Internal,
Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "pg_kvbackend")]
Error::PostgresExecution { .. } => StatusCode::Internal,
}
}

View File

@@ -93,15 +93,15 @@ pub struct MitoConfig {
pub page_cache_size: ReadableSize,
/// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
pub selector_result_cache_size: ReadableSize,
/// Whether to enable the write cache.
pub enable_write_cache: bool,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// File system path for write cache dir's root, defaults to `{data_home}`.
pub write_cache_path: String,
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub write_cache_size: ReadableSize,
pub experimental_write_cache_size: ReadableSize,
/// TTL for write cache.
#[serde(with = "humantime_serde")]
pub write_cache_ttl: Option<Duration>,
pub experimental_write_cache_ttl: Option<Duration>,
// Other configs:
/// Buffer size for SST writing.
@@ -147,10 +147,10 @@ impl Default for MitoConfig {
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
enable_write_cache: false,
write_cache_path: String::new(),
write_cache_size: ReadableSize::gb(5),
write_cache_ttl: None,
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::gb(5),
experimental_write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
@@ -234,8 +234,8 @@ impl MitoConfig {
}
// Sets write cache path if it is empty.
if self.write_cache_path.trim().is_empty() {
self.write_cache_path = data_home.to_string();
if self.experimental_write_cache_path.trim().is_empty() {
self.experimental_write_cache_path = data_home.to_string();
}
self.index.sanitize(data_home, &self.inverted_index)?;
@@ -268,7 +268,7 @@ impl MitoConfig {
self.selector_result_cache_size = mem_cache_size;
}
/// Enable write cache.
/// Enable experimental write cache.
#[cfg(test)]
pub fn enable_write_cache(
mut self,
@@ -276,10 +276,10 @@ impl MitoConfig {
size: ReadableSize,
ttl: Option<Duration>,
) -> Self {
self.enable_write_cache = true;
self.write_cache_path = path;
self.write_cache_size = size;
self.write_cache_ttl = ttl;
self.enable_experimental_write_cache = true;
self.experimental_write_cache_path = path;
self.experimental_write_cache_size = size;
self.experimental_write_cache_ttl = ttl;
self
}
}
@@ -443,7 +443,7 @@ impl Default for InvertedIndexConfig {
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
content_cache_page_size: ReadableSize::kb(64),
content_cache_page_size: ReadableSize::mb(8),
};
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {

View File

@@ -140,7 +140,7 @@ async fn test_edit_region_fill_cache() {
.create_engine_with(
MitoConfig {
// Write cache must be enabled to download the ingested SST file.
enable_write_cache: true,
enable_experimental_write_cache: true,
..Default::default()
},
None,

View File

@@ -16,7 +16,7 @@ use std::path::PathBuf;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::{debug, warn};
use common_telemetry::warn;
use futures::{AsyncRead, AsyncWrite};
use index::error as index_error;
use index::error::Result as IndexResult;
@@ -189,8 +189,7 @@ impl ExternalTempFileProvider for TempFileProvider {
for entry in entries {
if entry.metadata().is_dir() {
// todo(hl): we can keep this warning once we find a way to filter self in list result.
debug!("Unexpected entry in index creation dir: {:?}", entry.path());
warn!("Unexpected entry in index creation dir: {:?}", entry.path());
continue;
}

View File

@@ -365,20 +365,23 @@ async fn write_cache_from_config(
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
if !config.enable_write_cache {
if !config.enable_experimental_write_cache {
return Ok(None);
}
tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
// TODO(yingwen): Remove this and document the config once the write cache is ready.
warn!("Write cache is an experimental feature");
tokio::fs::create_dir_all(Path::new(&config.experimental_write_cache_path))
.await
.context(CreateDirSnafu {
dir: &config.write_cache_path,
dir: &config.experimental_write_cache_path,
})?;
let cache = WriteCache::new_fs(
&config.write_cache_path,
config.write_cache_size,
config.write_cache_ttl,
&config.experimental_write_cache_path,
config.experimental_write_cache_size,
config.experimental_write_cache_ttl,
puffin_manager_factory,
intermediate_manager,
)

View File

@@ -145,8 +145,10 @@ impl<S> RegionWorkerLoop<S> {
}
info!(
"Try to alter region {}, version.metadata: {:?}, request: {:?}",
region_id, version.metadata, request,
"Try to alter region {} from version {} to {}",
region_id,
version.metadata.schema_version,
region.metadata().schema_version
);
self.handle_alter_region_metadata(region, version, request, sender);
}

View File

@@ -101,10 +101,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.version_control
.alter_schema(change_result.new_meta, &region.memtable_builder);
let version = region.version();
info!(
"Region {} is altered, metadata is {:?}, options: {:?}",
region.region_id, version.metadata, version.options,
"Region {} is altered, schema version is {}",
region.region_id,
region.metadata().schema_version
);
}

View File

@@ -477,7 +477,6 @@ pub fn column_schemas_to_defs(
.collect()
}
/// Converts a SQL alter table statement into a gRPC alter table expression.
pub(crate) fn to_alter_table_expr(
alter_table: AlterTable,
query_ctx: &QueryContextRef,
@@ -505,8 +504,6 @@ pub(crate) fn to_alter_table_expr(
.context(ExternalSnafu)?,
),
location: location.as_ref().map(From::from),
// TODO(yingwen): We don't support `IF NOT EXISTS` for `ADD COLUMN` yet.
add_if_not_exists: false,
}],
}),
AlterTableOperation::ModifyColumnType {

View File

@@ -741,8 +741,6 @@ impl Inserter {
Ok(create_table_expr)
}
/// Returns an alter table expression if it finds new columns in the request.
/// It always adds columns if not exist.
fn get_alter_table_expr_on_demand(
&self,
req: &RowInsertRequest,

View File

@@ -911,7 +911,7 @@ impl StatementExecutor {
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.builder_with_alter_kind(table_name, &request.alter_kind, false)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;

View File

@@ -597,8 +597,7 @@ pub struct AddColumn {
impl AddColumn {
/// Returns an error if the column to add is invalid.
///
/// It allows adding existing columns. However, the existing column must have the same metadata
/// and the location must be None.
/// It allows adding existing columns.
pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
ensure!(
self.column_metadata.column_schema.is_nullable()
@@ -616,46 +615,6 @@ impl AddColumn {
}
);
if let Some(existing_column) =
metadata.column_by_name(&self.column_metadata.column_schema.name)
{
// If the column already exists.
ensure!(
*existing_column == self.column_metadata,
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column {} already exists with different metadata, existing: {:?}, got: {:?}",
self.column_metadata.column_schema.name, existing_column, self.column_metadata,
),
}
);
ensure!(
self.location.is_none(),
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column {} already exists, but location is specified",
self.column_metadata.column_schema.name
),
}
);
}
if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
// Ensures the existing column has the same name.
ensure!(
existing_column.column_schema.name == self.column_metadata.column_schema.name,
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column id {} already exists with different name {}",
self.column_metadata.column_id, existing_column.column_schema.name
),
}
);
}
Ok(())
}
@@ -1049,8 +1008,6 @@ mod tests {
);
}
/// Returns a new region metadata for testing. Metadata:
/// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
fn new_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
@@ -1105,7 +1062,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 5,
column_id: 4,
},
location: None,
};
@@ -1121,7 +1078,7 @@ mod tests {
false,
),
semantic_type: SemanticType::Tag,
column_id: 5,
column_id: 4,
},
location: None,
}
@@ -1137,7 +1094,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
column_id: 4,
},
location: None,
};
@@ -1157,7 +1114,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 5,
column_id: 4,
},
location: None,
},
@@ -1169,7 +1126,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Field,
column_id: 6,
column_id: 5,
},
location: None,
},
@@ -1180,82 +1137,6 @@ mod tests {
assert!(kind.need_alter(&metadata));
}
#[test]
fn test_add_existing_column_different_metadata() {
let metadata = new_metadata();
// Add existing column with different id.
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 4,
},
location: None,
}],
};
kind.validate(&metadata).unwrap_err();
// Add existing column with different type.
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
},
location: None,
}],
};
kind.validate(&metadata).unwrap_err();
// Add existing column with different name.
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
},
location: None,
}],
};
kind.validate(&metadata).unwrap_err();
}
#[test]
fn test_add_existing_column_with_location() {
let metadata = new_metadata();
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
},
location: Some(AddColumnLocation::First),
}],
};
kind.validate(&metadata).unwrap_err();
}
#[test]
fn test_validate_drop_column() {
let metadata = new_metadata();
@@ -1354,19 +1235,19 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 5,
column_id: 4,
},
location: None,
},
AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"field_2",
"field_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 6,
column_id: 5,
},
location: None,
},

View File

@@ -194,9 +194,12 @@ impl TableMeta {
&self,
table_name: &str,
alter_kind: &AlterKind,
add_if_not_exists: bool,
) -> Result<TableMetaBuilder> {
match alter_kind {
AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
AlterKind::AddColumns { columns } => {
self.add_columns(table_name, columns, add_if_not_exists)
}
AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
AlterKind::ModifyColumnTypes { columns } => {
self.modify_column_types(table_name, columns)
@@ -337,7 +340,6 @@ impl TableMeta {
Ok(meta_builder)
}
// TODO(yingwen): Remove this.
/// Allocate a new column for the table.
///
/// This method would bump the `next_column_id` of the meta.
@@ -382,11 +384,11 @@ impl TableMeta {
builder
}
// TODO(yingwen): Tests add if not exists.
fn add_columns(
&self,
table_name: &str,
requests: &[AddColumnRequest],
add_if_not_exists: bool,
) -> Result<TableMetaBuilder> {
let table_schema = &self.schema;
let mut meta_builder = self.new_meta_builder();
@@ -394,61 +396,63 @@ impl TableMeta {
self.primary_key_indices.iter().collect();
let mut names = HashSet::with_capacity(requests.len());
let mut new_columns = Vec::with_capacity(requests.len());
for col_to_add in requests {
if let Some(column_schema) =
table_schema.column_schema_by_name(&col_to_add.column_schema.name)
{
// If the column already exists.
ensure!(
col_to_add.add_if_not_exists,
error::ColumnExistsSnafu {
table_name,
column_name: &col_to_add.column_schema.name
},
);
// Checks if the type is the same
ensure!(
column_schema.data_type == col_to_add.column_schema.data_type,
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"column {} already exists with different type {:?}",
col_to_add.column_schema.name, column_schema.data_type,
),
}
);
} else {
// A new column.
// Ensures we only add a column once.
ensure!(
names.insert(&col_to_add.column_schema.name),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"add column {} more than once",
col_to_add.column_schema.name
),
}
);
ensure!(
col_to_add.column_schema.is_nullable()
|| col_to_add.column_schema.default_constraint().is_some(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"no default value for column {}",
col_to_add.column_schema.name
),
},
);
new_columns.push(col_to_add.clone());
let mut new_requests = Vec::with_capacity(requests.len());
let requests = if add_if_not_exists {
for col_to_add in requests {
if let Some(column_schema) =
table_schema.column_schema_by_name(&col_to_add.column_schema.name)
{
// If the column already exists, we should check if the type is the same.
ensure!(
column_schema.data_type == col_to_add.column_schema.data_type,
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"column {} already exists with different type",
col_to_add.column_schema.name
),
}
);
} else {
new_requests.push(col_to_add.clone());
}
}
&new_requests[..]
} else {
requests
};
for col_to_add in requests {
ensure!(
names.insert(&col_to_add.column_schema.name),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"add column {} more than once",
col_to_add.column_schema.name
),
}
);
ensure!(
!table_schema.contains_column(&col_to_add.column_schema.name),
error::ColumnExistsSnafu {
table_name,
column_name: col_to_add.column_schema.name.to_string()
},
);
ensure!(
col_to_add.column_schema.is_nullable()
|| col_to_add.column_schema.default_constraint().is_some(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"no default value for column {}",
col_to_add.column_schema.name
),
},
);
}
let requests = &new_columns[..];
let SplitResult {
columns_at_first,
@@ -877,7 +881,6 @@ pub struct RawTableMeta {
pub value_indices: Vec<usize>,
/// Engine type of this table. Usually in small case.
pub engine: String,
/// Next column id of a new column.
/// Deprecated. See https://github.com/GreptimeTeam/greptimedb/issues/2982
pub next_column_id: ColumnId,
pub region_numbers: Vec<u32>,
@@ -1075,7 +1078,6 @@ mod tests {
use super::*;
/// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
fn new_test_schema() -> Schema {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
@@ -1127,19 +1129,17 @@ mod tests {
column_schema: new_tag,
is_key: true,
location: None,
add_if_not_exists: false,
},
AddColumnRequest {
column_schema: new_field,
is_key: false,
location: None,
add_if_not_exists: false,
},
],
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap();
builder.build().unwrap()
}
@@ -1157,7 +1157,6 @@ mod tests {
column_schema: new_tag,
is_key: true,
location: Some(AddColumnLocation::First),
add_if_not_exists: false,
},
AddColumnRequest {
column_schema: new_field,
@@ -1165,13 +1164,12 @@ mod tests {
location: Some(AddColumnLocation::After {
column_name: "ts".to_string(),
}),
add_if_not_exists: false,
},
],
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap();
builder.build().unwrap()
}
@@ -1201,48 +1199,6 @@ mod tests {
assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
}
#[test]
fn test_add_columns_multiple_times() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let alter_kind = AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
column_schema: ColumnSchema::new(
"col3",
ConcreteDataType::int32_datatype(),
true,
),
is_key: true,
location: None,
add_if_not_exists: true,
},
AddColumnRequest {
column_schema: ColumnSchema::new(
"col3",
ConcreteDataType::int32_datatype(),
true,
),
is_key: true,
location: None,
add_if_not_exists: true,
},
],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_remove_columns() {
let schema = Arc::new(new_test_schema());
@@ -1260,7 +1216,7 @@ mod tests {
names: vec![String::from("col2"), String::from("my_field")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap()
.build()
.unwrap();
@@ -1315,7 +1271,7 @@ mod tests {
names: vec![String::from("col3"), String::from("col1")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap()
.build()
.unwrap();
@@ -1351,62 +1307,14 @@ mod tests {
column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
is_key: false,
location: None,
add_if_not_exists: false,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnExists, err.status_code());
// Add if not exists
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
is_key: true,
location: None,
add_if_not_exists: true,
}],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
assert_eq!(
meta.schema.column_schemas(),
new_meta.schema.column_schemas()
);
assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
}
#[test]
fn test_add_different_type_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
// Add if not exists, but different type.
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
is_key: false,
location: None,
add_if_not_exists: true,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
@@ -1420,7 +1328,6 @@ mod tests {
.build()
.unwrap();
// Not nullable and no default value.
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new(
@@ -1430,12 +1337,11 @@ mod tests {
),
is_key: false,
location: None,
add_if_not_exists: false,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1457,7 +1363,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
@@ -1482,7 +1388,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
@@ -1505,7 +1411,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1516,7 +1422,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1542,7 +1448,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1556,7 +1462,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1625,7 +1531,7 @@ mod tests {
options: FulltextOptions::default(),
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(
@@ -1646,7 +1552,7 @@ mod tests {
},
};
let new_meta = new_meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap()
.build()
.unwrap();
@@ -1666,7 +1572,7 @@ mod tests {
column_name: "my_tag_first".to_string(),
};
let new_meta = new_meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap()
.build()
.unwrap();

View File

@@ -185,8 +185,6 @@ pub struct AddColumnRequest {
pub column_schema: ColumnSchema,
pub is_key: bool,
pub location: Option<AddColumnLocation>,
/// Add column if not exists.
pub add_if_not_exists: bool,
}
/// Change column datatype request

View File

@@ -66,190 +66,12 @@ mod test {
test_handle_ddl_request(instance.as_ref()).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_multi_ddl_request() {
common_telemetry::init_default_ut_logging();
let instance =
tests::create_distributed_instance("test_distributed_handle_multi_ddl_request").await;
test_handle_multi_ddl_request(instance.frontend().as_ref()).await;
verify_table_is_dropped(&instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_handle_multi_ddl_request() {
let standalone =
GreptimeDbStandaloneBuilder::new("test_standalone_handle_multi_ddl_request")
.build()
.await;
let instance = &standalone.instance;
test_handle_multi_ddl_request(instance.as_ref()).await;
}
async fn query(instance: &Instance, request: Request) -> Output {
GrpcQueryHandler::do_query(instance, request, QueryContext::arc())
.await
.unwrap()
}
async fn test_handle_multi_ddl_request(instance: &Instance) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(1)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
data_type: ColumnDataType::String as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
},
ColumnDef {
name: "ts".to_string(),
data_type: ColumnDataType::TimestampMillisecond as _,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
],
time_index: "ts".to_string(),
engine: MITO_ENGINE.to_string(),
..Default::default()
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::AlterTable(AlterTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_table_expr::Kind::AddColumns(AddColumns {
add_columns: vec![
AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
data_type: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
AddColumn {
column_def: Some(ColumnDef {
name: "a".to_string(),
data_type: ColumnDataType::String as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
],
})),
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::AlterTable(AlterTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_table_expr::Kind::AddColumns(AddColumns {
add_columns: vec![
AddColumn {
column_def: Some(ColumnDef {
name: "c".to_string(),
data_type: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
AddColumn {
column_def: Some(ColumnDef {
name: "d".to_string(),
data_type: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
],
})),
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, c, d, ts) VALUES ('s', 1, 1, 1, 1672816466000)".to_string()))
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(1)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc"
.to_string(),
)),
});
let output = query(instance, request).await;
let OutputData::Stream(stream) = output.data else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-04T07:14:26 | s | 1 |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(DropTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
..Default::default()
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
}
async fn test_handle_ddl_request(instance: &Instance) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
@@ -309,7 +131,6 @@ mod test {
..Default::default()
}),
location: None,
add_if_not_exists: false,
}],
})),
})),

View File

@@ -372,7 +372,6 @@ pub async fn test_insert_and_select(store_type: StorageType) {
add_columns: vec![AddColumn {
column_def: Some(add_column),
location: None,
add_if_not_exists: false,
}],
});
let expr = AlterTableExpr {

View File

@@ -928,9 +928,9 @@ worker_request_batch_size = 64
manifest_checkpoint_distance = 10
compress_manifest = false
auto_flush_interval = "30m"
enable_write_cache = false
write_cache_path = ""
write_cache_size = "5GiB"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "5GiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
@@ -946,7 +946,7 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
content_cache_page_size = "64KiB"
content_cache_page_size = "8MiB"
[region_engine.mito.fulltext_index]
create_on_flush = "auto"

View File

@@ -32,7 +32,7 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),
Affected Rows: 4
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
SELECT * FROM test WHERE MATCHES(message, 'hello');
+-------------+---------------------+
| message | time |
@@ -46,7 +46,7 @@ ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', c
Affected Rows: 0
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
SELECT * FROM test WHERE MATCHES(message, 'hello');
+-------------+---------------------+
| message | time |
@@ -63,15 +63,15 @@ INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),
Affected Rows: 4
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
SELECT * FROM test WHERE MATCHES(message, 'hello');
+-------------+---------------------+
| message | time |
+-------------+---------------------+
| NiKo hello | 2020-01-03T00:00:01 |
| hello | 2020-01-01T00:00:00 |
| hello NiKo | 2020-01-03T00:00:00 |
| NiKo hello | 2020-01-03T00:00:01 |
| hello hello | 2020-01-04T00:00:00 |
| hello | 2020-01-01T00:00:00 |
| hello world | 2020-01-02T00:00:00 |
| world hello | 2020-01-02T00:00:01 |
+-------------+---------------------+

View File

@@ -13,18 +13,18 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),
('hello world', '2020-01-02 00:00:00'),
('world hello', '2020-01-02 00:00:01');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
SELECT * FROM test WHERE MATCHES(message, 'hello');
ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', case_sensitive = 'true');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
SELECT * FROM test WHERE MATCHES(message, 'hello');
INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),
('NiKo hello', '2020-01-03 00:00:01'),
('hello hello', '2020-01-04 00:00:00'),
('NiKo, NiKo', '2020-01-04 00:00:01');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
SELECT * FROM test WHERE MATCHES(message, 'hello');
-- SQLNESS ARG restart=true
SHOW CREATE TABLE test;

View File

@@ -1,107 +0,0 @@
-- test if flow can get table schema correctly after table have been altered
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
-- make both src&sink table in cache of flownode by using them
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE FLOW find_approx_rate;
+------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| find_approx_rate | CREATE FLOW IF NOT EXISTS find_approx_rate |
| | SINK TO approx_rate |
| | AS SELECT (max(byte) - min(byte)) / 30.0 AS rate, date_bin(INTERVAL '30 second', ts) AS time_window FROM bytes_log GROUP BY time_window |
+------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
DROP FLOW find_approx_rate;
Affected Rows: 0
ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte;
Affected Rows: 0
ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate;
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
count(byte) as sample_cnt,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
bytes_log
VALUES
(0, 200, '2023-01-01 00:00:01'),
(300,200, '2023-01-01 00:00:29');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
sample_cnt,
time_window
FROM
approx_rate;
+------+------------+---------------------+
| rate | sample_cnt | time_window |
+------+------------+---------------------+
| 10.0 | 2 | 2023-01-01T00:00:00 |
+------+------------+---------------------+
DROP TABLE bytes_log;
Affected Rows: 0
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0

View File

@@ -1,64 +0,0 @@
-- test if flow can get table schema correctly after table have been altered
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
-- make both src&sink table in cache of flownode by using them
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
SHOW CREATE FLOW find_approx_rate;
DROP FLOW find_approx_rate;
ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte;
ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate;
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
count(byte) as sample_cnt,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
INSERT INTO
bytes_log
VALUES
(0, 200, '2023-01-01 00:00:01'),
(300,200, '2023-01-01 00:00:29');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
sample_cnt,
time_window
FROM
approx_rate;
DROP TABLE bytes_log;
DROP FLOW find_approx_rate;
DROP TABLE approx_rate;

View File

@@ -1,185 +0,0 @@
-- test if reordered insert is correctly handled
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE TABLE approx_rate;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( |
| | "rate" DOUBLE NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2023-01-01 00:00:01', NULL),
('2023-01-01 00:00:29', 300);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+------+---------------------+
| rate | time_window |
+------+---------------------+
| 0.0 | 2023-01-01T00:00:00 |
+------+---------------------+
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2022-01-01 00:00:01', NULL),
('2022-01-01 00:00:29', NULL);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+------+---------------------+
| rate | time_window |
+------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
+------+---------------------+
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:01', 101),
('2025-01-01 00:00:29', 300);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+-------------------+---------------------+
| rate | time_window |
+-------------------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
| 6.633333333333334 | 2025-01-01T00:00:00 |
+-------------------+---------------------+
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:32', 450),
('2025-01-01 00:00:37', 500);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+--------------------+---------------------+
| rate | time_window |
+--------------------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
| 6.633333333333334 | 2025-01-01T00:00:00 |
| 1.6666666666666667 | 2025-01-01T00:00:30 |
+--------------------+---------------------+
DROP TABLE bytes_log;
Affected Rows: 0
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0

View File

@@ -1,96 +0,0 @@
-- test if reordered insert is correctly handled
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
SHOW CREATE TABLE approx_rate;
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2023-01-01 00:00:01', NULL),
('2023-01-01 00:00:29', 300);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2022-01-01 00:00:01', NULL),
('2022-01-01 00:00:29', NULL);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:01', 101),
('2025-01-01 00:00:29', 300);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:32', 450),
('2025-01-01 00:00:37', 500);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
DROP TABLE bytes_log;
DROP FLOW find_approx_rate;
DROP TABLE approx_rate;

View File

@@ -1,56 +0,0 @@
CREATE TABLE `api_requests` (
`timestamp` TIMESTAMP NOT NULL,
`request_id` STRING NOT NULL,
`upstream_id` STRING NOT NULL,
`application_id` STRING NULL,
`url` STRING NOT NULL,
`method` STRING NOT NULL,
`status_code` INTEGER NOT NULL,
`request_headers` JSON NULL,
`request_body` STRING NULL,
`response_headers` JSON NULL,
`response_body` STRING NULL,
`latency_ms` INTEGER NOT NULL,
`client_ip` STRING NULL,
`user_agent` STRING NULL,
TIME INDEX (`timestamp`)
)
WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE api_request_volume_upstream_stats (
`upstream_id` STRING NOT NULL,
`time_window` TIMESTAMP NOT NULL,
`request_count` BIGINT NOT NULL,
TIME INDEX (`time_window`)
);
Affected Rows: 0
CREATE FLOW api_request_volume_by_upstream
SINK TO api_request_volume_upstream_stats
AS
SELECT
upstream_id,
date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window,
COUNT(*) AS request_count
FROM api_requests
GROUP BY upstream_id, time_window;
Affected Rows: 0
DROP FLOW api_request_volume_by_upstream;
Affected Rows: 0
DROP TABLE api_request_volume_upstream_stats;
Affected Rows: 0
DROP TABLE api_requests;
Affected Rows: 0

View File

@@ -1,41 +0,0 @@
CREATE TABLE `api_requests` (
`timestamp` TIMESTAMP NOT NULL,
`request_id` STRING NOT NULL,
`upstream_id` STRING NOT NULL,
`application_id` STRING NULL,
`url` STRING NOT NULL,
`method` STRING NOT NULL,
`status_code` INTEGER NOT NULL,
`request_headers` JSON NULL,
`request_body` STRING NULL,
`response_headers` JSON NULL,
`response_body` STRING NULL,
`latency_ms` INTEGER NOT NULL,
`client_ip` STRING NULL,
`user_agent` STRING NULL,
TIME INDEX (`timestamp`)
)
WITH(
append_mode = 'true'
);
CREATE TABLE api_request_volume_upstream_stats (
`upstream_id` STRING NOT NULL,
`time_window` TIMESTAMP NOT NULL,
`request_count` BIGINT NOT NULL,
TIME INDEX (`time_window`)
);
CREATE FLOW api_request_volume_by_upstream
SINK TO api_request_volume_upstream_stats
AS
SELECT
upstream_id,
date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window,
COUNT(*) AS request_count
FROM api_requests
GROUP BY upstream_id, time_window;
DROP FLOW api_request_volume_by_upstream;
DROP TABLE api_request_volume_upstream_stats;
DROP TABLE api_requests;

View File

@@ -365,48 +365,33 @@ SELECT number FROM out_num_cnt_show;
| 16 |
+--------+
-- should mismatch, hence the old flow remains
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
-- should mismatch, hence the old flow remains
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
+---------------------------------------------------------------+
| flow_definition |
+---------------------------------------------------------------+
| SELECT number AS n1 FROM numbers_input_show WHERE number > 10 |
+---------------------------------------------------------------+
INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);
Affected Rows: 4
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
Error: 1003(Internal), Internal error: 1003
-- sink table shows new 11 since old flow remains
-- sink table stays the same since the flow error out due to column mismatch
SELECT number FROM out_num_cnt_show;
+--------+
| number |
+--------+
| 11 |
| 15 |
| 15 |
| 16 |
| 18 |
+--------+
DROP FLOW filter_numbers_show;

View File

@@ -147,20 +147,19 @@ ADMIN FLUSH_FLOW('filter_numbers_show');
SELECT number FROM out_num_cnt_show;
-- should mismatch, hence the old flow remains
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
-- should mismatch, hence the old flow remains
-- should mismatch
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);
INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
-- sink table shows new 11 since old flow remains
-- sink table stays the same since the flow error out due to column mismatch
SELECT number FROM out_num_cnt_show;
DROP FLOW filter_numbers_show;