diff --git a/Makefile b/Makefile index 0911465fb8..bd5c4638b0 100644 --- a/Makefile +++ b/Makefile @@ -180,6 +180,11 @@ postgres-check-%: postgres-% .PHONY: neon-pg-ext-% neon-pg-ext-%: postgres-% + +@echo "Compiling lsm3 $*" + mkdir -p $(POSTGRES_INSTALL_DIR)/build/lsm3-$* + $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ + -C $(POSTGRES_INSTALL_DIR)/build/lsm3-$* \ + -f $(ROOT_PROJECT_DIR)/pgxn/lsm3/Makefile install +@echo "Compiling neon $*" mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ diff --git a/pgxn/lsm3/META.json b/pgxn/lsm3/META.json new file mode 100644 index 0000000000..34f456b9da --- /dev/null +++ b/pgxn/lsm3/META.json @@ -0,0 +1,34 @@ +{ + "name": "lsm3", + "abstract": "LSM tree implemented using standard Postgres B-Tree indexes", + "description": "LSM3 provides fast inserts because of background index updates and improving index access locality.", + "version": "0.1.3", + "maintainer": ["Konstantin Kninzhik "], + "license": { + "PostgreSQL": "http://www.postgresql.org/about/licence" + }, + "provides": { + "lsm3": { + "file": "lsm3--1.0.sql", + "docfile": "README.md", + "version": "0.1.3", + "abstract": "LSM tree implemented using standard Postgres B-Tree indexes" + } + }, + "tags": ["LSM", "Log-structured merge-tree"], + "resources": { + "repository": { + "url": "https://github.com/postgrespro/lsm3.git", + "web": "https://github.com/postgrespro/lsm3", + "type": "git" + }, + "bugtracker": { + "web": "https://github.com/postgrespro/lsm3/issues" + } + }, + "generated_by": "Konstantin Knizhnik", + "meta-spec": { + "version": "1.0.0", + "url": "http://pgxn.org/meta/spec.txt" + } +} diff --git a/pgxn/lsm3/Makefile b/pgxn/lsm3/Makefile new file mode 100644 index 0000000000..15c52eb7b2 --- /dev/null +++ b/pgxn/lsm3/Makefile @@ -0,0 +1,14 @@ +MODULE_big = lsm3 +OBJS = lsm3.o +PGFILEDESC = "lsm3 - MVCC storage with undo log" + +EXTENSION = lsm3 +DATA = lsm3--1.0.sql + +REGRESS = test +REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/lsm3/lsm3.conf + + +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) diff --git a/pgxn/lsm3/README.md b/pgxn/lsm3/README.md new file mode 100644 index 0000000000..7c5ec8b846 --- /dev/null +++ b/pgxn/lsm3/README.md @@ -0,0 +1,42 @@ +LSM tree implemented using standard Postgres B-Tree indexes. +Top index is used to perform fast inserts and on overflow it is merged +with base index. To perform merge operation concurrently +without blocking other operations with index, two top indexes are used: +active and merged. So totally there are three B-Tree indexes: +two top indexes and one base index. +When performing index scan we have to merge scans of all this three indexes. + +This extension needs to create data structure in shared memory and this is why it should be loaded through +"shared_preload_library" list. Once extension is created, you can define indexes using lsm3 access method: + +```sql +create extension lsm3; +create table t(id integer, val text); +create index idx on t using lsm3(id); +``` + +`Lsm3` provides for the same types and set of operations as standard B-Tree. + +Current restrictions of `Lsm3`: +- Parallel index scan is not supported. +- Array keys are not supported. +- `Lsm3` index can not be declared as unique. + +`Lsm3` extension can be configured using the following parameters: +- `lsm3.max_indexes`: maximal number of Lsm3 indexes (default 1024). +- `lsm3.top_index_size`: size (kb) of top index (default 64Mb). + +It is also possible to specify size of top index in relation options - this value will override `lsm3.top_index_size` GUC. + +Although unique constraint can not be enforced using Lsm3 index, it is still possible to mark index as unique to +optimize index search. If index is marked as unique and searched key is found in active +top index, then lookup in other two indexes is not performed. As far as application is most frequently +searching for last recently inserted data, we can speedup this search by performing just one index lookup instead of 3. +Index can be marked as unique using index options: + +```sql +create index idx on t using lsm3(id) with (unique=true); +``` + +Please notice that Lsm3 creates bgworker merge process for each Lsm3 index. +So you may need to adjust `max_worker_processes` in postgresql.conf to be large enough. diff --git a/pgxn/lsm3/expected/test.out b/pgxn/lsm3/expected/test.out new file mode 100644 index 0000000000..ae2887ecc8 --- /dev/null +++ b/pgxn/lsm3/expected/test.out @@ -0,0 +1,177 @@ +create extension lsm3; +create table t(k bigint, val bigint); +create index lsm3_index on t using lsm3(k); +set enable_seqscan=off; +insert into t values (1,10); +select lsm3_start_merge('lsm3_index'); + lsm3_start_merge +------------------ + +(1 row) + +select lsm3_wait_merge_completion('lsm3_index'); + lsm3_wait_merge_completion +---------------------------- + +(1 row) + +insert into t values (2,20); +select lsm3_start_merge('lsm3_index'); + lsm3_start_merge +------------------ + +(1 row) + +select lsm3_wait_merge_completion('lsm3_index'); + lsm3_wait_merge_completion +---------------------------- + +(1 row) + +insert into t values (3,30); +select lsm3_start_merge('lsm3_index'); + lsm3_start_merge +------------------ + +(1 row) + +select lsm3_wait_merge_completion('lsm3_index'); + lsm3_wait_merge_completion +---------------------------- + +(1 row) + +insert into t values (4,40); +select lsm3_start_merge('lsm3_index'); + lsm3_start_merge +------------------ + +(1 row) + +select lsm3_wait_merge_completion('lsm3_index'); + lsm3_wait_merge_completion +---------------------------- + +(1 row) + +insert into t values (5,50); +select lsm3_start_merge('lsm3_index'); + lsm3_start_merge +------------------ + +(1 row) + +select lsm3_wait_merge_completion('lsm3_index'); + lsm3_wait_merge_completion +---------------------------- + +(1 row) + +select lsm3_get_merge_count('lsm3_index'); + lsm3_get_merge_count +---------------------- + 5 +(1 row) + +select * from t where k = 1; + k | val +---+----- + 1 | 10 +(1 row) + +select * from t order by k; + k | val +---+----- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 + 5 | 50 +(5 rows) + +select * from t order by k desc; + k | val +---+----- + 5 | 50 + 4 | 40 + 3 | 30 + 2 | 20 + 1 | 10 +(5 rows) + +analyze t; +explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t order by k; + QUERY PLAN +---------------------------------- + Index Scan using lsm3_index on t +(1 row) + +insert into t values (generate_series(1,100000), 1); +insert into t values (generate_series(1000001,200000), 2); +insert into t values (generate_series(2000001,300000), 3); +insert into t values (generate_series(1,100000), 1); +insert into t values (generate_series(1000001,200000), 2); +insert into t values (generate_series(2000001,300000), 3); +select * from t where k = 1; + k | val +---+----- + 1 | 10 + 1 | 1 + 1 | 1 +(3 rows) + +select * from t where k = 1000000; + k | val +---+----- +(0 rows) + +select * from t where k = 2000000; + k | val +---+----- +(0 rows) + +select * from t where k = 3000000; + k | val +---+----- +(0 rows) + +analyze t; +explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t where k = 1; + QUERY PLAN +---------------------------------- + Index Scan using lsm3_index on t + Index Cond: (k = 1) +(2 rows) + +select lsm3_get_merge_count('lsm3_index') > 5; + ?column? +---------- + t +(1 row) + +truncate table t; +insert into t values (generate_series(1,1000000), 1); +select * from t where k = 1; + k | val +---+----- + 1 | 1 +(1 row) + +reindex table t; +select * from t where k = 1; + k | val +---+----- + 1 | 1 +(1 row) + +drop table t; +create table lsm(k bigint); +insert into lsm values (generate_series(1, 1000000)); +create index concurrently on lsm using lsm3(k); +select * from lsm where k = 1; + k +--- + 1 +(1 row) + +drop table lsm; diff --git a/pgxn/lsm3/lsm3--1.0.sql b/pgxn/lsm3/lsm3--1.0.sql new file mode 100644 index 0000000000..26fa8d4252 --- /dev/null +++ b/pgxn/lsm3/lsm3--1.0.sql @@ -0,0 +1,803 @@ +-- Lsm3 operators + +CREATE OR REPLACE FUNCTION lsm3_handler(internal) +RETURNS index_am_handler +AS 'MODULE_PATHNAME' +LANGUAGE C; + +CREATE ACCESS METHOD lsm3 TYPE INDEX HANDLER lsm3_handler; + +CREATE OPERATOR FAMILY integer_ops USING lsm3; + +CREATE OPERATOR CLASS int2_ops DEFAULT + FOR TYPE int2 USING lsm3 FAMILY integer_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btint2cmp(int2,int2); + +CREATE OPERATOR CLASS int4_ops DEFAULT + FOR TYPE int4 USING lsm3 FAMILY integer_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btint4cmp(int4,int4); + +CREATE OPERATOR CLASS int8_ops DEFAULT + FOR TYPE int8 USING lsm3 FAMILY integer_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btint8cmp(int8,int8); + +ALTER OPERATOR FAMILY integer_ops USING lsm3 ADD + OPERATOR 1 < (int2,int4), + OPERATOR 1 < (int2,int8), + OPERATOR 1 < (int4,int2), + OPERATOR 1 < (int4,int8), + OPERATOR 1 < (int8,int2), + OPERATOR 1 < (int8,int4), + + OPERATOR 2 <= (int2,int4), + OPERATOR 2 <= (int2,int8), + OPERATOR 2 <= (int4,int2), + OPERATOR 2 <= (int4,int8), + OPERATOR 2 <= (int8,int2), + OPERATOR 2 <= (int8,int4), + + OPERATOR 3 = (int2,int4), + OPERATOR 3 = (int2,int8), + OPERATOR 3 = (int4,int2), + OPERATOR 3 = (int4,int8), + OPERATOR 3 = (int8,int2), + OPERATOR 3 = (int8,int4), + + OPERATOR 4 >= (int2,int4), + OPERATOR 4 >= (int2,int8), + OPERATOR 4 >= (int4,int2), + OPERATOR 4 >= (int4,int8), + OPERATOR 4 >= (int8,int2), + OPERATOR 4 >= (int8,int4), + + OPERATOR 5 > (int2,int4), + OPERATOR 5 > (int2,int8), + OPERATOR 5 > (int4,int2), + OPERATOR 5 > (int4,int8), + OPERATOR 5 > (int8,int2), + OPERATOR 5 > (int8,int4), + + FUNCTION 1(int2,int4) btint24cmp(int2,int4), + FUNCTION 1(int2,int8) btint28cmp(int2,int8), + FUNCTION 1(int4,int2) btint42cmp(int4,int2), + FUNCTION 1(int4,int8) btint48cmp(int4,int8), + FUNCTION 1(int8,int4) btint84cmp(int8,int4), + FUNCTION 1(int8,int2) btint82cmp(int8,int2), + + FUNCTION 2(int2,int2) btint2sortsupport(internal), + FUNCTION 2(int4,int4) btint4sortsupport(internal), + FUNCTION 2(int8,int8) btint8sortsupport(internal), + + FUNCTION 3(int2,int8) in_range(int2,int2,int8,bool,bool), + FUNCTION 3(int2,int4) in_range(int2,int2,int4,bool,bool), + FUNCTION 3(int2,int2) in_range(int2,int2,int2,bool,bool), + FUNCTION 3(int4,int8) in_range(int4,int4,int8,bool,bool), + FUNCTION 3(int4,int4) in_range(int4,int4,int4,bool,bool), + FUNCTION 3(int4,int2) in_range(int4,int4,int2,bool,bool), + FUNCTION 3(int8,int8) in_range(int8,int8,int8,bool,bool), + + FUNCTION 4(int2,int2) btequalimage(oid), + FUNCTION 4(int4,int4) btequalimage(oid), + FUNCTION 4(int8,int8) btequalimage(oid); + +CREATE OPERATOR FAMILY float_ops USING lsm3; + +CREATE OPERATOR CLASS float4_ops DEFAULT + FOR TYPE float4 USING lsm3 FAMILY float_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btfloat4cmp(float4,float4); + +CREATE OPERATOR CLASS float8_ops DEFAULT + FOR TYPE float8 USING lsm3 FAMILY float_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btfloat8cmp(float8,float8); + + +ALTER OPERATOR FAMILY float_ops USING lsm3 ADD + OPERATOR 1 < (float4,float8), + OPERATOR 1 < (float8,float4), + + OPERATOR 2 <= (float4,float8), + OPERATOR 2 <= (float8,float4), + + OPERATOR 3 = (float4,float8), + OPERATOR 3 = (float8,float4), + + OPERATOR 4 >= (float4,float8), + OPERATOR 4 >= (float8,float4), + + OPERATOR 5 > (float4,float8), + OPERATOR 5 > (float8,float4), + + FUNCTION 1(float4,float8) btfloat48cmp(float4,float8), + FUNCTION 1(float8,float4) btfloat84cmp(float8,float4), + + FUNCTION 2(float4,float4) btfloat4sortsupport(internal), + FUNCTION 2(float8,float8) btfloat8sortsupport(internal), + + FUNCTION 3(float4,float8) in_range(float4,float4,float8,bool,bool), + FUNCTION 3(float8,float8) in_range(float8,float8,float8,bool,bool); + +CREATE OPERATOR CLASS bool_ops DEFAULT + FOR TYPE bool USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btboolcmp(bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS bpchar_ops DEFAULT + FOR TYPE bpchar USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 bpcharcmp(bpchar,bpchar), + FUNCTION 2 bpchar_sortsupport(internal), + FUNCTION 4 btvarstrequalimage(oid); + +CREATE OPERATOR CLASS bytea_ops DEFAULT + FOR TYPE bytea USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 byteacmp(bytea,bytea), + FUNCTION 2 bytea_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS char_ops DEFAULT + FOR TYPE "char" USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btcharcmp("char","char"), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR FAMILY datetime_ops USING lsm3; + +CREATE OPERATOR CLASS date_ops DEFAULT + FOR TYPE date USING lsm3 FAMILY datetime_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 date_cmp(date,date), + FUNCTION 2 date_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS timestamp_ops DEFAULT + FOR TYPE timestamp USING lsm3 FAMILY datetime_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 timestamp_cmp(timestamp,timestamp), + FUNCTION 2 timestamp_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS timestamptz_ops DEFAULT + FOR TYPE timestamptz USING lsm3 FAMILY datetime_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 timestamptz_cmp(timestamptz,timestamptz), + FUNCTION 2 timestamp_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +ALTER OPERATOR FAMILY datetime_ops USING lsm3 ADD + OPERATOR 1 < (date,timestamp), + OPERATOR 2 <= (date,timestamp), + OPERATOR 3 = (date,timestamp), + OPERATOR 4 >= (date,timestamp), + OPERATOR 5 > (date,timestamp), + FUNCTION 1(date,timestamp) date_cmp_timestamp(date,timestamp), + + OPERATOR 1 < (date,timestamptz), + OPERATOR 2 <= (date,timestamptz), + OPERATOR 3 = (date,timestamptz), + OPERATOR 4 >= (date,timestamptz), + OPERATOR 5 > (date,timestamptz), + FUNCTION 1(date,timestamptz) date_cmp_timestamptz(date,timestamptz), + + OPERATOR 1 < (timestamp,date), + OPERATOR 2 <= (timestamp,date), + OPERATOR 3 = (timestamp,date), + OPERATOR 4 >= (timestamp,date), + OPERATOR 5 > (timestamp,date), + FUNCTION 1(timestamp,date) timestamp_cmp_date(timestamp,date), + + OPERATOR 1 < (timestamptz,date), + OPERATOR 2 <= (timestamptz,date), + OPERATOR 3 = (timestamptz,date), + OPERATOR 4 >= (timestamptz,date), + OPERATOR 5 > (timestamptz,date), + FUNCTION 1(timestamptz,date) timestamptz_cmp_date(timestamptz,date), + + OPERATOR 1 < (timestamp,timestamptz), + OPERATOR 2 <= (timestamp,timestamptz), + OPERATOR 3 = (timestamp,timestamptz), + OPERATOR 4 >= (timestamp,timestamptz), + OPERATOR 5 > (timestamp,timestamptz), + FUNCTION 1(timestamp,timestamptz) timestamp_cmp_timestamptz(timestamp,timestamptz), + + OPERATOR 1 < (timestamptz,timestamp), + OPERATOR 2 <= (timestamptz,timestamp), + OPERATOR 3 = (timestamptz,timestamp), + OPERATOR 4 >= (timestamptz,timestamp), + OPERATOR 5 > (timestamptz,timestamp), + FUNCTION 1(timestamptz,timestamp) timestamptz_cmp_timestamp(timestamptz,timestamp), + + FUNCTION 3(date,interval) in_range(date,date,interval,bool,bool), + FUNCTION 3(timestamp,interval) in_range(timestamp,timestamp,interval,bool,bool), + FUNCTION 3(timestamptz,interval) in_range(timestamptz,timestamptz,interval,bool,bool); + +CREATE OPERATOR CLASS interval_ops DEFAULT + FOR TYPE interval USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 interval_cmp(interval,interval), + FUNCTION 3 in_range(interval,interval,interval,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS macaddr_ops DEFAULT + FOR TYPE macaddr USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 macaddr_cmp(macaddr,macaddr), + FUNCTION 2 macaddr_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS numeric_ops DEFAULT + FOR TYPE numeric USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 numeric_cmp(numeric,numeric), + FUNCTION 2 numeric_sortsupport(internal), + FUNCTION 3 in_range(numeric,numeric,numeric,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS oid_ops DEFAULT + FOR TYPE oid USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btoidcmp(oid,oid), + FUNCTION 2 btoidsortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR FAMILY text_ops USING lsm3; + +CREATE OPERATOR CLASS text_ops DEFAULT + FOR TYPE text USING lsm3 FAMILY text_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 bttextcmp(text,text), + FUNCTION 2 bttextsortsupport(internal), + FUNCTION 4 btvarstrequalimage(oid); + +CREATE OPERATOR CLASS name_ops DEFAULT + FOR TYPE name USING lsm3 FAMILY text_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btnamecmp(name,name), + FUNCTION 2 btnamesortsupport(internal), + FUNCTION 4 btvarstrequalimage(oid); + +ALTER OPERATOR FAMILY text_ops USING lsm3 ADD + OPERATOR 1 < (text,name), + OPERATOR 2 <= (text,name), + OPERATOR 3 = (text,name), + OPERATOR 4 >= (text,name), + OPERATOR 5 > (text,name), + FUNCTION 1(text,name) bttextnamecmp(text,name), + + OPERATOR 1 < (name,text), + OPERATOR 2 <= (name,text), + OPERATOR 3 = (name,text), + OPERATOR 4 >= (name,text), + OPERATOR 5 > (name,text), + FUNCTION 1(name,text) btnametextcmp(name,text); + +CREATE OPERATOR CLASS time_ops DEFAULT + FOR TYPE time USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 time_cmp(time,time), + FUNCTION 3 in_range(time,time,interval,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS timetz_ops DEFAULT + FOR TYPE timetz USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 timetz_cmp(timetz,timetz), + FUNCTION 3 in_range(timetz,timetz,interval,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS money_ops DEFAULT + FOR TYPE money USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 cash_cmp(money,money), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS uuid_ops DEFAULT + FOR TYPE uuid USING lsm3 AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 uuid_cmp(uuid,uuid), + FUNCTION 2 uuid_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +-- lsm3_bree_wrapper operators + +CREATE OR REPLACE FUNCTION lsm3_btree_wrapper(internal) +RETURNS index_am_handler +AS 'MODULE_PATHNAME' +LANGUAGE C; + +CREATE ACCESS METHOD lsm3_btree_wrapper TYPE INDEX HANDLER lsm3_btree_wrapper; + +CREATE OPERATOR FAMILY integer_ops USING lsm3_btree_wrapper; + +CREATE OPERATOR CLASS int2_ops DEFAULT + FOR TYPE int2 USING lsm3_btree_wrapper FAMILY integer_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btint2cmp(int2,int2); + +CREATE OPERATOR CLASS int4_ops DEFAULT + FOR TYPE int4 USING lsm3_btree_wrapper FAMILY integer_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btint4cmp(int4,int4); + +CREATE OPERATOR CLASS int8_ops DEFAULT + FOR TYPE int8 USING lsm3_btree_wrapper FAMILY integer_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btint8cmp(int8,int8); + +ALTER OPERATOR FAMILY integer_ops USING lsm3_btree_wrapper ADD + OPERATOR 1 < (int2,int4), + OPERATOR 1 < (int2,int8), + OPERATOR 1 < (int4,int2), + OPERATOR 1 < (int4,int8), + OPERATOR 1 < (int8,int2), + OPERATOR 1 < (int8,int4), + + OPERATOR 2 <= (int2,int4), + OPERATOR 2 <= (int2,int8), + OPERATOR 2 <= (int4,int2), + OPERATOR 2 <= (int4,int8), + OPERATOR 2 <= (int8,int2), + OPERATOR 2 <= (int8,int4), + + OPERATOR 3 = (int2,int4), + OPERATOR 3 = (int2,int8), + OPERATOR 3 = (int4,int2), + OPERATOR 3 = (int4,int8), + OPERATOR 3 = (int8,int2), + OPERATOR 3 = (int8,int4), + + OPERATOR 4 >= (int2,int4), + OPERATOR 4 >= (int2,int8), + OPERATOR 4 >= (int4,int2), + OPERATOR 4 >= (int4,int8), + OPERATOR 4 >= (int8,int2), + OPERATOR 4 >= (int8,int4), + + OPERATOR 5 > (int2,int4), + OPERATOR 5 > (int2,int8), + OPERATOR 5 > (int4,int2), + OPERATOR 5 > (int4,int8), + OPERATOR 5 > (int8,int2), + OPERATOR 5 > (int8,int4), + + FUNCTION 1(int2,int4) btint24cmp(int2,int4), + FUNCTION 1(int2,int8) btint28cmp(int2,int8), + FUNCTION 1(int4,int2) btint42cmp(int4,int2), + FUNCTION 1(int4,int8) btint48cmp(int4,int8), + FUNCTION 1(int8,int4) btint84cmp(int8,int4), + FUNCTION 1(int8,int2) btint82cmp(int8,int2), + + FUNCTION 2(int2,int2) btint2sortsupport(internal), + FUNCTION 2(int4,int4) btint4sortsupport(internal), + FUNCTION 2(int8,int8) btint8sortsupport(internal), + + FUNCTION 3(int2,int8) in_range(int2,int2,int8,bool,bool), + FUNCTION 3(int2,int4) in_range(int2,int2,int4,bool,bool), + FUNCTION 3(int2,int2) in_range(int2,int2,int2,bool,bool), + FUNCTION 3(int4,int8) in_range(int4,int4,int8,bool,bool), + FUNCTION 3(int4,int4) in_range(int4,int4,int4,bool,bool), + FUNCTION 3(int4,int2) in_range(int4,int4,int2,bool,bool), + FUNCTION 3(int8,int8) in_range(int8,int8,int8,bool,bool), + + FUNCTION 4(int2,int2) btequalimage(oid), + FUNCTION 4(int4,int4) btequalimage(oid), + FUNCTION 4(int8,int8) btequalimage(oid); + +CREATE OPERATOR FAMILY float_ops USING lsm3_btree_wrapper; + +CREATE OPERATOR CLASS float4_ops DEFAULT + FOR TYPE float4 USING lsm3_btree_wrapper FAMILY float_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btfloat4cmp(float4,float4); + +CREATE OPERATOR CLASS float8_ops DEFAULT + FOR TYPE float8 USING lsm3_btree_wrapper FAMILY float_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btfloat8cmp(float8,float8); + + +ALTER OPERATOR FAMILY float_ops USING lsm3_btree_wrapper ADD + OPERATOR 1 < (float4,float8), + OPERATOR 1 < (float8,float4), + + OPERATOR 2 <= (float4,float8), + OPERATOR 2 <= (float8,float4), + + OPERATOR 3 = (float4,float8), + OPERATOR 3 = (float8,float4), + + OPERATOR 4 >= (float4,float8), + OPERATOR 4 >= (float8,float4), + + OPERATOR 5 > (float4,float8), + OPERATOR 5 > (float8,float4), + + FUNCTION 1(float4,float8) btfloat48cmp(float4,float8), + FUNCTION 1(float8,float4) btfloat84cmp(float8,float4), + + FUNCTION 2(float4,float4) btfloat4sortsupport(internal), + FUNCTION 2(float8,float8) btfloat8sortsupport(internal), + + FUNCTION 3(float4,float8) in_range(float4,float4,float8,bool,bool), + FUNCTION 3(float8,float8) in_range(float8,float8,float8,bool,bool); + +CREATE OPERATOR CLASS bool_ops DEFAULT + FOR TYPE bool USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btboolcmp(bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS bpchar_ops DEFAULT + FOR TYPE bpchar USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 bpcharcmp(bpchar,bpchar), + FUNCTION 2 bpchar_sortsupport(internal), + FUNCTION 4 btvarstrequalimage(oid); + +CREATE OPERATOR CLASS bytea_ops DEFAULT + FOR TYPE bytea USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 byteacmp(bytea,bytea), + FUNCTION 2 bytea_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS char_ops DEFAULT + FOR TYPE "char" USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btcharcmp("char","char"), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR FAMILY datetime_ops USING lsm3_btree_wrapper; + +CREATE OPERATOR CLASS date_ops DEFAULT + FOR TYPE date USING lsm3_btree_wrapper FAMILY datetime_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 date_cmp(date,date), + FUNCTION 2 date_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS timestamp_ops DEFAULT + FOR TYPE timestamp USING lsm3_btree_wrapper FAMILY datetime_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 timestamp_cmp(timestamp,timestamp), + FUNCTION 2 timestamp_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS timestamptz_ops DEFAULT + FOR TYPE timestamptz USING lsm3_btree_wrapper FAMILY datetime_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 timestamptz_cmp(timestamptz,timestamptz), + FUNCTION 2 timestamp_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +ALTER OPERATOR FAMILY datetime_ops USING lsm3_btree_wrapper ADD + OPERATOR 1 < (date,timestamp), + OPERATOR 2 <= (date,timestamp), + OPERATOR 3 = (date,timestamp), + OPERATOR 4 >= (date,timestamp), + OPERATOR 5 > (date,timestamp), + FUNCTION 1(date,timestamp) date_cmp_timestamp(date,timestamp), + + OPERATOR 1 < (date,timestamptz), + OPERATOR 2 <= (date,timestamptz), + OPERATOR 3 = (date,timestamptz), + OPERATOR 4 >= (date,timestamptz), + OPERATOR 5 > (date,timestamptz), + FUNCTION 1(date,timestamptz) date_cmp_timestamptz(date,timestamptz), + + OPERATOR 1 < (timestamp,date), + OPERATOR 2 <= (timestamp,date), + OPERATOR 3 = (timestamp,date), + OPERATOR 4 >= (timestamp,date), + OPERATOR 5 > (timestamp,date), + FUNCTION 1(timestamp,date) timestamp_cmp_date(timestamp,date), + + OPERATOR 1 < (timestamptz,date), + OPERATOR 2 <= (timestamptz,date), + OPERATOR 3 = (timestamptz,date), + OPERATOR 4 >= (timestamptz,date), + OPERATOR 5 > (timestamptz,date), + FUNCTION 1(timestamptz,date) timestamptz_cmp_date(timestamptz,date), + + OPERATOR 1 < (timestamp,timestamptz), + OPERATOR 2 <= (timestamp,timestamptz), + OPERATOR 3 = (timestamp,timestamptz), + OPERATOR 4 >= (timestamp,timestamptz), + OPERATOR 5 > (timestamp,timestamptz), + FUNCTION 1(timestamp,timestamptz) timestamp_cmp_timestamptz(timestamp,timestamptz), + + OPERATOR 1 < (timestamptz,timestamp), + OPERATOR 2 <= (timestamptz,timestamp), + OPERATOR 3 = (timestamptz,timestamp), + OPERATOR 4 >= (timestamptz,timestamp), + OPERATOR 5 > (timestamptz,timestamp), + FUNCTION 1(timestamptz,timestamp) timestamptz_cmp_timestamp(timestamptz,timestamp), + + FUNCTION 3(date,interval) in_range(date,date,interval,bool,bool), + FUNCTION 3(timestamp,interval) in_range(timestamp,timestamp,interval,bool,bool), + FUNCTION 3(timestamptz,interval) in_range(timestamptz,timestamptz,interval,bool,bool); + +CREATE OPERATOR CLASS interval_ops DEFAULT + FOR TYPE interval USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 interval_cmp(interval,interval), + FUNCTION 3 in_range(interval,interval,interval,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS macaddr_ops DEFAULT + FOR TYPE macaddr USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 macaddr_cmp(macaddr,macaddr), + FUNCTION 2 macaddr_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS numeric_ops DEFAULT + FOR TYPE numeric USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 numeric_cmp(numeric,numeric), + FUNCTION 2 numeric_sortsupport(internal), + FUNCTION 3 in_range(numeric,numeric,numeric,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS oid_ops DEFAULT + FOR TYPE oid USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btoidcmp(oid,oid), + FUNCTION 2 btoidsortsupport(internal), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR FAMILY text_ops USING lsm3_btree_wrapper; + +CREATE OPERATOR CLASS text_ops DEFAULT + FOR TYPE text USING lsm3_btree_wrapper FAMILY text_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 bttextcmp(text,text), + FUNCTION 2 bttextsortsupport(internal), + FUNCTION 4 btvarstrequalimage(oid); + +CREATE OPERATOR CLASS name_ops DEFAULT + FOR TYPE name USING lsm3_btree_wrapper FAMILY text_ops AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 btnamecmp(name,name), + FUNCTION 2 btnamesortsupport(internal), + FUNCTION 4 btvarstrequalimage(oid); + +ALTER OPERATOR FAMILY text_ops USING lsm3_btree_wrapper ADD + OPERATOR 1 < (text,name), + OPERATOR 2 <= (text,name), + OPERATOR 3 = (text,name), + OPERATOR 4 >= (text,name), + OPERATOR 5 > (text,name), + FUNCTION 1(text,name) bttextnamecmp(text,name), + + OPERATOR 1 < (name,text), + OPERATOR 2 <= (name,text), + OPERATOR 3 = (name,text), + OPERATOR 4 >= (name,text), + OPERATOR 5 > (name,text), + FUNCTION 1(name,text) btnametextcmp(name,text); + +CREATE OPERATOR CLASS time_ops DEFAULT + FOR TYPE time USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 time_cmp(time,time), + FUNCTION 3 in_range(time,time,interval,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS timetz_ops DEFAULT + FOR TYPE timetz USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 timetz_cmp(timetz,timetz), + FUNCTION 3 in_range(timetz,timetz,interval,bool,bool), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS money_ops DEFAULT + FOR TYPE money USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 cash_cmp(money,money), + FUNCTION 4 btequalimage(oid); + +CREATE OPERATOR CLASS uuid_ops DEFAULT + FOR TYPE uuid USING lsm3_btree_wrapper AS + OPERATOR 1 <, + OPERATOR 2 <=, + OPERATOR 3 =, + OPERATOR 4 >=, + OPERATOR 5 >, + FUNCTION 1 uuid_cmp(uuid,uuid), + FUNCTION 2 uuid_sortsupport(internal), + FUNCTION 4 btequalimage(oid); + +-- Number of index merges since server start +CREATE FUNCTION lsm3_get_merge_count(index regclass) returns bigint +AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED; + +-- Force merge of top index. +CREATE FUNCTION lsm3_start_merge(index regclass) returns void +AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED; + +-- Wait merge completion +CREATE FUNCTION lsm3_wait_merge_completion(index regclass) returns void +AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED; + +-- Get active top index size +CREATE FUNCTION lsm3_top_index_size(index regclass) returns bigint +AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED; diff --git a/pgxn/lsm3/lsm3.c b/pgxn/lsm3/lsm3.c new file mode 100644 index 0000000000..77875e55f0 --- /dev/null +++ b/pgxn/lsm3/lsm3.c @@ -0,0 +1,1254 @@ +#include "postgres.h" +#include "access/attnum.h" +#include "utils/relcache.h" +#include "access/reloptions.h" +#include "access/nbtree.h" +#include "access/table.h" +#include "access/relation.h" +#include "access/relscan.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "commands/defrem.h" +#include "funcapi.h" +#include "utils/rel.h" +#include "nodes/makefuncs.h" +#include "catalog/dependency.h" +#include "catalog/pg_operator.h" +#include "catalog/index.h" +#include "catalog/namespace.h" +#include "catalog/storage.h" +#include "utils/lsyscache.h" +#include "utils/typcache.h" +#include "utils/builtins.h" +#include "utils/index_selfuncs.h" +#include "utils/rel.h" +#include "miscadmin.h" +#include "tcop/utility.h" +#include "postmaster/bgworker.h" +#include "pgstat.h" +#include "executor/executor.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lock.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" + +#include "lsm3.h" + +#ifdef PG_MODULE_MAGIC +PG_MODULE_MAGIC; +#endif + +PG_FUNCTION_INFO_V1(lsm3_handler); +PG_FUNCTION_INFO_V1(lsm3_btree_wrapper); +PG_FUNCTION_INFO_V1(lsm3_get_merge_count); +PG_FUNCTION_INFO_V1(lsm3_start_merge); +PG_FUNCTION_INFO_V1(lsm3_wait_merge_completion); +PG_FUNCTION_INFO_V1(lsm3_top_index_size); + +extern void _PG_init(void); +extern void _PG_fini(void); + +PGDLLEXPORT void lsm3_merger_main(Datum arg); + +/* Lsm3 dictionary (hashtable with control data for all indexes) */ +static HTAB* Lsm3Dict; +static LWLock* Lsm3DictLock; +static List* Lsm3ReleasedLocks; +static List* Lsm3Entries; +static bool Lsm3InsideCopy; + +/* Kind of relation optioms for Lsm3 index */ +static relopt_kind Lsm3ReloptKind; + +/* Lsm3 kooks */ +static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL; +static shmem_startup_hook_type PreviousShmemStartupHook = NULL; +#if PG_VERSION_NUM>=150000 +static shmem_request_hook_type PreviousShmemRequestHook = NULL; +#endif +static ExecutorFinish_hook_type PreviousExecutorFinish = NULL; + +/* Lsm3 GUCs */ +static int Lsm3MaxIndexes; +static int Lsm3TopIndexSize; + +/* Background worker termination flag */ +static volatile bool Lsm3Cancel; + +#if PG_VERSION_NUM<160000 +typedef PGAlignedBlock PGIOAlignedBlock; +#endif + +static void +lsm3_shmem_request(void) +{ +#if PG_VERSION_NUM>=150000 + if (PreviousShmemRequestHook) + PreviousShmemRequestHook(); +#endif + + RequestAddinShmemSpace(hash_estimate_size(Lsm3MaxIndexes, sizeof(Lsm3DictEntry))); + RequestNamedLWLockTranche("lsm3", 1); +} + +static void +lsm3_shmem_startup(void) +{ + HASHCTL info; + + if (PreviousShmemStartupHook) + { + PreviousShmemStartupHook(); + } + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(Oid); + info.entrysize = sizeof(Lsm3DictEntry); + Lsm3Dict = ShmemInitHash("lsm3 hash", + Lsm3MaxIndexes, Lsm3MaxIndexes, + &info, + HASH_ELEM | HASH_BLOBS); + Lsm3DictLock = &(GetNamedLWLockTranche("lsm3"))->lock; +} + +/* Initialize Lsm3 control data entry */ +static void +lsm3_init_entry(Lsm3DictEntry* entry, Relation index) +{ + SpinLockInit(&entry->spinlock); + entry->active_index = 0; + entry->merger = NULL; + entry->merge_in_progress = false; + entry->start_merge = false; + entry->n_merges = 0; + entry->n_inserts = 0; + entry->top[0] = entry->top[1] = InvalidOid; + entry->access_count[0] = entry->access_count[1] = 0; + entry->heap = index->rd_index->indrelid; + entry->db_id = MyDatabaseId; + entry->user_id = GetUserId(); + entry->top_index_size = index->rd_options ? ((Lsm3Options*)index->rd_options)->top_index_size : 0; +} + +/* Get B-Tree index size (number of blocks) */ +static BlockNumber +lsm3_get_index_size(Oid relid) +{ + Relation index = index_open(relid, AccessShareLock); + BlockNumber size = RelationGetNumberOfBlocks(index); + index_close(index, AccessShareLock); + return size; +} + +/* Lookup or create Lsm3 control data for this index */ +static Lsm3DictEntry* +lsm3_get_entry(Relation index) +{ + Lsm3DictEntry* entry; + bool found = true; + LWLockAcquire(Lsm3DictLock, LW_SHARED); + entry = (Lsm3DictEntry*)hash_search(Lsm3Dict, &RelationGetRelid(index), HASH_FIND, &found); + if (entry == NULL) + { + /* We need exclusive lock to create new entry */ + LWLockRelease(Lsm3DictLock); + LWLockAcquire(Lsm3DictLock, LW_EXCLUSIVE); + entry = (Lsm3DictEntry*)hash_search(Lsm3Dict, &RelationGetRelid(index), HASH_ENTER, &found); + } + if (!found) + { + char* relname = RelationGetRelationName(index); + lsm3_init_entry(entry, index); + for (int i = 0; i < 2; i++) + { + char* topidxname = psprintf("%s_top%d", relname, i); + entry->top[i] = get_relname_relid(topidxname, RelationGetNamespace(index)); + if (entry->top[i] == InvalidOid) + { + elog(ERROR, "Lsm3: failed to lookup %s index", topidxname); + } + } + entry->active_index = lsm3_get_index_size(entry->top[0]) >= lsm3_get_index_size(entry->top[1]) ? 0 : 1; + } + LWLockRelease(Lsm3DictLock); + return entry; +} + +/* Launch merger bgworker */ +static void +lsm3_launch_bgworker(Lsm3DictEntry* entry) +{ + BackgroundWorker worker; + BackgroundWorkerHandle *handle; + pid_t bgw_pid; + + MemSet(&worker, 0, sizeof(worker)); + snprintf(worker.bgw_name, sizeof(worker.bgw_name), "lsm3-merger-%d", entry->base); + snprintf(worker.bgw_type, sizeof(worker.bgw_type), "lsm3-merger-%d", entry->base); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy(worker.bgw_function_name, "lsm3_merger_main"); + strcpy(worker.bgw_library_name, "lsm3"); + worker.bgw_main_arg = PointerGetDatum(entry); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + elog(ERROR, "Lsm3: failed to start background worker"); + } + if (WaitForBackgroundWorkerStartup(handle, &bgw_pid) != BGWH_STARTED) + { + elog(ERROR, "Lsm3: startup of background worker is failed"); + } + entry->merger = BackendPidGetProc(bgw_pid); + for (int n_attempts = 0; entry->merger == NULL || n_attempts < 100; n_attempts++) + { + pg_usleep(10000); /* wait background worker to be registered in procarray */ + entry->merger = BackendPidGetProc(bgw_pid); + } + if (entry->merger == NULL) + { + elog(ERROR, "Lsm3: background worker %d is crashed", bgw_pid); + } +} + +/* Cancel merger bgwroker */ +static void +lsm3_merge_cancel(int sig) +{ + Lsm3Cancel = true; + SetLatch(MyLatch); +} + +/* Truncate top index */ +static void +lsm3_truncate_index(Oid index_oid, Oid heap_oid) +{ + Relation index = index_open(index_oid, AccessExclusiveLock); + Relation heap = table_open(heap_oid, AccessShareLock); /* heap is actually not used, because we will not load data to top indexes */ + IndexInfo* indexInfo = BuildDummyIndexInfo(index); + RelationTruncate(index, 0); + elog(LOG, "Lsm3: truncate index %s", RelationGetRelationName(index)); + index_build(heap, index, indexInfo, true, false); + index_close(index, AccessExclusiveLock); + table_close(heap, AccessShareLock); +} + +#define INSERT_FLAGS UNIQUE_CHECK_NO, false + +/* Merge top index into base index */ +static void +lsm3_merge_indexes(Oid dst_oid, Oid src_oid, Oid heap_oid) +{ + Relation top_index = index_open(src_oid, AccessShareLock); + Relation heap = table_open(heap_oid, AccessShareLock); + Relation base_index = index_open(dst_oid, RowExclusiveLock); + IndexScanDesc scan; + bool ok; + Oid save_am = base_index->rd_rel->relam; + + elog(LOG, "Lsm3: merge top index %s with size %d blocks", RelationGetRelationName(top_index), RelationGetNumberOfBlocks(top_index)); + + base_index->rd_rel->relam = BTREE_AM_OID; + scan = index_beginscan(heap, top_index, SnapshotAny, 0, 0); + scan->xs_want_itup = true; + btrescan(scan, NULL, 0, 0, 0); + for (ok = _bt_first(scan, ForwardScanDirection); ok; ok = _bt_next(scan, ForwardScanDirection)) + { + IndexTuple itup = scan->xs_itup; + if (BTreeTupleIsPosting(itup)) + { + /* Some dirty coding here related with handling of posting items (index deduplication). + * If index tuple is posting item, we need to transfer it to normal index tuple. + * Posting list is representing by index tuple with INDEX_ALT_TID_MASK bit set in t_info and + * BT_IS_POSTING bit in TID offset, following by array of TIDs. + * We need to store right TID (taken from xs_heaptid) and correct index tuple length + * (not including size of TIDs array), clearing INDEX_ALT_TID_MASK. + * For efficiency reasons let's do it in place, saving and restoring original values after insertion is done. + */ + ItemPointerData save_tid = itup->t_tid; + unsigned short save_info = itup->t_info; + itup->t_info = (save_info & ~(INDEX_SIZE_MASK | INDEX_ALT_TID_MASK)) + BTreeTupleGetPostingOffset(itup); + itup->t_tid = scan->xs_heaptid; + _bt_doinsert(base_index, itup, INSERT_FLAGS, heap); /* lsm3 index is not unique so need not to heck for duplica +tes */ + itup->t_tid = save_tid; + itup->t_info = save_info; + } + else + { + _bt_doinsert(base_index, itup, INSERT_FLAGS, heap); /* lsm3 index is not unique so need not to heck for duplica +tes */ + } + } + index_endscan(scan); + base_index->rd_rel->relam = save_am; + index_close(top_index, AccessShareLock); + index_close(base_index, RowExclusiveLock); + table_close(heap, AccessShareLock); +} + +/* Lsm3 index options. + */ +static bytea * +lsm3_options(Datum reloptions, bool validate) +{ + static const relopt_parse_elt tab[] = { + {"fillfactor", RELOPT_TYPE_INT, offsetof(BTOptions, fillfactor)}, + {"vacuum_cleanup_index_scale_factor", RELOPT_TYPE_REAL, + offsetof(BTOptions, vacuum_cleanup_index_scale_factor)}, + {"deduplicate_items", RELOPT_TYPE_BOOL, + offsetof(BTOptions, deduplicate_items)}, + {"top_index_size", RELOPT_TYPE_INT, offsetof(Lsm3Options, top_index_size)}, + {"unique", RELOPT_TYPE_BOOL, offsetof(Lsm3Options, unique)} + }; + return (bytea *) build_reloptions(reloptions, validate, Lsm3ReloptKind, + sizeof(Lsm3Options), tab, lengthof(tab)); +} + +/* Main function of merger bgwroker */ +void +lsm3_merger_main(Datum arg) +{ + Lsm3DictEntry* entry = (Lsm3DictEntry*)DatumGetPointer(arg); + char *appname; + + pqsignal(SIGINT, lsm3_merge_cancel); + pqsignal(SIGQUIT, lsm3_merge_cancel); + pqsignal(SIGTERM, lsm3_merge_cancel); + + /* We're now ready to receive signals */ + BackgroundWorkerUnblockSignals(); + + BackgroundWorkerInitializeConnectionByOid(entry->db_id, entry->user_id, 0); + + appname = psprintf("lsm3 merger for %d", entry->base); + pgstat_report_appname(appname); + pfree(appname); + + while (!Lsm3Cancel) + { + int merge_index= -1; + int wr; + pgstat_report_activity(STATE_IDLE, "waiting"); + wr = WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1L, PG_WAIT_EXTENSION); + + if ((wr & WL_POSTMASTER_DEATH) || Lsm3Cancel) + { + break; + } + + ResetLatch(MyLatch); + + /* Check if merge is requested under spinlock */ + SpinLockAcquire(&entry->spinlock); + if (entry->start_merge) + { + merge_index = 1 - entry->active_index; /* at this moment active index should already by swapped */ + entry->start_merge = false; + } + SpinLockRelease(&entry->spinlock); + + if (merge_index >= 0) + { + StartTransactionCommand(); + { + pgstat_report_activity(STATE_RUNNING, "merging"); + lsm3_merge_indexes(entry->base, entry->top[merge_index], entry->heap); + + pgstat_report_activity(STATE_RUNNING, "truncate"); + lsm3_truncate_index(entry->top[merge_index], entry->heap); + } + CommitTransactionCommand(); + + SpinLockAcquire(&entry->spinlock); + entry->merge_in_progress = false; /* mark merge as completed */ + SpinLockRelease(&entry->spinlock); + } + } + entry->merger = NULL; +} + +/* Build index tuple comparator context */ +static SortSupport +lsm3_build_sortkeys(Relation index) +{ + int keysz = IndexRelationGetNumberOfKeyAttributes(index); + SortSupport sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData)); + BTScanInsert inskey = _bt_mkscankey(index, NULL); + Oid save_am = index->rd_rel->relam; + + index->rd_rel->relam = BTREE_AM_OID; + + for (int i = 0; i < keysz; i++) + { + SortSupport sortKey = &sortKeys[i]; + ScanKey scanKey = &inskey->scankeys[i]; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + /* Abbreviation is not supported here */ + sortKey->abbreviate = false; + + Assert(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(index, strategy, sortKey); + } + index->rd_rel->relam = save_am; + return sortKeys; +} + +/* Compare index tuples */ +static int +lsm3_compare_index_tuples(IndexScanDesc scan1, IndexScanDesc scan2, SortSupport sortKeys) +{ + int n_keys = IndexRelationGetNumberOfKeyAttributes(scan1->indexRelation); + + for (int i = 1; i <= n_keys; i++) + { + Datum datum[2]; + bool isNull[2]; + int result; + + datum[0] = index_getattr(scan1->xs_itup, i, scan1->xs_itupdesc, &isNull[0]); + datum[1] = index_getattr(scan2->xs_itup, i, scan2->xs_itupdesc, &isNull[1]); + result = ApplySortComparator(datum[0], isNull[0], + datum[1], isNull[1], + &sortKeys[i - 1]); + if (result != 0) + { + return result; + } + } + return ItemPointerCompare(&scan1->xs_heaptid, &scan2->xs_heaptid); +} + +/* + * Lsm3 access methods implementation + */ + +static IndexBuildResult * +lsm3_build(Relation heap, Relation index, IndexInfo *indexInfo) +{ + bool found; + Lsm3DictEntry* entry; + LWLockAcquire(Lsm3DictLock, LW_EXCLUSIVE); + elog(LOG, "lsm3_build %s", index->rd_rel->relname.data); + entry = hash_search(Lsm3Dict, &RelationGetRelid(index), HASH_ENTER, &found); /* Setting Lsm3Entry indicates to utility hook that Lsm3 index was created */ + if (!found) + { + lsm3_init_entry(entry, index); + } + { + MemoryContext old_context = MemoryContextSwitchTo(TopMemoryContext); + Lsm3Entries = lappend(Lsm3Entries, entry); + MemoryContextSwitchTo(old_context); + } + entry->am_id = index->rd_rel->relam; + index->rd_rel->relam = BTREE_AM_OID; + LWLockRelease(Lsm3DictLock); /* Release lock set by lsm3_build */ + return btbuild(heap, index, indexInfo); +} + +/* + * Grab previously release self locks (to let merger to proceed). + */ +static void +lsm3_reacquire_locks(void) +{ + if (Lsm3ReleasedLocks) + { + ListCell* cell; + foreach (cell, Lsm3ReleasedLocks) + { + Oid indexOid = lfirst_oid(cell); + LockRelationOid(indexOid, RowExclusiveLock); + } + list_free(Lsm3ReleasedLocks); + Lsm3ReleasedLocks = NULL; + } +} + +/* Insert in active top index, on overflow swap active indexes and initiate merge to base index */ +static bool +lsm3_insert(Relation rel, Datum *values, bool *isnull, + ItemPointer ht_ctid, Relation heapRel, + IndexUniqueCheck checkUnique, + bool indexUnchanged, + IndexInfo *indexInfo) +{ + Lsm3DictEntry* entry = lsm3_get_entry(rel); + + int active_index; + uint64 n_merges; /* used to check if merge was initiated by somebody else */ + Relation index; + Oid save_am; + bool overflow; + int top_index_size = entry->top_index_size ? entry->top_index_size : Lsm3TopIndexSize; + bool is_initialized = true; + + /* Obtain current active index and increment access counter under spinlock */ + SpinLockAcquire(&entry->spinlock); + active_index = entry->active_index; + if (entry->top[active_index]) + entry->access_count[active_index] += 1; + else + is_initialized = false; + n_merges = entry->n_merges; + SpinLockRelease(&entry->spinlock); + + if (!is_initialized) + { + bool res; + save_am = rel->rd_rel->relam; + rel->rd_rel->relam = BTREE_AM_OID; + res = btinsert(rel, values, isnull, ht_ctid, heapRel, checkUnique, + indexUnchanged, + indexInfo); + rel->rd_rel->relam = save_am; + return res; + } + /* Do insert in top index */ + index = index_open(entry->top[active_index], RowExclusiveLock); + index->rd_rel->relam = BTREE_AM_OID; + save_am = index->rd_rel->relam; + btinsert(index, values, isnull, ht_ctid, heapRel, checkUnique, + indexUnchanged, + indexInfo); + index_close(index, RowExclusiveLock); + index->rd_rel->relam = save_am; + + overflow = !entry->merge_in_progress /* do not check for overflow if merge was already initiated */ + && (entry->n_inserts % LSM3_CHECK_TOP_INDEX_SIZE_PERIOD) == 0 /* perform check only each N-th insert */ + && RelationGetNumberOfBlocks(index)*(BLCKSZ/1024) > top_index_size; + + SpinLockAcquire(&entry->spinlock); + /* If merge was not initiated before by somebody else, then do it */ + if (overflow && !entry->merge_in_progress && entry->n_merges == n_merges) + { + Assert(entry->active_index == active_index); + entry->merge_in_progress = true; + entry->active_index ^= 1; /* swap top indexes */ + entry->n_merges += 1; + } + Assert(entry->access_count[active_index] > 0); + entry->access_count[active_index] -= 1; + entry->n_inserts += 1; + if (entry->merge_in_progress) + { + LOCKTAG tag; + SET_LOCKTAG_RELATION(tag, + MyDatabaseId, + entry->top[1-active_index]); + /* Holding lock on non-ative index prevent merger bgworker from truncation this index */ +#if PG_VERSION_NUM>=170000 + if (LockHeldByMe(&tag, RowExclusiveLock, false)) +#else + if (LockHeldByMe(&tag, RowExclusiveLock)) +#endif + { + /* Copy locks all indexes and hold this locks until end of copy. + * We can not just release lock, because otherwise CopyFrom produces + * "you don't own a lock of type" warning. + * So just try to periodically release this lock and let merger grab it. + */ + if (!Lsm3InsideCopy || + (entry->n_inserts % LSM3_CHECK_TOP_INDEX_SIZE_PERIOD) == 0) /* release lock only each N-th insert */ + + { + LockRelease(&tag, RowExclusiveLock, false); + Lsm3ReleasedLocks = lappend_oid(Lsm3ReleasedLocks, entry->top[1-active_index]); + } + } + + /* If all inserts in previous active index are completed then we can start merge */ + if (entry->active_index != active_index && entry->access_count[active_index] == 0) + { + entry->start_merge = true; + if (entry->merger == NULL) /* lazy start of bgworker */ + { + lsm3_launch_bgworker(entry); + } + SetLatch(&entry->merger->procLatch); + } + } + SpinLockRelease(&entry->spinlock); + + /* We have to require released locks because othervise CopyFrom will produce warning */ + if (Lsm3InsideCopy && Lsm3ReleasedLocks) + { + pg_usleep(1); /* give merge thread a chance to grab the lock before we require it */ + lsm3_reacquire_locks(); + } + return false; +} + +static IndexScanDesc +lsm3_beginscan(Relation rel, int nkeys, int norderbys) +{ + IndexScanDesc scan; + Lsm3ScanOpaque* so; + int i; + + /* no order by operators allowed */ + Assert(norderbys == 0); + + /* get the scan */ + scan = RelationGetIndexScan(rel, nkeys, norderbys); + scan->xs_itupdesc = RelationGetDescr(rel); + so = (Lsm3ScanOpaque*)palloc(sizeof(Lsm3ScanOpaque)); + so->entry = lsm3_get_entry(rel); + so->sortKeys = lsm3_build_sortkeys(rel); + for (i = 0; i < 2; i++) + { + if (so->entry->top[i]) + { + so->top_index[i] = index_open(so->entry->top[i], AccessShareLock); + so->scan[i] = btbeginscan(so->top_index[i], nkeys, norderbys); + } + else + { + so->top_index[i] = NULL; + so->scan[i] = NULL; + } + } + so->scan[2] = btbeginscan(rel, nkeys, norderbys); + for (i = 0; i < 3; i++) + { + if (so->scan[i]) + { + so->eof[i] = false; + so->scan[i]->xs_want_itup = true; + so->scan[i]->parallel_scan = NULL; + } + } + so->unique = rel->rd_options ? ((Lsm3Options*)rel->rd_options)->unique : false; + so->curr_index = -1; + scan->opaque = so; + + return scan; +} + +static void +lsm3_rescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, + ScanKey orderbys, int norderbys) +{ + Lsm3ScanOpaque* so = (Lsm3ScanOpaque*) scan->opaque; + + so->curr_index = -1; + for (int i = 0; i < 3; i++) + { + if (so->scan[i]) + { + btrescan(so->scan[i], scankey, nscankeys, orderbys, norderbys); + so->eof[i] = false; + } + } +} + +static void +lsm3_endscan(IndexScanDesc scan) +{ + Lsm3ScanOpaque* so = (Lsm3ScanOpaque*) scan->opaque; + + for (int i = 0; i < 3; i++) + { + if (so->scan[i]) + { + btendscan(so->scan[i]); + if (i < 2) + { + index_close(so->top_index[i], AccessShareLock); + } + } + } + pfree(so); +} + + +static bool +lsm3_gettuple(IndexScanDesc scan, ScanDirection dir) +{ + Lsm3ScanOpaque* so = (Lsm3ScanOpaque*) scan->opaque; + int min = -1; + int curr = so->curr_index; + /* We start with active top index, then merging index and last of all: largest base index */ + int try_index_order[3] = {so->entry->active_index, 1-so->entry->active_index, 2}; + + /* btree indexes are never lossy */ + scan->xs_recheck = false; + + if (curr >= 0) /* lazy advance of current index */ + { + so->eof[curr] = !_bt_next(so->scan[curr], dir); /* move forward current index */ + } + + for (int j = 0; j < 3; j++) + { + int i = try_index_order[j]; + BTScanOpaque bto = (BTScanOpaque)so->scan[i]->opaque; + so->scan[i]->xs_snapshot = scan->xs_snapshot; + if (!so->eof[i] && !BTScanPosIsValid(bto->currPos)) + { + so->eof[i] = !_bt_first(so->scan[i], dir); + if (!so->eof[i] && so->unique && scan->numberOfKeys == scan->indexRelation->rd_index->indnkeyatts) + { + /* If index is marked as unique and we perform lookup using all index keys, + * then we can stop after locating first occurrence. + * If make it possible to avoid lookups of all three indexes. + */ + elog(DEBUG1, "Lsm3: lookup %d indexes", j+1); + while (++j < 3) /* prevent search of all remanining indexes */ + { + so->eof[try_index_order[j]] = true; + } + min = i; + break; + } + } + if (!so->eof[i]) + { + if (min < 0) + { + min = i; + } + else + { + int result = lsm3_compare_index_tuples(so->scan[i], so->scan[min], so->sortKeys); + if (result == 0) + { + /* Duplicate: it can happen during merge when same tid is both in top and base index */ + so->eof[i] = !_bt_next(so->scan[i], dir); /* just skip one of entries */ + } + else if ((result < 0) == ScanDirectionIsForward(dir)) + { + min = i; + } + } + } + } + if (min < 0) /* all indexes are traversed */ + { + return false; + } + else + { + scan->xs_heaptid = so->scan[min]->xs_heaptid; /* copy TID */ + if (scan->xs_want_itup) { + scan->xs_itup = so->scan[min]->xs_itup; + } + so->curr_index = min; /*will be advance at next call of gettuple */ + return true; + } +} + +static int64 +lsm3_getbitmap(IndexScanDesc scan, TIDBitmap *tbm) +{ + Lsm3ScanOpaque* so = (Lsm3ScanOpaque*)scan->opaque; + int64 ntids = 0; + for (int i = 0; i < 3; i++) + { + if (so->scan[i]) + { + so->scan[i]->xs_snapshot = scan->xs_snapshot; + ntids += btgetbitmap(so->scan[i], tbm); + } + } + return ntids; +} + +Datum +lsm3_handler(PG_FUNCTION_ARGS) +{ + IndexAmRoutine *amroutine = makeNode(IndexAmRoutine); + + amroutine->amstrategies = BTMaxStrategyNumber; + amroutine->amsupport = BTNProcs; + amroutine->amoptsprocnum = BTOPTIONS_PROC; + amroutine->amcanorder = true; + amroutine->amcanorderbyop = false; + amroutine->amcanbackward = true; + amroutine->amcanunique = false; /* We can't check that index is unique without accessing base index */ + amroutine->amcanmulticol = true; + amroutine->amoptionalkey = true; + amroutine->amsearcharray = false; /* TODO: not sure if it will work correctly with merge */ + amroutine->amsearchnulls = true; + amroutine->amstorage = false; + amroutine->amclusterable = true; + amroutine->ampredlocks = true; + amroutine->amcanparallel = false; /* TODO: parallel scac is not supported yet */ + amroutine->amcaninclude = true; + amroutine->amusemaintenanceworkmem = false; + amroutine->amparallelvacuumoptions = 0; + amroutine->amkeytype = InvalidOid; + + amroutine->ambuild = lsm3_build; + amroutine->ambuildempty = btbuildempty; + amroutine->aminsert = lsm3_insert; + amroutine->ambulkdelete = btbulkdelete; + amroutine->amvacuumcleanup = btvacuumcleanup; + amroutine->amcanreturn = btcanreturn; + amroutine->amcostestimate = btcostestimate; + amroutine->amoptions = lsm3_options; + amroutine->amproperty = btproperty; + amroutine->ambuildphasename = btbuildphasename; + amroutine->amvalidate = btvalidate; + amroutine->ambeginscan = lsm3_beginscan; + amroutine->amrescan = lsm3_rescan; + amroutine->amgettuple = lsm3_gettuple; + amroutine->amgetbitmap = lsm3_getbitmap; + amroutine->amendscan = lsm3_endscan; + amroutine->ammarkpos = NULL; /* When do we need index_markpos? Can we live without it? */ + amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; + + PG_RETURN_POINTER(amroutine); +} + +/* + * Access methods for B-Tree wrapper: actually we aonly want to disable inserts. + */ + +/* We do not need to load data in top top index: just initialize index metadata */ +static IndexBuildResult * +lsm3_build_empty(Relation heap, Relation index, IndexInfo *indexInfo) +{ + PGIOAlignedBlock metapage; + XLogRecPtr lsn; + + /* Construct metapage. */ + _bt_initmetapage(metapage.data, BTREE_METAPAGE, 0, _bt_allequalimage(index, false)); + +#if PG_VERSION_NUM>=150000 + RelationGetSmgr(index); +#else + RelationOpenSmgr(index); +#endif + + smgr_start_unlogged_build(index->rd_smgr); + + /* + * Write the page and log it. It might seem that an immediate sync would + * be sufficient to guarantee that the file exists on disk, but recovery + * itself might remove it while replaying, for example, an + * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record. Therefore, we need + * this even when wal_level=minimal. + */ + PageSetChecksumInplace(metapage.data, BTREE_METAPAGE); + smgrextend(index->rd_smgr, MAIN_FORKNUM, BTREE_METAPAGE, + (char *) metapage.data, true); + + smgr_finish_unlogged_build_phase_1(index->rd_smgr); + +#if PG_VERSION_NUM>=160000 + lsn = log_newpage(&index->rd_smgr->smgr_rlocator.locator, MAIN_FORKNUM, + BTREE_METAPAGE, metapage.data, true); + SetLastWrittenLSNForBlock(lsn, index->rd_smgr->smgr_rlocator.locator, MAIN_FORKNUM, BTREE_METAPAGE); + SetLastWrittenLSNForRelation(lsn, index->rd_smgr->smgr_rlocator.locator, MAIN_FORKNUM); +#else + lsn = log_newpage(&index->rd_smgr->smgr_rnode.node, MAIN_FORKNUM, + BTREE_METAPAGE, metapage.data, true); + SetLastWrittenLSNForBlock(lsn, index->rd_smgr->smgr_rnode.node, MAIN_FORKNUM, BTREE_METAPAGE); + SetLastWrittenLSNForRelation(lsn, index->rd_smgr->smgr_rnode.node, MAIN_FORKNUM); +#endif + + /* + * An immediate sync is required even if we xlog'd the page, because the + * write did not go through shared_buffers and therefore a concurrent + * checkpoint may have moved the redo pointer past our xlog record. + */ + smgrimmedsync(index->rd_smgr, MAIN_FORKNUM); + smgr_end_unlogged_build(index->rd_smgr); + + RelationCloseSmgr(index); + + + return (IndexBuildResult *) palloc0(sizeof(IndexBuildResult)); +} + +static bool +lsm3_dummy_insert(Relation rel, Datum *values, bool *isnull, + ItemPointer ht_ctid, Relation heapRel, + IndexUniqueCheck checkUnique, +#if PG_VERSION_NUM>=140000 + bool indexUnchanged, +#endif + IndexInfo *indexInfo) +{ + return false; +} + +Datum +lsm3_btree_wrapper(PG_FUNCTION_ARGS) +{ + IndexAmRoutine *amroutine = makeNode(IndexAmRoutine); + + amroutine->amstrategies = BTMaxStrategyNumber; + amroutine->amsupport = BTNProcs; + amroutine->amoptsprocnum = BTOPTIONS_PROC; + amroutine->amcanorder = true; + amroutine->amcanorderbyop = false; + amroutine->amcanbackward = true; + amroutine->amcanunique = false; + amroutine->amcanmulticol = true; + amroutine->amoptionalkey = true; + amroutine->amsearcharray = true; + amroutine->amsearchnulls = true; + amroutine->amstorage = false; + amroutine->amclusterable = true; + amroutine->ampredlocks = true; + amroutine->amcanparallel = false; + amroutine->amcaninclude = true; + amroutine->amusemaintenanceworkmem = false; + amroutine->amparallelvacuumoptions = 0; + amroutine->amkeytype = InvalidOid; + + amroutine->ambuild = lsm3_build_empty; + amroutine->ambuildempty = btbuildempty; + amroutine->aminsert = lsm3_dummy_insert; + amroutine->ambulkdelete = btbulkdelete; + amroutine->amvacuumcleanup = btvacuumcleanup; + amroutine->amcanreturn = btcanreturn; + amroutine->amcostestimate = btcostestimate; + amroutine->amoptions = lsm3_options; + amroutine->amproperty = btproperty; + amroutine->ambuildphasename = btbuildphasename; + amroutine->amvalidate = btvalidate; + amroutine->ambeginscan = btbeginscan; + amroutine->amrescan = btrescan; + amroutine->amgettuple = btgettuple; + amroutine->amgetbitmap = btgetbitmap; + amroutine->amendscan = btendscan; + amroutine->ammarkpos = NULL; + amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; + + PG_RETURN_POINTER(amroutine); +} + +/* + * Utulity hook handling creation of Lsm3 indexes + */ +static void +lsm3_process_utility(PlannedStmt *plannedStmt, + const char *queryString, + bool readOnlyTree, + ProcessUtilityContext context, + ParamListInfo paramListInfo, + QueryEnvironment *queryEnvironment, + DestReceiver *destReceiver, + QueryCompletion *completionTag + ) +{ + Node *parseTree = plannedStmt->utilityStmt; + DropStmt* drop = NULL; + ObjectAddresses *drop_objects = NULL; + List* drop_oids = NULL; + ListCell* cell; + + Lsm3Entries = NULL; /* Reset entry to check it after utility statement execution */ + Lsm3InsideCopy = false; + if (IsA(parseTree, DropStmt)) + { + drop = (DropStmt*)parseTree; + if (drop->removeType == OBJECT_INDEX) + { + foreach (cell, drop->objects) + { + RangeVar* rv = makeRangeVarFromNameList((List *) lfirst(cell)); + Relation index = relation_openrv(rv, ExclusiveLock); + if (index->rd_indam->ambuild == lsm3_build) + { + Lsm3DictEntry* entry = lsm3_get_entry(index); + if (drop_objects == NULL) + { + drop_objects = new_object_addresses(); + } + for (int i = 0; i < 2; i++) + { + if (entry->top[i]) + { + ObjectAddress obj; + obj.classId = RelationRelationId; + obj.objectId = entry->top[i]; + obj.objectSubId = 0; + add_exact_object_address(&obj, drop_objects); + } + } + drop_oids = lappend_oid(drop_oids, RelationGetRelid(index)); + } + relation_close(index, ExclusiveLock); + } + } + } + else if (IsA(parseTree, CopyStmt)) + { + Lsm3InsideCopy = true; + } + + (PreviousProcessUtilityHook ? PreviousProcessUtilityHook : standard_ProcessUtility) + (plannedStmt, + queryString, + readOnlyTree, + context, + paramListInfo, + queryEnvironment, + destReceiver, + completionTag); + + if (Lsm3Entries) + { + foreach (cell, Lsm3Entries) + { + Lsm3DictEntry* entry = (Lsm3DictEntry*)lfirst(cell); + Oid top_index[2]; + if (IsA(parseTree, IndexStmt)) /* This is Lsm3 creation statement */ + { + IndexStmt* stmt = (IndexStmt*)parseTree; + char* originIndexName = stmt->idxname; + char* originAccessMethod = stmt->accessMethod; + + for (int i = 0; i < 2; i++) + { + if (stmt->concurrent) + { + PushActiveSnapshot(GetTransactionSnapshot()); + } + stmt->accessMethod = "lsm3_btree_wrapper"; + stmt->idxname = psprintf("%s_top%d", get_rel_name(entry->base), i); + top_index[i] = DefineIndex(entry->heap, + stmt, + InvalidOid, + InvalidOid, + InvalidOid, +#if PG_VERSION_NUM>=160000 + -1, +#endif + false, + false, + false, + false, + true).objectId; + } + stmt->accessMethod = originAccessMethod; + stmt->idxname = originIndexName; + } + else + { + for (int i = 0; i < 2; i++) + { + top_index[i] = entry->top[i]; + if (top_index[i] == InvalidOid) + { + char* topidxname = psprintf("%s_top%d", get_rel_name(entry->base), i); + top_index[i] = get_relname_relid(topidxname, get_rel_namespace(entry->base)); + if (top_index[i] == InvalidOid) + { + elog(ERROR, "Lsm3: failed to lookup %s index", topidxname); + } + } + } + } + if (ActiveSnapshotSet()) + { + PopActiveSnapshot(); + } + CommitTransactionCommand(); + StartTransactionCommand(); + /* Mark top index as invalid to prevent planner from using it in queries */ + for (int i = 0; i < 2; i++) + { + index_set_state_flags(top_index[i], INDEX_DROP_CLEAR_VALID); + } + SpinLockAcquire(&entry->spinlock); + for (int i = 0; i < 2; i++) + { + entry->top[i] = top_index[i]; + } + SpinLockRelease(&entry->spinlock); + { + Relation index = index_open(entry->base, AccessShareLock); + index->rd_rel->relam = entry->am_id; + index_close(index, AccessShareLock); + } + } + list_free(Lsm3Entries); + Lsm3Entries = NULL; + } + else if (drop_objects) + { + performMultipleDeletions(drop_objects, drop->behavior, 0); + LWLockAcquire(Lsm3DictLock, LW_EXCLUSIVE); + foreach (cell, drop_oids) + { + hash_search(Lsm3Dict, &lfirst_oid(cell), HASH_REMOVE, NULL); + } + LWLockRelease(Lsm3DictLock); + } +} + +/* + * Executor finish hook to reclaim released locks on non-active top indexes + * to avoid "you don't own a lock of type RowExclusiveLock" warning + */ +static void +lsm3_executor_finish(QueryDesc *queryDesc) +{ + lsm3_reacquire_locks(); + Lsm3InsideCopy = false; + if (PreviousExecutorFinish) + PreviousExecutorFinish(queryDesc); + else + standard_ExecutorFinish(queryDesc); + +} + + +void +_PG_init(void) +{ + if (!process_shared_preload_libraries_in_progress) + { + elog(ERROR, "Lsm3: this extension should be loaded via shared_preload_libraries"); + } + DefineCustomIntVariable("lsm3.top_index_size", + "Size of top index B-Tree (kb)", + NULL, + &Lsm3TopIndexSize, + 64*1024, + BLCKSZ/1024, + INT_MAX, + PGC_SIGHUP, + GUC_UNIT_KB, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("lsm3.max_indexes", + "Maximal number of Lsm3 indexes.", + NULL, + &Lsm3MaxIndexes, + 1024, + 1, + INT_MAX, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL); + + Lsm3ReloptKind = add_reloption_kind(); + + add_bool_reloption(Lsm3ReloptKind, "unique", + "Index contains no duplicates", + false, AccessExclusiveLock); + add_int_reloption(Lsm3ReloptKind, "top_index_size", + "Size of top index (kb)", + 0, 0, INT_MAX, AccessExclusiveLock); + add_int_reloption(Lsm3ReloptKind, "fillfactor", + "Packs btree index pages only to this percentage", + BTREE_DEFAULT_FILLFACTOR, BTREE_MIN_FILLFACTOR, 100, ShareUpdateExclusiveLock); + add_real_reloption(Lsm3ReloptKind, "vacuum_cleanup_index_scale_factor", + "Packs btree index pages only to this percentage", + -1, 0.0, 1e10, ShareUpdateExclusiveLock); + add_bool_reloption(Lsm3ReloptKind, "deduplicate_items", + "Enables \"deduplicate items\" feature for this btree index", + true, AccessExclusiveLock); + + PreviousShmemStartupHook = shmem_startup_hook; + shmem_startup_hook = lsm3_shmem_startup; + +#if PG_VERSION_NUM>=150000 + PreviousShmemRequestHook = shmem_request_hook; + shmem_request_hook = lsm3_shmem_request; +#else + lsm3_shmem_request(); +#endif + + PreviousProcessUtilityHook = ProcessUtility_hook; + ProcessUtility_hook = lsm3_process_utility; + + PreviousExecutorFinish = ExecutorFinish_hook; + ExecutorFinish_hook = lsm3_executor_finish; +} + +Datum +lsm3_get_merge_count(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation index = index_open(relid, AccessShareLock); + Lsm3DictEntry* entry = lsm3_get_entry(index); + index_close(index, AccessShareLock); + if (entry == NULL) + PG_RETURN_NULL(); + else + PG_RETURN_INT64(entry->n_merges); +} + + +Datum +lsm3_start_merge(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation index = index_open(relid, AccessShareLock); + Lsm3DictEntry* entry = lsm3_get_entry(index); + index_close(index, AccessShareLock); + + SpinLockAcquire(&entry->spinlock); + if (!entry->merge_in_progress) + { + entry->merge_in_progress = true; + entry->active_index ^= 1; + entry->n_merges += 1; + if (entry->access_count[1-entry->active_index] == 0) + { + entry->start_merge = true; + if (entry->merger == NULL) /* lazy start of bgworker */ + { + lsm3_launch_bgworker(entry); + } + SetLatch(&entry->merger->procLatch); + } + } + SpinLockRelease(&entry->spinlock); + PG_RETURN_NULL(); +} + +Datum +lsm3_wait_merge_completion(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation index = index_open(relid, AccessShareLock); + Lsm3DictEntry* entry = lsm3_get_entry(index); + index_close(index, AccessShareLock); + + while (entry->merge_in_progress) + { + pg_usleep(1000000); /* one second */ + } + PG_RETURN_NULL(); +} + +Datum +lsm3_top_index_size(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation index = index_open(relid, AccessShareLock); + Lsm3DictEntry* entry = lsm3_get_entry(index); + index_close(index, AccessShareLock); + PG_RETURN_INT64((uint64)lsm3_get_index_size(lsm3_get_index_size(entry->top[entry->active_index]))*BLCKSZ); +} diff --git a/pgxn/lsm3/lsm3.conf b/pgxn/lsm3/lsm3.conf new file mode 100644 index 0000000000..c35f7e5e31 --- /dev/null +++ b/pgxn/lsm3/lsm3.conf @@ -0,0 +1,2 @@ +shared_preload_libraries = 'lsm3' +lsm3.top_index_size=1MB diff --git a/pgxn/lsm3/lsm3.control b/pgxn/lsm3/lsm3.control new file mode 100644 index 0000000000..fd7d81276f --- /dev/null +++ b/pgxn/lsm3/lsm3.control @@ -0,0 +1,4 @@ +comment = 'Lsm3 index' +default_version = '1.0' +module_pathname = '$libdir/lsm3' +relocatable = true diff --git a/pgxn/lsm3/lsm3.h b/pgxn/lsm3/lsm3.h new file mode 100644 index 0000000000..95f77bc450 --- /dev/null +++ b/pgxn/lsm3/lsm3.h @@ -0,0 +1,53 @@ +/* + * It is too expensive to check index size at each insert because it requires traverse of all index file segments and calling lseek for each. + * But we do not need precise size, so it is enough to do it at each n-th insert. The lagest B-Tree key size is abut 2kb, + * so with N=64K in the worst case error will be less than 128Mb and for 32-bit key just 1Mb. + */ +#define LSM3_CHECK_TOP_INDEX_SIZE_PERIOD (64*1024) /* should be power of two */ + +/* + * Control structure for Lsm3 index located in shared memory + */ +typedef struct +{ + Oid base; /* Oid of base index */ + Oid heap; /* Oid of indexed relation */ + Oid top[2]; /* Oids of two top indexes */ + int access_count[2]; /* Access counter for top indexes */ + int active_index; /* Index used for insert */ + uint64 n_merges; /* Number of performed merges since database open */ + uint64 n_inserts; /* Number of performed inserts since database open */ + volatile bool start_merge; /* Start merging of top index with base index */ + volatile bool merge_in_progress; /* Overflow of top index intiate merge process */ + PGPROC* merger; /* Merger background worker */ + Oid db_id; /* user ID (for background worker) */ + Oid user_id; /* database Id (for background worker) */ + Oid am_id; /* Lsm3 AM Oid */ + int top_index_size; /* Size of top index */ + slock_t spinlock; /* Spinlock to synchronize access */ +} Lsm3DictEntry; + +/* + * Opaque part of index scan descriptor + */ +typedef struct +{ + Lsm3DictEntry* entry; /* Lsm3 control structure */ + Relation top_index[2]; /* Opened top index relations */ + SortSupport sortKeys; /* Context for comparing index tuples */ + IndexScanDesc scan[3]; /* Scan descriptors for two top indexes and base index */ + bool eof[3]; /* Indicators that end of index was reached */ + bool unique; /* Whether index is "unique" and we can stop scan after locating first occurrence */ + int curr_index; /* Index from which last tuple was selected (or -1 if none) */ +} Lsm3ScanOpaque; + +/* Lsm3 index options */ +typedef struct +{ + BTOptions nbt_opts; /* Standard B-Tree options */ + int top_index_size; /* Size of top index (overrode lsm3.top_index_size GUC */ + bool unique; /* Index may not contain duplicates. We prohibit unique constraint for Lsm3 index + * because it can not be enforced. But presence of this index option allows to optimize + * index lookup: if key is found in active top index, do not search other two indexes. + */ +} Lsm3Options; diff --git a/pgxn/lsm3/sql/test.sql b/pgxn/lsm3/sql/test.sql new file mode 100644 index 0000000000..57c38da3cc --- /dev/null +++ b/pgxn/lsm3/sql/test.sql @@ -0,0 +1,59 @@ +create extension lsm3; + +create table t(k bigint, val bigint); +create index lsm3_index on t using lsm3(k); + +set enable_seqscan=off; + +insert into t values (1,10); +select lsm3_start_merge('lsm3_index'); +select lsm3_wait_merge_completion('lsm3_index'); +insert into t values (2,20); +select lsm3_start_merge('lsm3_index'); +select lsm3_wait_merge_completion('lsm3_index'); +insert into t values (3,30); +select lsm3_start_merge('lsm3_index'); +select lsm3_wait_merge_completion('lsm3_index'); +insert into t values (4,40); +select lsm3_start_merge('lsm3_index'); +select lsm3_wait_merge_completion('lsm3_index'); +insert into t values (5,50); +select lsm3_start_merge('lsm3_index'); +select lsm3_wait_merge_completion('lsm3_index'); +select lsm3_get_merge_count('lsm3_index'); +select * from t where k = 1; +select * from t order by k; +select * from t order by k desc; +analyze t; +explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t order by k; + +insert into t values (generate_series(1,100000), 1); +insert into t values (generate_series(1000001,200000), 2); +insert into t values (generate_series(2000001,300000), 3); +insert into t values (generate_series(1,100000), 1); +insert into t values (generate_series(1000001,200000), 2); +insert into t values (generate_series(2000001,300000), 3); +select * from t where k = 1; +select * from t where k = 1000000; +select * from t where k = 2000000; +select * from t where k = 3000000; +analyze t; +explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t where k = 1; +select lsm3_get_merge_count('lsm3_index') > 5; + +truncate table t; +insert into t values (generate_series(1,1000000), 1); +select * from t where k = 1; + +reindex table t; +select * from t where k = 1; + +drop table t; + +create table lsm(k bigint); +insert into lsm values (generate_series(1, 1000000)); +create index concurrently on lsm using lsm3(k); +select * from lsm where k = 1; + +drop table lsm; +