由于用户的表示是一个高度特定于应用程序的概念,所以此处将它留下作为您的一个练习,以练习如何将这些消息关联到任何类型的用户系统。一种可能的方法是,添加外键引用到包含聊天各方 ID 的chats表中。对于单人对单人的简单聊天,这可能就是简单的两个列,对于多方聊天,您必须给出一个将聊天与参与者相关联的多对多连接表。
BEGIN; CREATESCHEMA app;
CREATETABLE app.chats( id bigserial, created_at timestamptzNOTNULLDEFAULT now(), PRIMARY KEY (id, created_at) -- the partition column must be part of pk ) PARTITION BY RANGE (created_at);
CREATEINDEX "chats_created_at" ON app.chats (created_at);
CREATETABLE app.chat_messages( id bigserial, created_at timestamptzNOTNULL, chat_id bigintNOTNULL, chat_created_at timestamptzNOTNULL, message textNOTNULL, PRIMARY KEY (id, created_at), FOREIGN KEY (chat_id, chat_created_at) -- multicolumn fk to ensure REFERENCES app.chats(id, created_at) ) PARTITION BY RANGE (created_at);
CREATEINDEX "chat_messages_created_at" ON app.chat_messages (created_at); -- -- need this index on the fk source to lookup messages by parent -- CREATEINDEX "chat_messages_chat_id_chat_created_at" ON app.chat_messages (chat_id, chat_created_at);
在这个模型中,聊天消息按其创建日期进行分区,这可以在PARTITION BY RANGE子句中看到。我们将为每天创建一个新分区,这意味着每天都会为 “当日的” 聊天消息创建一个新表。如果不进行分区,每天对表的查询会越来越慢,但通过分区,查询时间将保持一致。chats和chat_messages表都会按天分区。这使得模型相当简单,但有一个问题,聊天可能会持续多天,因此如果您想保持数据的一致性,请记住这一点。
请注意子分区是使用LIKE子句来创建的,这让它们可以获得和父表一样的结构,其中的INCLUDING DEFAULTS INCLUDING CONSTRAINTS子句意味着子表会获得和父表一样的默认值和约束(如外键)。
-- -- Function creates a chats partition for the given day argument -- CREATE OR REPLACE PROCEDURE app.create_chats_partition(partition_day date) LANGUAGE plpgsql AS $$ BEGIN EXECUTE format( $i$ CREATE TABLE IF NOT EXISTS app."chats_%1$s" (LIKE app.chats INCLUDING DEFAULTS INCLUDING CONSTRAINTS); $i$, partition_day); END; $$;
下面有一些重要的细节,主键是在加载数据后添加的,这样可以尽快地为这些键创建好索引。此外,如上所述,向子表添加了一个CHECK约束,以确保它们不会违反父表PARTITION BY RANGE的约束,从而避免在附加到父表时对父表进行任何不必要的EXCLUSIVE ACCESS锁定。这种类型的锁定会导致业务中断,因为它会阻塞对表的所有访问,包括SELECT查询。对于可能需要几个小时才能完成复制的数亿行的数据量,您承受不了应用程序长时间的中断。
最后,使用ATTACH PARTITION ... FOR VALUES FROM TO 将新的子分区表附加到父表。由于已知所有约束都使用了主键和CHECK约束进行过验证,因此这可以在不用锁定的情况下完成。接下来,新创建的索引也用ALTER INDEX ... ATTACH PARTITION附加到了父表。最后,对避免表锁定非常有用的CHECK约束现在不再需要了,因此可以使用ALTER TABLE ... DROP CONSTRAINT将它们删除。
CREATE OR REPLACE PROCEDURE app.index_and_attach_chats_partition(partition_day date) LANGUAGE plpgsql AS $$ BEGIN EXECUTE format( $i$ -- now that any bulk data is loaded, setup the new partition table's pks ALTER TABLE app."chats_%1$s" ADD PRIMARY KEY (id, created_at);
-- adding these check constraints means postgres can -- attach partitions without locking and having to scan them. ALTER TABLE app."chats_%1$s" ADD CONSTRAINT "chats_partition_by_range_check_%1$s" CHECK ( created_at >= DATE %1$L AND created_at < DATE %2$L );
-- add more partition indexes here if necessary CREATE INDEX "chats_%1$s_created_at" ON app."chats_%1$s" USING btree(created_at) WITH (fillfactor=100);
-- by "attaching" the new tables and indexes *after* the pk, -- indexing and check constraints verify all rows, -- no scan checks or locks are necessary, attachment is very fast, -- and queries to parent are not blocked. ALTER TABLE app.chats ATTACH PARTITION app."chats_%1$s" FOR VALUES FROM (%1$L) TO (%2$L);
-- You now also "attach" any indexes you made at this point ALTER INDEX app."chats_created_at" ATTACH PARTITION app."chats_%1$s_created_at";
-- Dropping the now unnecessary check constraint they were just needed -- to prevent the attachment from forcing a scan to do the same check ALTER TABLE app."chats_%1$s" DROP CONSTRAINT "chats_partition_by_range_check_%1$s"; $i$, partition_day, (partition_day + interval '1 day')::date); END; $$;
-- -- Function copies one day's worth of chats rows from old "large" -- table new partition. Note that the copied data is ordered by -- created_at, this improves block cache density. -- CREATE OR REPLACE PROCEDURE app.copy_chats_partition(partition_day date) LANGUAGE plpgsql AS $$ DECLARE num_copied bigint = 0; BEGIN EXECUTE format( $i$ INSERT INTO app."chats_%1$s" (id, created_at) SELECT id, created_at FROM chats WHERE created_at::date >= %1$L::date AND created_at::date < (%1$L::date + interval '1 day') ORDER BY created_at $i$, partition_day); GET DIAGNOSTICS num_copied = ROW_COUNT; RAISE NOTICE 'Copied % rows to %', num_copied, format('app."chats_%1$s"', partition_day); END; $$;
-- -- Wrapper function to create, copy, index and attach a given day. -- CREATE OR REPLACE PROCEDURE app.load_chats_partition(i date) LANGUAGE plpgsql AS $$ BEGIN CALL app.create_chats_partition(i); CALL app.copy_chats_partition(i); CALL app.index_and_attach_chats_partition(i); END; $$;
-- -- This procedure loops over all days in the old table, copying each day -- and then committing the transaction. -- CREATE OR REPLACE PROCEDURE app.load_chats_partitions() LANGUAGE plpgsql AS $$ DECLARE start_date date; end_date date; i date; BEGIN SELECT min(created_at)::date INTO start_date FROM chats; SELECT max(created_at)::date INTO end_date FROM chats; FOR i IN SELECT * FROM generate_series(end_date, start_date, interval '-1 day') LOOP CALL app.load_chats_partition(i); COMMIT; END LOOP; END; $$;
-- -- This procedure will be used by pg_cron to create both new -- partitions for "today". -- CREATEOR REPLACEPROCEDURE app.create_daily_partition(today date) LANGUAGE plpgsql AS $$ BEGIN CALL app.create_chats_partition(today); CALL app.create_chat_messages_partition(today); END; $$;