diff --git a/src/common/backend/catalog/storage.cpp b/src/common/backend/catalog/storage.cpp index 922cbda6b013189f99e008636ca0df09eba41093..ede441c0ddb03596b7eeea177a7ea5f7e985f995 100644 --- a/src/common/backend/catalog/storage.cpp +++ b/src/common/backend/catalog/storage.cpp @@ -327,6 +327,9 @@ void log_smgrcreate(RelFileNode* rnode, ForkNumber forkNum) XLogBeginInsert(); XLogRegisterData((char*)&xlrec, (int)size); XLogInsert(RM_SMGR_ID, info, rnode->bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } static void CStoreRelDropStorage(Relation rel, RelFileNode* rnode, Oid ownerid) @@ -692,6 +695,9 @@ void RelationTruncate(Relation rel, BlockNumber nblocks, TransactionId latest_re XLogRegisterData((char*)&xlrec, (int)size); lsn = XLogInsert(RM_SMGR_ID, info, rel->rd_node.bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * Flush, because otherwise the truncation of the main relation might @@ -781,6 +787,9 @@ void PartitionTruncate(Relation parent, Partition part, BlockNumber nblocks, Tra XLogRegisterData((char*)&xlrec, redoSize); lsn = XLogInsert(RM_SMGR_ID, info, part->pd_node.bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * Flush, because otherwise the truncation of the main relation might diff --git a/src/common/backend/pgxc_single/barrier/barrier.cpp b/src/common/backend/pgxc_single/barrier/barrier.cpp index eef9408f1e8ad18f24c0b97a3bcdcdd325f8de42..43a35f2230c4f6fb9f45e2aafff975b43a7f4008 100755 --- a/src/common/backend/pgxc_single/barrier/barrier.cpp +++ b/src/common/backend/pgxc_single/barrier/barrier.cpp @@ -150,6 +150,9 @@ void ProcessCreateBarrierCommit(const char* id) XLogRegisterData((char*)id, strlen(id) + 1); XLogRecPtr recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_COMMIT, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); @@ -217,6 +220,9 @@ void ProcessCreateBarrierExecute(const char* id, bool isSwitchoverBarrier) } recptr = XLogInsert(RM_BARRIER_ID, isSwitchoverBarrier? XLOG_BARRIER_SWITCHOVER : XLOG_BARRIER_CREATE, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (IS_CSN_BARRIER(id) && !isSwitchoverBarrier) { @@ -400,6 +406,9 @@ void DisasterRecoveryRequestBarrier(const char* id, bool isSwitchoverBarrier) XLogRegisterData((char*)id, strlen(id) + 1); recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_CREATE, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); #ifndef ENABLE_LITE_MODE if (t_thrd.role == BARRIER_CREATOR) { @@ -441,6 +450,9 @@ void CreateHadrSwitchoverBarrier() XLogRegisterData((char*)barrier_id, strlen(barrier_id) + 1); recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_SWITCHOVER, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); g_instance.streaming_dr_cxt.switchoverBarrierLsn = recptr; @@ -860,6 +872,9 @@ static void ExecuteBarrier(const char* id, bool isSwitchoverBarrier) } recptr = XLogInsert(RM_BARRIER_ID, isSwitchoverBarrier? XLOG_BARRIER_SWITCHOVER : XLOG_BARRIER_CREATE, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (IS_CSN_BARRIER(id) && !isSwitchoverBarrier) { @@ -932,6 +947,9 @@ static void CommitBarrier(PGXCNodeAllHandles* prepared_handles, const char* id) XLogRegisterData((char*)id, strlen(id) + 1); XLogRecPtr recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_COMMIT, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); } diff --git a/src/common/backend/utils/cache/relmapper.cpp b/src/common/backend/utils/cache/relmapper.cpp index f8e48237049b095506622c6094ca772e20d94710..c9f8bc68212a72d94963d395ad507463f05938e6 100644 --- a/src/common/backend/utils/cache/relmapper.cpp +++ b/src/common/backend/utils/cache/relmapper.cpp @@ -824,6 +824,9 @@ static void write_relmap_file(bool shared, RelMapFile* newmap, bool write_wal, b XLogRegisterData((char*)(&xlrec), MinSizeOfRelmapUpdate); RegistRelMapWal(newmap); lsn = XLogInsert(RM_RELMAP_ID, XLOG_RELMAP_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* As always, WAL must hit the disk before the data update does */ XLogWaitFlush(lsn); diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index e0c1f2b648b9808c909f1df4bc03049e47b1ba6d..f60b68d64750fa47301bafa64f0793720248ecce 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1255,6 +1255,11 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ case BCAST_RELOAD_REFORM_CTRL_PAGE: ret = SSReloadReformCtrlPage(len); break; +#ifdef ENABLE_SS_MULTIMASTER + case BCAST_REDO_DONE: + ret = SSUpdateArgs(len); + break; +#endif default: ereport(WARNING, (errmodule(MOD_DMS), errmsg("invalid broadcast operate type"))); ret = DMS_ERROR; diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index ff58921258383fdc68c78f6b1dfae694a6e9b259..c2fb99c89291cb3822498b2c9e4b823fe79dd7ac 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -653,6 +653,18 @@ int SSReloadReformCtrlPage(uint32 len) return DMS_SUCCESS; } +#ifdef ENABLE_SS_MULTIMASTER +int SSUpdateArgs(uint32 len) +{ + if (unlikely(len != sizeof(SSBroadcastCmdOnly))) { + return DMS_ERROR; + } + + g_instance.dms_cxt.SSRecoveryInfo.redo_done = true; + return DMS_SUCCESS; +} +#endif + int SSCheckDbBackends(char *data, uint32 len, char *output_msg, uint32 *output_msg_len) { if (unlikely(len != sizeof(SSBroadcastDbBackends))) { @@ -722,6 +734,26 @@ void SSRequestAllStandbyReloadReformCtrlPage() } while (ret != DMS_SUCCESS); } +#ifdef ENABLE_SS_MULTIMASTER +void SSRequestAllNodeUpdateArgs() +{ + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + int ret; + SSBroadcastCmdOnly ssmsg; + ssmsg.type = BCAST_REDO_DONE; + do { + ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastCmdOnly), + (unsigned char)false, SS_BROADCAST_WAIT_ONE_SECOND); + + if (ret == DMS_SUCCESS) { + return; + } + pg_usleep(5000L); + } while (ret != DMS_SUCCESS); +} +#endif + void SSSendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) { dms_context_t dms_ctx; diff --git a/src/gausskernel/optimizer/commands/dbcommands.cpp b/src/gausskernel/optimizer/commands/dbcommands.cpp index 45d0a25bc56048439a582e6b60dbe163c9faac98..dc6938aa94e5447478602a440957a145165d0435 100644 --- a/src/gausskernel/optimizer/commands/dbcommands.cpp +++ b/src/gausskernel/optimizer/commands/dbcommands.cpp @@ -686,6 +686,9 @@ Oid createdb(const CreatedbStmt* stmt) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_create_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } @@ -709,6 +712,9 @@ Oid createdb(const CreatedbStmt* stmt) XLogBeginInsert(); XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_create_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif pfree_ext(srcpath); pfree_ext(dstpath); @@ -1030,6 +1036,9 @@ static void DropdbXactCallback(bool isCommit, const void* arg) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_drop_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } pfree_ext(dstpath); @@ -1588,6 +1597,9 @@ static void movedb(const char* dbname, const char* tblspcname) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_create_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -1711,6 +1723,9 @@ static void movedb_success_callback(Oid db_id, Oid src_tblspcoid) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_drop_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* Now it's safe to release the database lock */ @@ -2217,6 +2232,9 @@ static void remove_dbtablespaces(Oid db_id) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_drop_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } pfree_ext(dstpath); diff --git a/src/gausskernel/optimizer/commands/sequence/sequence.cpp b/src/gausskernel/optimizer/commands/sequence/sequence.cpp index 36543f4a0a9794fd25a1076257a36399f54d81e4..02012ce6d2258d4473fbbd52af4f7fb842780139 100644 --- a/src/gausskernel/optimizer/commands/sequence/sequence.cpp +++ b/src/gausskernel/optimizer/commands/sequence/sequence.cpp @@ -542,7 +542,11 @@ static int64 GetNextvalGlobal(SeqTable sess_elm, Relation seqrel) /* forced log to satisfy local demand for values */ log = fetch + SEQ_LOG_VALS; } else { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr redoptr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else XLogRecPtr redoptr = GetRedoRecPtr(); +#endif if (XLByteLE(PageGetLSN(page), redoptr)) { /* last update of seq was before checkpoint */ @@ -601,7 +605,11 @@ static int128 GetNextvalLocal(SeqTable elm, Relation seqrel) fetch = log = fetch + SEQ_LOG_VALS; logit = true; } else { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr redoptr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else XLogRecPtr redoptr = GetRedoRecPtr(); +#endif if (XLByteLE(PageGetLSN(page), redoptr)) { /* last update of seq was before checkpoint */ @@ -648,6 +656,9 @@ static int128 GetNextvalLocal(SeqTable elm, Relation seqrel) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* Now update sequence tuple to the intended final state */ @@ -1677,6 +1688,9 @@ void autoinc_setval(Oid relid, int128 next, bool iscalled) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1843,6 +1857,9 @@ static void do_setval(Oid relid, int128 next, bool iscalled) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2938,6 +2955,9 @@ static void updateNextValForSequence(Buffer buf, Form_pg_sequence seq, HeapTuple XLogRegisterData((char*)seqtuple.t_data, seqtuple.t_len); recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* * We must mark the buffer dirty before doing XLogInsert(); see notes in diff --git a/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp b/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp index d3c095982ec3f002ab59ec75014fa75d75f32c51..1d72bafb9626de7560afd563d6a36dfee5ccdfc5 100644 --- a/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp +++ b/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp @@ -125,6 +125,9 @@ void fill_seq_with_data(Relation rel, HeapTuple tuple) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/optimizer/commands/tablecmds.cpp b/src/gausskernel/optimizer/commands/tablecmds.cpp index e411cb485da629d0db50d2563e37dc9f5a69a484..5ea09df82c9453d581a15775540f3eb93fafe26e 100755 --- a/src/gausskernel/optimizer/commands/tablecmds.cpp +++ b/src/gausskernel/optimizer/commands/tablecmds.cpp @@ -19682,6 +19682,9 @@ static void copy_relation_data(Relation rel, SMgrRelation* dstptr, ForkNumber fo errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); @@ -19693,6 +19696,9 @@ static void copy_relation_data(Relation rel, SMgrRelation* dstptr, ForkNumber fo */ if (use_wal) { log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page, false, &tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (RelationisEncryptEnable(rel)) { @@ -19915,6 +19921,9 @@ static void mergeHeapBlock(Relation src, Relation dest, ForkNumber forkNum, char errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); } else { @@ -19925,6 +19934,9 @@ static void mergeHeapBlock(Relation src, Relation dest, ForkNumber forkNum, char RelationOpenSmgr(dest); if (use_wal) { log_newpage(&dest->rd_smgr->smgr_rnode.node, forkNum, dest_blkno, page, true, &tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (RelationisEncryptEnable(src)) { diff --git a/src/gausskernel/optimizer/commands/tablespace.cpp b/src/gausskernel/optimizer/commands/tablespace.cpp index a953be97ad26f1fff3ce8b9718c55fb963902fc1..d7e0fcbbaec581c1ef0190201a00dd5251456343 100644 --- a/src/gausskernel/optimizer/commands/tablespace.cpp +++ b/src/gausskernel/optimizer/commands/tablespace.cpp @@ -793,6 +793,9 @@ Oid CreateTableSpace(CreateTableSpaceStmt* stmt) * So We use different xlog info to mark relative */ (void)XLogInsert(RM_TBLSPC_ID, relative ? XLOG_TBLSPC_RELATIVE_CREATE : XLOG_TBLSPC_CREATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -1026,6 +1029,9 @@ void DropTableSpace(DropTableSpaceStmt* stmt) XLogRegisterData((char*)&xlrec, sizeof(xl_tblspc_drop_rec)); (void)XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* diff --git a/src/gausskernel/optimizer/commands/vacuumlazy.cpp b/src/gausskernel/optimizer/commands/vacuumlazy.cpp index c34218a1f0fef2b177919b93ee755e1fea5c9f1a..5d86b0bed22c27f27a1a7096fc45880265a04cc4 100644 --- a/src/gausskernel/optimizer/commands/vacuumlazy.cpp +++ b/src/gausskernel/optimizer/commands/vacuumlazy.cpp @@ -1505,6 +1505,9 @@ static IndexBulkDeleteResult** lazy_scan_heap( changedMultiXid ? u_sess->cmd_cxt.MultiXactFrzLimit : InvalidMultiXactId, frozen, nfrozen); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); if (TransactionIdPrecedes(((HeapPageHeader)page)->pd_xid_base, u_sess->utils_cxt.RecentXmin)) { @@ -1522,6 +1525,9 @@ static IndexBulkDeleteResult** lazy_scan_heap( recptr = log_heap_invalid(onerel, buf, u_sess->cmd_cxt.FreezeLimit, invalid, ninvalid); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); if (TransactionIdPrecedes(((HeapPageHeader)page)->pd_xid_base, u_sess->utils_cxt.RecentXmin)) { @@ -1833,6 +1839,9 @@ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, i recptr = log_heap_clean(onerel, buffer, NULL, 0, NULL, 0, unused, uncnt, vacrelstats->latestRemovedXid, true); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index afa7cdbdf372d71f83f13ca9e3ae7cf5d7d61bd8..cb8729315149c3dd0ce3ed2436dab83b3f6fa7ba 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -210,6 +210,9 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt) dms_cxt->SSRecoveryInfo.recovery_trapped_in_page_request = false; dms_cxt->SSRecoveryInfo.dorado_sharestorage_inited = false; dms_cxt->SSRecoveryInfo.ondemand_recovery_pause_status = NOT_PAUSE; +#ifdef ENABLE_SS_MULTIMASTER + dms_cxt->SSRecoveryInfo.redo_done = false; +#endif dms_cxt->log_timezone = NULL; pg_atomic_init_u32(&dms_cxt->inDmsThreShmemInitCnt, 0); pg_atomic_init_u32(&dms_cxt->inProcExitCnt, 0); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index a01fefe750af1447f2f304564dd6375c07c3e5e2..1a95ee6d1d2037dc13d2b19289edcfb0b73a177a 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -437,7 +437,7 @@ static void knl_t_xlog_init(knl_t_xlog_context* xlog_cxt) xlog_cxt->XactLastCommitEnd = InvalidXLogRecPtr; xlog_cxt->RedoRecPtr = InvalidXLogRecPtr; #ifdef ENABLE_SS_MULTIMASTER - xlog_cxt->RedoLogicLSN = 0; + xlog_cxt->logicRedoRecPtr = 0; #endif xlog_cxt->doPageWrites = false; xlog_cxt->RedoStartLSN = InvalidXLogRecPtr; diff --git a/src/gausskernel/storage/access/gin/ginbtree.cpp b/src/gausskernel/storage/access/gin/ginbtree.cpp index bb2a323e1d625a16f4e9fc6f099b6452e8ad9b59..dbe0503926a93122fc1c35f39757e3462baedab0 100644 --- a/src/gausskernel/storage/access/gin/ginbtree.cpp +++ b/src/gausskernel/storage/access/gin/ginbtree.cpp @@ -392,6 +392,9 @@ static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, void *insertdat if (BufferIsValid(childbuf)) { PageSetLSN(childpage, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -546,6 +549,9 @@ static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, void *insertdat if (BufferIsValid(childbuf)) { PageSetLSN(childpage, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/gin/gindatapage.cpp b/src/gausskernel/storage/access/gin/gindatapage.cpp index 842999f73cad8e78f7c3315ca2571e7a859617ef..32e8e6ff1c64dad0732a2f691ea551c292eb1ae7 100644 --- a/src/gausskernel/storage/access/gin/gindatapage.cpp +++ b/src/gausskernel/storage/access/gin/gindatapage.cpp @@ -780,6 +780,9 @@ void ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState * XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen); recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1665,6 +1668,9 @@ BlockNumber createPostingTree(Relation index, ItemPointerData *items, uint32 nit recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } UnlockReleaseBuffer(buffer); diff --git a/src/gausskernel/storage/access/gin/ginfast.cpp b/src/gausskernel/storage/access/gin/ginfast.cpp index 0c2b6352cfb029799f264af3d53f684b032c8f99..76e7197aa99f594eca017ad4bccf19ebac96951c 100644 --- a/src/gausskernel/storage/access/gin/ginfast.cpp +++ b/src/gausskernel/storage/access/gin/ginfast.cpp @@ -125,6 +125,9 @@ static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, in recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* get free space before releasing buffer */ @@ -387,6 +390,9 @@ void ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) if (buffer != InvalidBuffer) { PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (buffer != InvalidBuffer) @@ -557,6 +563,9 @@ static void shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, bo page = BufferGetPage(buffers[i]); PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } for (i = 0; i < data.ndeleted; i++) diff --git a/src/gausskernel/storage/access/gin/gininsert.cpp b/src/gausskernel/storage/access/gin/gininsert.cpp index fcfa4e0d96b62046c171d509ab7d2e86e79da7de..b95fafa605104c857c562e8e0a745abd0f6e6c71 100644 --- a/src/gausskernel/storage/access/gin/gininsert.cpp +++ b/src/gausskernel/storage/access/gin/gininsert.cpp @@ -335,6 +335,9 @@ static void buildInitialize(Relation index, GinBuildState *buildstate) page = BufferGetPage(MetaBuffer); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } UnlockReleaseBuffer(MetaBuffer); diff --git a/src/gausskernel/storage/access/gin/ginutil.cpp b/src/gausskernel/storage/access/gin/ginutil.cpp index 45fb22595d8c22d843593ea1ae818b56cac75f82..37542611adb437f92a36cbb1fb128de655186186 100644 --- a/src/gausskernel/storage/access/gin/ginutil.cpp +++ b/src/gausskernel/storage/access/gin/ginutil.cpp @@ -590,6 +590,9 @@ void ginUpdateStats(Relation index, const GinStatsData *stats) XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT); recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE); PageSetLSN(metapage, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } UnlockReleaseBuffer(metabuffer); END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/gin/ginvacuum.cpp b/src/gausskernel/storage/access/gin/ginvacuum.cpp index a7a0d3dddf84b2a8bf85f815008a636df89bb1f7..84d36f922fc6b45c7e949dd5306bfa8ae6adcabd 100644 --- a/src/gausskernel/storage/access/gin/ginvacuum.cpp +++ b/src/gausskernel/storage/access/gin/ginvacuum.cpp @@ -98,6 +98,9 @@ static void xlogVacuumPage(Relation index, Buffer buffer) recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } static bool ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer) @@ -246,6 +249,9 @@ static void ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNum PageSetLSN(page, recptr); PageSetLSN(parentPage, recptr); PageSetLSN(BufferGetPage(lBuffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (!isParentRoot) diff --git a/src/gausskernel/storage/access/gist/gist.cpp b/src/gausskernel/storage/access/gist/gist.cpp index 4ab541080f46baac4bad8fa69bef432802b9c360..26c57ff43b920b6f62279e6a89799238c09abd6f 100644 --- a/src/gausskernel/storage/access/gist/gist.cpp +++ b/src/gausskernel/storage/access/gist/gist.cpp @@ -412,6 +412,9 @@ bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, Buffer for (ptr = dist; ptr; ptr = ptr->next) { PageSetLSN(ptr->page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * Return the new child buffers to the caller. @@ -454,6 +457,9 @@ bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, Buffer recptr = gistXLogUpdate(buffer, deloffs, ndeloffs, itup, ntup, leftchildbuf); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } else { recptr = GetXLogRecPtrForTemp(); PageSetLSN(page, recptr); diff --git a/src/gausskernel/storage/access/gist/gistbuild.cpp b/src/gausskernel/storage/access/gist/gistbuild.cpp index 9d5e118c71fc5c670bafec503fef85aa20d3a7bb..80ebec2c5eaee256b5dc3c42a1eecf5f30457fd6 100644 --- a/src/gausskernel/storage/access/gist/gistbuild.cpp +++ b/src/gausskernel/storage/access/gist/gistbuild.cpp @@ -183,6 +183,9 @@ Datum gistbuild(PG_FUNCTION_ARGS) recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } else PageSetLSN(page, GetXLogRecPtrForTemp()); diff --git a/src/gausskernel/storage/access/gist/gistvacuum.cpp b/src/gausskernel/storage/access/gist/gistvacuum.cpp index 5763df2d8e14e730b37286ddb68ed36b4c50c3ff..4c19457f7a69549704fb81a9880ae2e799d05dec 100644 --- a/src/gausskernel/storage/access/gist/gistvacuum.cpp +++ b/src/gausskernel/storage/access/gist/gistvacuum.cpp @@ -203,6 +203,9 @@ Datum gistbulkdelete(PG_FUNCTION_ARGS) recptr = gistXLogUpdate(buffer, todelete, ntodelete, NULL, 0, InvalidBuffer); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } else PageSetLSN(page, GetXLogRecPtrForTemp()); diff --git a/src/gausskernel/storage/access/hash/hash.cpp b/src/gausskernel/storage/access/hash/hash.cpp index e7cfe74d7df3db04f170b55103c1b4dd44244ff9..5f68e91e3fa4a4d779b6b726e250fa78e4103273 100644 --- a/src/gausskernel/storage/access/hash/hash.cpp +++ b/src/gausskernel/storage/access/hash/hash.cpp @@ -656,6 +656,9 @@ loop_top: recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -860,6 +863,9 @@ void hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf, PageSetLSN(BufferGetPage(bucket_buf), recptr); } PageSetLSN(BufferGetPage(buf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/hash/hashinsert.cpp b/src/gausskernel/storage/access/hash/hashinsert.cpp index 6c28075a2a153b8c507e5440473f92872e01c852..fec4dc8123fb8a2817c805025e06be96cd33ff01 100644 --- a/src/gausskernel/storage/access/hash/hashinsert.cpp +++ b/src/gausskernel/storage/access/hash/hashinsert.cpp @@ -216,6 +216,9 @@ restart_insert: PageSetLSN(BufferGetPage(buf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -384,6 +387,9 @@ static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, RelF PageSetLSN(BufferGetPage(buf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/hash/hashovfl.cpp b/src/gausskernel/storage/access/hash/hashovfl.cpp index acaa4ee6245d88f8d0d13ec3f9b03996c127b0cc..2848a69d7e37d0ee9dd4a4ff30155ef5fa55e409 100644 --- a/src/gausskernel/storage/access/hash/hashovfl.cpp +++ b/src/gausskernel/storage/access/hash/hashovfl.cpp @@ -397,6 +397,9 @@ found: PageSetLSN(BufferGetPage(newmapbuf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -678,6 +681,9 @@ BlockNumber _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf, if (update_metap) PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -936,6 +942,9 @@ readpage: PageSetLSN(BufferGetPage(wbuf), recptr); PageSetLSN(BufferGetPage(rbuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/hash/hashpage.cpp b/src/gausskernel/storage/access/hash/hashpage.cpp index c8baba488e016f9828842419f0f1547031ab5ba5..3f00702ffa56d509f73c1975a19c85225fccf5e2 100644 --- a/src/gausskernel/storage/access/hash/hashpage.cpp +++ b/src/gausskernel/storage/access/hash/hashpage.cpp @@ -385,6 +385,9 @@ uint32 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum) recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } num_buckets = metap->hashm_maxbucket + 1; @@ -411,12 +414,16 @@ uint32 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum) _hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, false); MarkBufferDirty(buf); - if (use_wal) + if (use_wal) { log_newpage(&rel->rd_node, forkNum, blkno, BufferGetPage(buf), true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + } _hash_relbuf(rel, buf); } @@ -465,6 +472,9 @@ uint32 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum) PageSetLSN(BufferGetPage(bitmapbuf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* all done */ @@ -903,6 +913,9 @@ restart_expand: PageSetLSN(BufferGetPage(buf_oblkno), recptr); PageSetLSN(BufferGetPage(buf_nblkno), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -987,12 +1000,16 @@ static bool _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nbl ovflopaque->hasho_page_id = HASHO_PAGE_ID; PageSetChecksumInplace(zerobuf, lastblock); - if (RelationNeedsWAL(rel)) + if (RelationNeedsWAL(rel)) { log_newpage(&rel->rd_node, MAIN_FORKNUM, lastblock, zerobuf, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + } if (IsSegmentFileNode(rel->rd_node)) { Buffer buf; @@ -1267,6 +1284,9 @@ static void _hash_splitbucket(Relation rel, Buffer metabuf, Bucket obucket, Buck PageSetLSN(BufferGetPage(bucket_obuf), recptr); PageSetLSN(BufferGetPage(bucket_nbuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1427,6 +1447,9 @@ static void log_split_page(Relation rel, Buffer buf) recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_PAGE); PageSetLSN(BufferGetPage(buf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } diff --git a/src/gausskernel/storage/access/heap/heapam.cpp b/src/gausskernel/storage/access/heap/heapam.cpp index 333c41f9deecb7c748bd25a0203db99ef21220c2..1ca7c3ff3a485665fab7cb1afb807bf0d1f4e61d 100755 --- a/src/gausskernel/storage/access/heap/heapam.cpp +++ b/src/gausskernel/storage/access/heap/heapam.cpp @@ -3103,6 +3103,9 @@ Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, Bu recptr = XLogInsert(RM_HEAP_ID, info, InvalidBktId, istoast); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (tdeinfo != NULL) { pfree_ext(tdeinfo); @@ -3313,6 +3316,9 @@ void heap_abort_speculative(Relation relation, HeapTuple tuple) XLOG_HEAP_DELETE | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -3479,6 +3485,9 @@ static void HeapPageShiftBase(Buffer buffer, Page page, bool multi, int64 delta) recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_BASE_SHIFT); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); } @@ -3625,6 +3634,9 @@ static int freeze_single_heap_page(Relation relation, Buffer buffer) changedMultiXid ? freeze_mxid : InvalidMultiXactId, frozen, nfrozen); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -3639,6 +3651,9 @@ static int freeze_single_heap_page(Relation relation, Buffer buffer) XLogRecPtr recptr = log_heap_invalid(relation, buffer, freeze_xid, invalid, ninvalid); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -4328,6 +4343,9 @@ int heap_multi_insert(Relation relation, Relation parent, HeapTuple* tuples, int recptr = XLogInsert(RM_HEAP2_ID, info); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -4853,6 +4871,9 @@ l1: XLOG_HEAP_DELETE | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -5638,7 +5659,6 @@ l2: newbuf = buffer; heaptup = newtup; } - /* * We're about to create the new tuple -- check for conflict first, to * avoid possibly having to roll back work we've just done. @@ -5784,6 +5804,9 @@ l2: PageSetLSN(BufferGetPage(newbuf), recptr); } PageSetLSN(BufferGetPage(buffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -5856,6 +5879,9 @@ static XLogRecPtr log_heap_new_cid_insert(xl_heap_new_cid *xlrec, int bucketid) XLogRegisterData((char *) xlrec, SizeOfHeapNewCid); /* will be looked at irrespective of origin */ recptr = XLogInsert(RM_HEAP3_ID, XLOG_HEAP3_NEW_CID, bucketid); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif return recptr; } @@ -6887,6 +6913,9 @@ failed: recptr = XLogInsert(RM_HEAP_ID, useOldXlog ? XLOG_HEAP_LOCK : XLOG_HEAP_LOCK | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -7444,6 +7473,9 @@ l4: recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -7611,6 +7643,9 @@ void heap_inplace_update(Relation relation, HeapTuple tuple, bool waitFlush) recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -8068,6 +8103,9 @@ XLogRecPtr log_heap_cleanup_info(const RelFileNode* rnode, TransactionId latest_ XLogRegisterData((char*)&xlrec, SizeOfHeapCleanupInfo); recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO, rnode->bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif return recptr; } @@ -8698,6 +8736,9 @@ XLogRecPtr log_logical_newpage(RelFileNode* rnode, ForkNumber forkNum, BlockNumb recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOGICAL_NEWPAGE, rnode->bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif PageSetLogical(page); END_CRIT_SECTION(); @@ -8756,13 +8797,18 @@ XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std, TdeInfo* tde_info) RelFileNode rnode; ForkNumber forkNum; BlockNumber blkno; + XLogRecPtr recptr; /* We should be in a critical section. */ Assert(t_thrd.int_cxt.CritSectionCount > 0); BufferGetTag(buffer, &rnode, &forkNum, &blkno); - return log_newpage(&rnode, forkNum, blkno, page, page_std, tde_info); + recptr = log_newpage(&rnode, forkNum, blkno, page, page_std, tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + return recptr; } /* diff --git a/src/gausskernel/storage/access/heap/pruneheap.cpp b/src/gausskernel/storage/access/heap/pruneheap.cpp index 915b7d3ed0b4a77b4081f7db8711e273d761dcaa..7990714f72534fd3e66efd1c3452e4880c4988b5 100644 --- a/src/gausskernel/storage/access/heap/pruneheap.cpp +++ b/src/gausskernel/storage/access/heap/pruneheap.cpp @@ -260,6 +260,9 @@ int heap_page_prune(Relation relation, Buffer buffer, TransactionId oldest_xmin, repair_fragmentation); PageSetLSN(BufferGetPage(buffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } else { /* diff --git a/src/gausskernel/storage/access/heap/rewriteheap.cpp b/src/gausskernel/storage/access/heap/rewriteheap.cpp index 5dbfce0d93e84ce3b025d7f77910dd58a32a0caa..5114ea9a44bb9942a533a060c373e566c0555a00 100644 --- a/src/gausskernel/storage/access/heap/rewriteheap.cpp +++ b/src/gausskernel/storage/access/heap/rewriteheap.cpp @@ -360,6 +360,9 @@ static void rewrite_write_one_page(RewriteState state, Page page) errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); } else { @@ -368,6 +371,9 @@ static void rewrite_write_one_page(RewriteState state, Page page) if (state->rs_use_wal) { log_newpage(&state->rs_new_rel->rd_node, MAIN_FORKNUM, state->rs_blockno, page, true, &tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } RelationOpenSmgr(state->rs_new_rel); @@ -735,6 +741,9 @@ static void prepare_cmpr_buffer(RewriteState state, Size meta_size, const char * errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); diff --git a/src/gausskernel/storage/access/heap/visibilitymap.cpp b/src/gausskernel/storage/access/heap/visibilitymap.cpp index f5c1a66d44486e15416802152535e70e00e06f10..ae64a97eeef2316ba7b8d068e793c027865d308b 100644 --- a/src/gausskernel/storage/access/heap/visibilitymap.cpp +++ b/src/gausskernel/storage/access/heap/visibilitymap.cpp @@ -244,6 +244,9 @@ void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, XLogRe } } PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/nbtree/nbtdedup.cpp b/src/gausskernel/storage/access/nbtree/nbtdedup.cpp index 9e8a9daf15961978c5cc64f1d4c622a9dde8ef4c..013d22613faa3c7721815bab052104b2883a0413 100644 --- a/src/gausskernel/storage/access/nbtree/nbtdedup.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtdedup.cpp @@ -191,6 +191,9 @@ void btree_dedup_write_wal(BTDedupState state, Buffer buf) Page page = BufferGetPage(buf); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } void btree_dedup_single_value_fillfactor(Page page, BTDedupState state, Size newitemsz) @@ -476,4 +479,4 @@ static bool btree_dedup_posting_valid(IndexTuple posting) return true; } -#endif \ No newline at end of file +#endif diff --git a/src/gausskernel/storage/access/nbtree/nbtinsert.cpp b/src/gausskernel/storage/access/nbtree/nbtinsert.cpp index e65beddd708718049d94dcd73a3f5472b1d83ff6..0d288679cacb7246d738b7b99c17fa74900f1d03 100644 --- a/src/gausskernel/storage/access/nbtree/nbtinsert.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtinsert.cpp @@ -1198,6 +1198,9 @@ static void _bt_insertonpg(Relation rel, BTScanInsert itup_key, Buffer buf, Buff } } PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1704,6 +1707,9 @@ static Buffer _bt_split(Relation rel, BTScanInsert itup_key, Buffer buf, Buffer PageSetLSN(BufferGetPage(cbuf), recptr); } } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2445,6 +2451,9 @@ static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) } PageSetLSN(rootpage, recptr); PageSetLSN(metapg, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2605,4 +2614,4 @@ bool CheckPartitionIsInvisible(GPIScanDesc gpiScan) } return false; -} \ No newline at end of file +} diff --git a/src/gausskernel/storage/access/nbtree/nbtpage.cpp b/src/gausskernel/storage/access/nbtree/nbtpage.cpp index 33fae015450a81efd320bd91ca1d83218b415744..01fc0da05a4528c237dbb12df58a942a1328b6e9 100644 --- a/src/gausskernel/storage/access/nbtree/nbtpage.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtpage.cpp @@ -302,6 +302,9 @@ Buffer _bt_getroot(Relation rel, int access) PageSetLSN(rootpage, recptr); PageSetLSN(metapg, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -582,6 +585,9 @@ static void _bt_log_reuse_page(const Relation rel, BlockNumber blkno, Transactio XLogRegisterData((char *)&xlrec_reuse, SizeOfBtreeReusePage); (void)XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE, rel->rd_node.bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -899,6 +905,9 @@ void _bt_delitems_vacuum(const Relation rel, Buffer buf, OffsetNumber *deletable } PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -980,6 +989,9 @@ void _bt_delitems_delete(const Relation rel, Buffer buf, OffsetNumber *itemnos, recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, bucket_id); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1688,6 +1700,9 @@ int _bt_pagedel_old(Relation rel, Buffer buf, BTStack stack) page = BufferGetPage(lbuf); PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2153,6 +2168,9 @@ static bool _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) PageSetLSN(page, recptr); page = BufferGetPage(leafbuf); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2532,6 +2550,9 @@ static bool _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsi page = BufferGetPage(leafbuf); PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/nbtree/nbtree.cpp b/src/gausskernel/storage/access/nbtree/nbtree.cpp index 3171da3886e40c14f369cfe0e8ef28ff5d72ef47..63b91610074ca9872557999852a72eee458902c0 100644 --- a/src/gausskernel/storage/access/nbtree/nbtree.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtree.cpp @@ -227,6 +227,9 @@ void btbuildempty_internal(Relation index) smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE, (char *)metapage, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, BTREE_METAPAGE, metapage, false); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * An immediate sync is require even if we xlog'd the page, because the diff --git a/src/gausskernel/storage/access/nbtree/nbtsort.cpp b/src/gausskernel/storage/access/nbtree/nbtsort.cpp index c6ef7953f24388b01f1f7f5b32b6ccf40b219ea3..62cf3af1e61ccab371cafe9c7df2d417d57c6809 100644 --- a/src/gausskernel/storage/access/nbtree/nbtsort.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtsort.cpp @@ -319,6 +319,9 @@ static void _bt_segment_blwritepage(BTWriteState *wstate, Page page, BlockNumber errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); pfree(page); @@ -343,6 +346,9 @@ static void _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno) if (wstate->btws_use_wal) { /* We use the heap NEWPAGE record type for this */ log_newpage(&wstate->index->rd_node, MAIN_FORKNUM, blkno, page, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (blkno >= wstate->btws_pages_written) { diff --git a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp index 2aa458b122598b013f9fcbc193db21fa800192db..ccb0d153a1e5338817511a79ded30f20bf9a4407 100644 --- a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp +++ b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp @@ -1805,9 +1805,15 @@ bool XLogBlockRedoForExtremeRTO(XLogRecParseState *redoblocktate, RedoBufferInfo return false; } +#ifdef ENABLE_SS_MULTIMASTER + if ((block_valid != BLOCK_DATA_UNDO_TYPE) && g_instance.attr.attr_storage.EnableHotStandby && + IsDefaultExtremeRtoMode() && XLByteLT(PageGetLSN(bufferinfo->pageinfo.page), blockhead->logic_lsn) && + !IsSegmentFileNode(bufferinfo->blockinfo.rnode)) { +#else if ((block_valid != BLOCK_DATA_UNDO_TYPE) && g_instance.attr.attr_storage.EnableHotStandby && IsDefaultExtremeRtoMode() && XLByteLT(PageGetLSN(bufferinfo->pageinfo.page), blockhead->end_ptr) && !IsSegmentFileNode(bufferinfo->blockinfo.rnode)) { +#endif if (unlikely(bufferinfo->blockinfo.forknum >= EXRTO_FORK_NUM)) { ereport(PANIC, (errmsg("forknum is illegal: %d", bufferinfo->blockinfo.forknum))); } @@ -2044,7 +2050,11 @@ void redo_target_page(const BufferTag &buf_tag, StandbyReadLsnInfoArray *lsn_inf ereport(ERROR, (errmsg("redo_target_page: internal error, xlog in lsn %X/%X doesn't contain target block.", (uint32)(lsn_info->lsn_array[i] >> LSN_MOVE32), (uint32)(lsn_info->lsn_array[i])))); } +#ifdef ENABLE_SS_MULTIMASTER + buf_info.lsn = state_iter->blockparse.blockhead.logic_lsn; +#else buf_info.lsn = state_iter->blockparse.blockhead.end_ptr; +#endif buf_info.blockinfo.pblk = state_iter->blockparse.blockhead.pblk; wal_block_redo_for_extreme_rto_read(state_iter, &buf_info); XLogBlockParseStateRelease(state); diff --git a/src/gausskernel/storage/access/spgist/spgdoinsert.cpp b/src/gausskernel/storage/access/spgist/spgdoinsert.cpp index 16f31a78f969bd2e5778ad33ddfd7272fbeb13de..61adfc6ae17afeeea93faa1997c6d0a372328103 100644 --- a/src/gausskernel/storage/access/spgist/spgdoinsert.cpp +++ b/src/gausskernel/storage/access/spgist/spgdoinsert.cpp @@ -291,6 +291,9 @@ static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple lea if (xlrec.offnumParent != InvalidOffsetNumber) { PageSetLSN(parent->page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -496,6 +499,9 @@ static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, S PageSetLSN(current->page, recptr); PageSetLSN(npage, recptr); PageSetLSN(parent->page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1256,6 +1262,9 @@ static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, if (parent->buffer != InvalidBuffer) { PageSetLSN(parent->page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1377,6 +1386,9 @@ static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTupl recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); PageSetLSN(current->page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1502,6 +1514,9 @@ static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTupl PageSetLSN(current->page, recptr); PageSetLSN(parent->page, recptr); PageSetLSN(saveCurrent.page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1652,6 +1667,9 @@ static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTu if (newBuffer != InvalidBuffer) { PageSetLSN(BufferGetPage(newBuffer), recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/spgist/spginsert.cpp b/src/gausskernel/storage/access/spgist/spginsert.cpp index 5fd04c511c5d9d67024fd8513b89d6a7156671cd..9f0a3e5d9600e0a1678d8c60326f9ac29c162c38 100644 --- a/src/gausskernel/storage/access/spgist/spginsert.cpp +++ b/src/gausskernel/storage/access/spgist/spginsert.cpp @@ -106,6 +106,9 @@ Datum spgbuild(PG_FUNCTION_ARGS) PageSetLSN(BufferGetPage(metabuffer), recptr); PageSetLSN(BufferGetPage(rootbuffer), recptr); PageSetLSN(BufferGetPage(nullbuffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -171,6 +174,9 @@ Datum spgbuildempty(PG_FUNCTION_ARGS) PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO); smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, (char *)page, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, page, false); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Likewise for the root page. */ SpGistInitPage(page, SPGIST_LEAF); @@ -178,6 +184,9 @@ Datum spgbuildempty(PG_FUNCTION_ARGS) PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO); smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO, (char *)page, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, SPGIST_ROOT_BLKNO, page, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Likewise for the null-tuples root page. */ SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS); @@ -185,6 +194,9 @@ Datum spgbuildempty(PG_FUNCTION_ARGS) PageSetChecksumInplace(page, SPGIST_NULL_BLKNO); smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO, (char *)page, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, SPGIST_NULL_BLKNO, page, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * An immediate sync is required even if we xlog'd the pages, because the @@ -247,4 +259,4 @@ Datum spgmerge(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("spgmerge: unimplemented"))); PG_RETURN_POINTER(result); -} \ No newline at end of file +} diff --git a/src/gausskernel/storage/access/spgist/spgvacuum.cpp b/src/gausskernel/storage/access/spgist/spgvacuum.cpp index ceda3ceaacfb2efd582469aea62c04b057cb34ae..c5f21124a375c779b0d2476a1cf8a0798e29e442 100644 --- a/src/gausskernel/storage/access/spgist/spgvacuum.cpp +++ b/src/gausskernel/storage/access/spgist/spgvacuum.cpp @@ -359,6 +359,9 @@ static void vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffe recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -430,6 +433,9 @@ static void vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffe recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -541,6 +547,9 @@ static void vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/transam/clog.cpp b/src/gausskernel/storage/access/transam/clog.cpp index 4109ef78534de02f28cb52f3e7311162665eea63..00e2d024cc44b72619e424e58eb355e7e81235e4 100644 --- a/src/gausskernel/storage/access/transam/clog.cpp +++ b/src/gausskernel/storage/access/transam/clog.cpp @@ -1090,6 +1090,9 @@ static void WriteZeroPageXlogRec(int64 pageno) XLogBeginInsert(); XLogRegisterData((char *)(&pageno), sizeof(int64)); (void)XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -1105,6 +1108,9 @@ static void WriteTruncateXlogRec(int64 pageno) XLogBeginInsert(); XLogRegisterData((char *)(&pageno), sizeof(int64)); recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); } diff --git a/src/gausskernel/storage/access/transam/multi_redo_api.cpp b/src/gausskernel/storage/access/transam/multi_redo_api.cpp index c451266fedb885b6cea0c3c485e5fcdb0f220b75..e357bc5a044588af2d7cef21b74b553661ba9484 100644 --- a/src/gausskernel/storage/access/transam/multi_redo_api.cpp +++ b/src/gausskernel/storage/access/transam/multi_redo_api.cpp @@ -57,6 +57,47 @@ void StartUpMultiRedo(XLogReaderState *xlogreader, XLogReaderState **xlogreaderL parallel_recovery::StartRecoveryWorkers(xlogreader->ReadRecPtr); } } + +void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState) +{ + newState->read_page = oldState->read_page; + newState->system_identifier = oldState->system_identifier; + newState->private_data = oldState->private_data; + newState->errormsg_buf = oldState->errormsg_buf; + newState->isPRProcess = oldState->isPRProcess; + + newState->ReadRecPtr = oldState->ReadRecPtr; + newState->EndRecPtr = oldState->EndRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + newState->xlogPath = oldState->xlogPath; + newState->isEnd = oldState->isEnd; + newState->instId = oldState->instId; + newState->logicLSN = oldState->logicLSN; + newState->args = oldState->args; +#endif + newState->readSegNo = oldState->readSegNo; + newState->readOff = oldState->readOff; + newState->readPageTLI = oldState->readPageTLI; + newState->curReadSegNo = oldState->curReadSegNo; + newState->curReadOff = oldState->curReadOff; + newState->latestPagePtr = oldState->latestPagePtr; + newState->latestPageTLI = oldState->latestPageTLI; + newState->currRecPtr = oldState->currRecPtr; + newState->readBuf = oldState->readBuf; + newState->readLen = oldState->readLen; + newState->preReadStartPtr = oldState->preReadStartPtr; + newState->preReadBuf = oldState->preReadBuf; + + newState->decoded_record = NULL; + newState->main_data = NULL; + newState->main_data_len = 0; + + newState->max_block_id = -1; + newState->readblocks = 0; + /* move block clear to FreeRedoItem because we used MCXT_ALLOC_ZERO to alloc buf, if the variable is not init to 0, + you should put it here. */ + +} #endif bool IsMultiThreadRedoRunning() @@ -411,14 +452,6 @@ void DiagLogRedoRecord(XLogReaderState *record, const char *funcName) void ApplyRedoRecord(XLogReaderState *record) { -#ifdef ENABLE_SS_MULTIMASTER - XLogReaderArgs args; - if (record->args) { - memset_s(&args, sizeof(args), 0, sizeof(args)); - CurrentArgsCopy(&args); - NodeArgsSwitch(record->args); - } -#endif ErrorContextCallback errContext; errContext.callback = rm_redo_error_callback; errContext.arg = (void *)record; @@ -428,11 +461,7 @@ void ApplyRedoRecord(XLogReaderState *record) DiagLogRedoRecord(record, "ApplyRedoRecord"); } RmgrTable[XLogRecGetRmid(record)].rm_redo(record); -#ifdef ENABLE_SS_MULTIMASTER - if (record->args) { - NodeArgsSwitch(&args); - } -#endif + t_thrd.log_cxt.error_context_stack = errContext.previous; } diff --git a/src/gausskernel/storage/access/transam/multixact.cpp b/src/gausskernel/storage/access/transam/multixact.cpp index fa2fb0c59ba41793f4cce75b0bfae18190d19fcc..e0bdea8f580930d3edeab313c644a28a630b27fe 100644 --- a/src/gausskernel/storage/access/transam/multixact.cpp +++ b/src/gausskernel/storage/access/transam/multixact.cpp @@ -740,6 +740,9 @@ static MultiXactId CreateMultiXactId(int nmembers, MultiXactMember *members) XLogRegisterData((char *)xidsWithStatus, (unsigned)nmembers * sizeof(TransactionId)); (void)XLogInsert(RM_MULTIXACT_ID, XLOG_MULTIXACT_CREATE_ID); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Now enter the information into the OFFSETs and MEMBERs logs */ RecordNewMultiXact(multi, offset, nmembers, xidsWithStatus); @@ -2118,6 +2121,9 @@ static void WriteMZeroPageXlogRec(int64 pageno, uint8 info) XLogRegisterData((char *)(&pageno), sizeof(int)); } (void)XLogInsert(RM_MULTIXACT_ID, info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } void get_multixact_pageno(uint8 info, int64 *pageno, XLogReaderState *record) @@ -2227,7 +2233,7 @@ void SSMultiXactShmemClear(void) } #ifdef ENABLE_SS_MULTIMASTER -Size SSMultiXactSize(void) +Size SSGetMultiXactSize(void) { return sizeof(MultiXactStateData); } diff --git a/src/gausskernel/storage/access/transam/twophase.cpp b/src/gausskernel/storage/access/transam/twophase.cpp index 8f2b68aeadc2ba35821aa3cffef03e27c4648c9b..e5ae7f72dd330b75dbbdcacfddd307afb720930e 100644 --- a/src/gausskernel/storage/access/transam/twophase.cpp +++ b/src/gausskernel/storage/access/transam/twophase.cpp @@ -1966,6 +1966,9 @@ void EndPrepare(GlobalTransaction gxact) } gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -3544,6 +3547,9 @@ static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, Tr info |= XLR_REL_COMPRESS; } recptr = XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); @@ -3642,6 +3648,9 @@ static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, Tra info |= XLR_REL_COMPRESS; } recptr = XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index f1cba2d9d4270d91d16b1caa865faf158f0d795f..841b098e60c3e42f7579e107021657032ee5db24 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -880,6 +880,9 @@ static void AssignTransactionId(TransactionState s) t_thrd.xact_cxt.nUnreportedXids = 0; /* mark top, not current xact as having been logged */ TopTransactionStateData.didLogXid = true; +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } } @@ -1703,6 +1706,9 @@ static TransactionId RecordTransactionCommit(void) info |= XLR_REL_COMPRESS; } commitRecLSN = XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); @@ -1743,6 +1749,9 @@ static TransactionId RecordTransactionCommit(void) info |= XLR_REL_COMPRESS; } (void)XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } @@ -2142,6 +2151,9 @@ static TransactionId RecordTransactionAbort(bool isSubXact) #else abortRecLSN = XLogInsert(RM_XACT_ID, (uint8)(XLOG_XACT_ABORT | info)); #endif +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 54ed742cf4b31ee8d65be0ed006e2805618813f8..111bd8d1cd7d00034779b562b20bde1fb6822c91 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -193,6 +193,7 @@ #define SS_STANDBY_INST_SKIP_SHARED_FILE false #define SS_REDO_ID 0 #define SS_REDO_MODE (ENABLE_DMS && (SS_MY_INST_ID == SS_REDO_ID)) +#define SS_FORK_MODE (ENABLE_DMS && (SS_MY_INST_ID != SS_REDO_ID)) #define SS_IS_FORK_RECORD(state) (state->args != NULL && state->instId != SS_MY_INST_ID) #endif @@ -346,10 +347,6 @@ typedef struct XLogSwitchInfo { volatile bool IsPendingXactsRecoveryDone = false; -#ifdef ENABLE_SS_MULTIMASTER -static uint32 GetXlogRecordLogicLSN(XLogRecPtr RecPtr); -#endif - static void XLogFlushCore(XLogRecPtr writeRqstPtr); static void XLogSelfFlush(void); static void XLogSelfFlushWithoutStatus(int numHitsOnStartPage, XLogRecPtr currPos, int currLRC); @@ -517,7 +514,7 @@ static void XLogInsertRecordGroupLeader(PGPROC *leader, uint64 *end_byte_pos_ptr uint32 logic_lsn = 0; #ifdef ENABLE_SS_MULTIMASTER - *leader->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoLogicLSN; + *leader->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.logicRedoRecPtr; #else *leader->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr; #endif @@ -597,7 +594,7 @@ static void XLogInsertRecordGroupFollowers(PGPROC *leader, const uint32 head, ui while (nextidx != (uint32)(leader->pgprocno)) { follower = g_instance.proc_base_all_procs[nextidx]; #ifdef ENABLE_SS_MULTIMASTER - *follower->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoLogicLSN; + *follower->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.logicRedoRecPtr; #else *follower->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr; #endif @@ -735,7 +732,7 @@ static XLogRecPtr XLogInsertRecordGroup(XLogRecData *rdata, XLogRecPtr fpw_lsn) proc->xlogGroupXactLastRecEnd = &t_thrd.xlog_cxt.XactLastRecEnd; proc->xlogGroupCurrentTransactionState = GetCurrentTransactionState(); #ifdef ENABLE_SS_MULTIMASTER - proc->xlogGroupRedoRecPtr = &t_thrd.xlog_cxt.RedoLogicLSN; + proc->xlogGroupRedoRecPtr = &t_thrd.xlog_cxt.logicRedoRecPtr; #else proc->xlogGroupRedoRecPtr = &t_thrd.xlog_cxt.RedoRecPtr; #endif @@ -1317,13 +1314,13 @@ static XLogRecPtr XLogInsertRecordSingle(XLogRecData *rdata, XLogRecPtr fpw_lsn) Assert(t_thrd.xlog_cxt.RedoRecPtr < Insert->RedoRecPtr); t_thrd.xlog_cxt.RedoRecPtr = Insert->RedoRecPtr; #ifdef ENABLE_SS_MULTIMASTER - t_thrd.xlog_cxt.RedoLogicLSN = Insert->RedoLogicLSN; + t_thrd.xlog_cxt.logicRedoRecPtr = Insert->logicRedoRecPtr; #endif } t_thrd.xlog_cxt.doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites); #ifdef ENABLE_SS_MULTIMASTER - if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= t_thrd.xlog_cxt.RedoLogicLSN && t_thrd.xlog_cxt.doPageWrites) { + if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= t_thrd.xlog_cxt.logicRedoRecPtr && t_thrd.xlog_cxt.doPageWrites) { #else if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= t_thrd.xlog_cxt.RedoRecPtr && t_thrd.xlog_cxt.doPageWrites) { #endif @@ -9737,15 +9734,8 @@ void NodeArgsSwitch(XLogReaderArgs *args) t_thrd.shemem_ptr_cxt.MultiXactState = args->MultiXactState; t_thrd.shemem_ptr_cxt.ClogCtl = args->ExtendClogCtl; t_thrd.shemem_ptr_cxt.CsnlogCtlPtr = args->ExtendCsnlogCtlPtr; -} - -void CurrentArgsCopy(XLogReaderArgs *args) -{ - args->ControlFile = t_thrd.shemem_ptr_cxt.ControlFile; - args->ShmemVariableCache = t_thrd.xact_cxt.ShmemVariableCache; - args->MultiXactState = t_thrd.shemem_ptr_cxt.MultiXactState; - args->ExtendClogCtl = t_thrd.shemem_ptr_cxt.ClogCtl; - args->ExtendCsnlogCtlPtr = t_thrd.shemem_ptr_cxt.CsnlogCtlPtr; + t_thrd.shemem_ptr_cxt.MultiXactOffsetCtl = args->ExtendMultiXactOffsetCtl; + t_thrd.shemem_ptr_cxt.MultiXactMemberCtl = args->ExtendMultiXactMemberCtl; } static int get_min_xlogreader(XLogReaderState **xlogreaderList, int instanceNum) @@ -9773,6 +9763,8 @@ void SlruInit(XLogReaderArgs *args, int id) args->ExtendClogCtl = (SlruCtlData*)palloc0(NUM_CLOG_PARTITIONS * sizeof(SlruCtlData)); args->ExtendCsnlogCtlPtr = (SlruCtlData*)palloc0(NUM_CSNLOG_PARTITIONS * sizeof(SlruCtlData)); + args->ExtendMultiXactOffsetCtl = (SlruCtlData*)palloc0(sizeof(SlruCtlData)); + args->ExtendMultiXactMemberCtl = (SlruCtlData*)palloc0(sizeof(SlruCtlData)); rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, "%s/%s%d", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_clog", id); @@ -9789,6 +9781,28 @@ void SlruInit(XLogReaderArgs *args, int id) ExtendSimpleLruInit(&args->ExtendCsnlogCtlPtr[i], LWTRANCHE_CSNLOG_CTL, CSNLOGShmemBuffers(), 0, CSNBufMappingPartitionLockByIndex(i), dir); } + + rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, + "%s/%s/offsets", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_multixact", id); + securec_check_ss(rc, "\0", "\0"); + if (ENABLE_DSS) { + ExtendSimpleLruInit(args->ExtendMultiXactOffsetCtl, LWTRANCHE_MULTIXACTOFFSET_CTL, DSS_MAX_MXACTOFFSET, 0, + MultiXactOffsetControlLock, dir); + } else { + ExtendSimpleLruInit(args->ExtendMultiXactOffsetCtl, LWTRANCHE_MULTIXACTOFFSET_CTL, NUM_MXACTOFFSET_BUFFERS, 0, + MultiXactOffsetControlLock, dir); + } + + rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, + "%s/%s/members", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_multixact", id); + securec_check_ss(rc, "\0", "\0"); + if (ENABLE_DSS) { + ExtendSimpleLruInit(args->ExtendMultiXactMemberCtl, LWTRANCHE_MULTIXACTMEMBER_CTL, DSS_MAX_MXACTMEMBER, 0, + MultiXactMemberControlLock, dir); + } else { + ExtendSimpleLruInit(args->ExtendMultiXactMemberCtl, LWTRANCHE_MULTIXACTMEMBER_CTL, NUM_MXACTMEMBER_BUFFERS, 0, + MultiXactMemberControlLock, dir); + } } #endif /* @@ -9796,6 +9810,7 @@ void SlruInit(XLogReaderArgs *args, int id) */ void StartupXLOG(void) { + uint64 redoTime = GetCurrentTimestamp(); XLogCtlInsert *Insert = NULL; CheckPoint checkPoint; CheckPointNew checkPointNew; /* to adapt update and not to modify the storage format */ @@ -9820,6 +9835,7 @@ void StartupXLOG(void) int instanceNum = 0; XLogReaderState **xlogreaderList = NULL; XLogReaderArgs **xlogReaderArgsList = NULL; + uint64 checkpointRedoLogicLSN = 0; #endif XLogPageReadPrivate readprivate; bool RecoveryByPending = false; /* recovery caused by pending mode */ @@ -9923,11 +9939,13 @@ void StartupXLOG(void) xlogReaderArgsList[i]->MultiXactState = t_thrd.shemem_ptr_cxt.MultiXactState; xlogReaderArgsList[i]->ExtendClogCtl = t_thrd.shemem_ptr_cxt.ClogCtl; xlogReaderArgsList[i]->ExtendCsnlogCtlPtr = t_thrd.shemem_ptr_cxt.CsnlogCtlPtr; + xlogReaderArgsList[i]->ExtendMultiXactOffsetCtl = t_thrd.shemem_ptr_cxt.MultiXactOffsetCtl; + xlogReaderArgsList[i]->ExtendMultiXactMemberCtl = t_thrd.shemem_ptr_cxt.MultiXactMemberCtl; } else { xlogReaderArgsList[i]->ControlFile = (ControlFileData *)palloc(sizeof(ControlFileData)); xlogReaderArgsList[i]->checkPointUndo = (CheckPointUndo *)palloc(sizeof(CheckPointUndo)); xlogReaderArgsList[i]->ShmemVariableCache = (VariableCacheData *)palloc(sizeof(VariableCacheData)); - xlogReaderArgsList[i]->MultiXactState = (MultiXactStateData *)palloc(SSMultiXactSize()); + xlogReaderArgsList[i]->MultiXactState = (MultiXactStateData *)palloc(SSGetMultiXactSize()); SSReadControlFile(i, false, xlogReaderArgsList[i]->ControlFile); SlruInit(xlogReaderArgsList[i], i); @@ -9937,16 +9955,13 @@ void StartupXLOG(void) xlogReaderArgsList[i]->checkPointUndo->globalRecycleXid = InvalidTransactionId; errno_t rc = memset_s(xlogReaderArgsList[i]->ShmemVariableCache,sizeof(VariableCacheData),0,sizeof(VariableCacheData)); securec_check(rc, "\0", "\0"); - rc = memset_s(xlogReaderArgsList[i]->MultiXactState, sizeof(SSMultiXactSize()), 0, sizeof(SSMultiXactSize())); + rc = memset_s(xlogReaderArgsList[i]->MultiXactState, SSGetMultiXactSize(), 0, SSGetMultiXactSize()); securec_check(rc, "", ""); } xlogReaderArgsList[i]->readFile = -1; xlogReaderArgsList[i]->readSegNo = 0; } } else { - SSCSNLOGShmemClear(); - SSCLOGShmemClear(); - SSMultiXactShmemClear(); src_id = SS_MY_INST_ID; SSReadControlFile(SS_MY_INST_ID); } @@ -10181,8 +10196,12 @@ void StartupXLOG(void) if (ENABLE_DMS && ENABLE_DSS) { SSGetRecoveryXlogPath(); #ifdef ENABLE_SS_MULTIMASTER - if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD || !SS_REDO_MODE) { + if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD || SS_FORK_MODE) { + SSDisasterGetXlogPathList(); xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); + xlogreader->xlogPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[SS_MY_INST_ID]; + xlogreader->instId = SS_MY_INST_ID; + xlogreader->isEnd = false; close_readFile_if_open(); } else { xlogreaderList = (XLogReaderState **)palloc(instanceNum * sizeof(XLogReaderState *)); @@ -10471,6 +10490,16 @@ void StartupXLOG(void) wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN); wasCheckpoint = wasShutdown || (record->xl_info == XLOG_CHECKPOINT_ONLINE); +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(checkPoint.redo, checkPointLoc)) { + record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); + } + if (record != NULL) { + checkpointRedoLogicLSN = record->logic_lsn; + } else { + checkpointRedoLogicLSN = InvalidXLogRecPtr; + } +#endif } /* @@ -10711,6 +10740,10 @@ void StartupXLOG(void) t_thrd.xlog_cxt.lastFullPageWrites = checkPoint.fullPageWrites; t_thrd.xlog_cxt.RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr = checkPoint.redo; +#ifdef ENABLE_SS_MULTIMASTER + t_thrd.xlog_cxt.logicRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr = + t_thrd.shemem_ptr_cxt.XLogCtl->Insert.logicRedoRecPtr = checkpointRedoLogicLSN; +#endif } if (ENABLE_INCRE_CKPT) { @@ -10752,14 +10785,33 @@ void StartupXLOG(void) } #ifdef ENABLE_SS_MULTIMASTER - if (!SS_REDO_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { + if (SS_FORK_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { + /* do not need replay anything in SS_FORK_MODE */ + ereport(LOG, (errmsg("[SS] Skip redo replay in standby mode"))); + t_thrd.xlog_cxt.InRecovery = false; + do { + record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); + } while (record != NULL); + t_thrd.xlog_cxt.LastRec = t_thrd.xlog_cxt.ReadRecPtr; + /* but need to wait SS_REDO_MODE */ + while (!g_instance.dms_cxt.SSRecoveryInfo.redo_done) { + pg_usleep(5000L); + } + SSCSNLOGShmemClear(); + SSCLOGShmemClear(); + SSMultiXactShmemClear(); + errno_t rc = memcpy_s(t_thrd.xact_cxt.ShmemVariableCache, sizeof(VariableCacheData), g_instance.dms_cxt.SSReformerControl.argsList[SS_MY_INST_ID].ShmemVariableCache, sizeof(VariableCacheData)); + securec_check(rc, "", ""); + rc = memcpy_s(t_thrd.shemem_ptr_cxt.MultiXactState, MUXACT_SIZE, g_instance.dms_cxt.SSReformerControl.argsList[SS_MY_INST_ID].PartMultiXactState, MUXACT_SIZE); + securec_check(rc, "", ""); + } #else if (SS_STANDBY_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { -#endif /* do not need replay anything in SS standby mode */ ereport(LOG, (errmsg("[SS] Skip redo replay in standby mode"))); t_thrd.xlog_cxt.InRecovery = false; } +#endif g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = false; SetDefaultExtremeRtoMode(); @@ -10774,11 +10826,19 @@ void StartupXLOG(void) } } +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE || SS_DISASTER_MAIN_STANDBY_NODE) { + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + } +#else if (SS_PRIMARY_MODE || SS_DISASTER_MAIN_STANDBY_NODE) { LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); SSUpdateReformerCtrl(); LWLockRelease(ControlFileLock); } +#endif if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD) { t_thrd.xlog_cxt.InRecovery = true; @@ -11081,6 +11141,7 @@ void StartupXLOG(void) record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } + XLogReaderState *oldXlogReader = xlogreader; #ifdef ENABLE_SS_MULTIMASTER if (SS_REDO_MODE) { if (record == NULL) { @@ -11097,7 +11158,7 @@ void StartupXLOG(void) } if (XLByteLT(tmpRedoPointLoc, tmpCheckPointLoc)) { t_thrd.xlog_cxt.InRecovery = true; - record = ReadRecord(xlogreaderList[i], checkPoint.redo, PANIC, false); + record = ReadRecord(xlogreaderList[i], tmpRedoPointLoc, PANIC, false); } else { record = ReadRecord(xlogreaderList[i], InvalidXLogRecPtr, LOG, false); if (record == NULL) { @@ -11112,9 +11173,6 @@ void StartupXLOG(void) xlogctl->lastReplayedLogicLSN = record->logic_lsn; } } - XLogReaderState *oldXlogReader = xlogreaderList[SS_MY_INST_ID]; -#else - XLogReaderState *oldXlogReader = xlogreader; #endif if (record != NULL) { @@ -11183,6 +11241,11 @@ void StartupXLOG(void) pfree_ext(buf.data); } #endif +#ifdef ENABLE_SS_MULTIMASTER + if (xlogreader->args) { + NodeArgsSwitch(xlogreader->args); + } +#endif /* Handle interrupt signals of startup process */ RedoInterruptCallBack(); @@ -11323,6 +11386,9 @@ void StartupXLOG(void) GetRedoStartTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_1]); if (xlogreader->isPRProcess && IsExtremeRedo()) { record = ExtremeReadNextXLogRecord(&xlogreader, LOG); +#ifdef ENABLE_SS_MULTIMASTER + InitReaderStateByOld(xlogreaderList[xlogreader->instId], xlogreader); +#endif } else { xlogreader = newXlogReader; record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); @@ -11341,7 +11407,6 @@ void StartupXLOG(void) } CountRedoTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_1]); } while (record != NULL); // end of main redo apply loop - if (SS_IN_ONDEMAND_RECOVERY) { OnDemandSendRecoveryEndMarkToWorkersAndWaitForReach(0); } else { @@ -11350,6 +11415,9 @@ void StartupXLOG(void) #ifdef ENABLE_SS_MULTIMASTER if (SS_REDO_MODE) { xlogreader = xlogreaderList[SS_MY_INST_ID]; + if (xlogreader->args) { + NodeArgsSwitch(xlogreader->args); + } for (int i = 0; i < instanceNum; i++) { if (xlogreaderList[i]->instId == SS_MY_INST_ID) { continue; @@ -11358,6 +11426,9 @@ void StartupXLOG(void) (void)SimpleLruFlush(&xlogreaderList[i]->args->ExtendClogCtl[j], false); for (int j = 0; j < NUM_CSNLOG_PARTITIONS; j++) (void)SimpleLruFlush(&xlogreaderList[i]->args->ExtendCsnlogCtlPtr[j], false); + (void)SimpleLruFlush(xlogreaderList[i]->args->ExtendMultiXactOffsetCtl, false); + (void)SimpleLruFlush(xlogreaderList[i]->args->ExtendMultiXactMemberCtl, false); + } } #endif @@ -11504,14 +11575,6 @@ void StartupXLOG(void) * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ -#ifdef ENABLE_SS_MULTIMASTER - do { - record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); - t_thrd.xlog_cxt.LastRec = t_thrd.xlog_cxt.ReadRecPtr; - } while (record != NULL); - record = ReadRecord(xlogreader, t_thrd.xlog_cxt.ReadRecPtr, LOG, false); - EndOfLog = t_thrd.xlog_cxt.EndRecPtr; -#else xlogreader->readSegNo = 0; xlogreader->readOff = 0; xlogreader->readLen = 0; @@ -11519,7 +11582,6 @@ void StartupXLOG(void) UpdateTermFromXLog(record->xl_term); EndOfLog = t_thrd.xlog_cxt.EndRecPtr; -#endif XLByteToPrevSeg(EndOfLog, endLogSegNo); if (!SS_DISASTER_STANDBY_CLUSTER && ((ENABLE_DMS && SS_STANDBY_FAILOVER) || SS_STANDBY_PROMOTING)) { @@ -11606,11 +11668,6 @@ void StartupXLOG(void) Insert->LogicLSN = record->logic_lsn + 1; } g_instance.wal_cxt.prevValidPtr = t_thrd.xlog_cxt.LastRec; - - if (!IsInitdb) { - record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); - t_thrd.xlog_cxt.RedoLogicLSN = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoLogicLSN = record->logic_lsn; - } #else Insert->PrevByteSize = XLogRecPtrToBytePos(EndOfLog) - XLogRecPtrToBytePos(t_thrd.xlog_cxt.LastRec); #endif @@ -11781,12 +11838,7 @@ void StartupXLOG(void) #ifndef ENABLE_MULTIPLE_NODES StartupReplicationOrigin(); #endif -#ifdef ENABLE_SS_MULTIMASTER - if (SS_REDO_MODE) - TrimCLOG(); -#else TrimCLOG(); -#endif /* Reload shared-memory state for prepared transactions */ RecoverPreparedTransactions(); @@ -11811,6 +11863,10 @@ void StartupXLOG(void) /* Shut down the xlog reader facility. */ #ifdef ENABLE_SS_MULTIMASTER for (int i = 0; i < instanceNum; i++) { + errno_t rc = memcpy_s(g_instance.dms_cxt.SSReformerControl.argsList[i].ShmemVariableCache, sizeof(VariableCacheData), xlogReaderArgsList[i]->ShmemVariableCache, sizeof(VariableCacheData)); + securec_check(rc, "", ""); + rc = memcpy_s(g_instance.dms_cxt.SSReformerControl.argsList[i].PartMultiXactState, MUXACT_SIZE, xlogReaderArgsList[i]->MultiXactState, MUXACT_SIZE); + securec_check(rc, "", ""); XLogReaderFree(xlogreaderList[i]); xlogreaderList[i] = NULL; } @@ -11955,7 +12011,19 @@ void StartupXLOG(void) } g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = false; } - +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) { + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status = CLUSTER_NORMAL; + g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status = DISABLED; + /* for other nodes in cluster */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_NORMAL; + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); + SSRequestAllNodeUpdateArgs(); + } +#else if (SS_PRIMARY_MODE) { g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status = CLUSTER_NORMAL; g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status = DISABLED; @@ -11966,6 +12034,7 @@ void StartupXLOG(void) LWLockRelease(ControlFileLock); SSRequestAllStandbyReloadReformCtrlPage(); } +#endif ereport(LOG, (errmsg("redo done, nextXid: " XID_FMT ", startupMaxXid: " XID_FMT ", recentLocalXmin: " XID_FMT ", recentGlobalXmin: %lu, PendingPreparedXacts: %d" @@ -12599,10 +12668,9 @@ XLogRecPtr GetRedoRecPtr(void) */ void GetFullPageWriteInfo(XLogFPWInfo *fpwInfo_p) { -#ifdef ENABLE_SS_MULTIMASTER - fpwInfo_p->redoRecPtr = t_thrd.xlog_cxt.RedoLogicLSN; -#else fpwInfo_p->redoRecPtr = t_thrd.xlog_cxt.RedoRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + fpwInfo_p->logicRedoRecPtr = t_thrd.xlog_cxt.logicRedoRecPtr; #endif fpwInfo_p->doPageWrites = t_thrd.xlog_cxt.doPageWrites && !ENABLE_INCRE_CKPT; @@ -12914,6 +12982,46 @@ static void LogCheckpointEnd(bool restartpoint) } } +#ifdef ENABLE_SS_MULTIMASTER +uint64 GetCheckPointRedoLogicLSN(XLogRecPtr redo) { + uint64 logicLSN = 0; + TimeLineID tli = 0; + XLogReaderState *xlogreader = NULL; + XLogPageReadPrivate readprivate; + + if (!t_thrd.xlog_cxt.expectedTLIs && t_thrd.shemem_ptr_cxt.XLogCtl && t_thrd.shemem_ptr_cxt.ControlFile) { + tli = t_thrd.shemem_ptr_cxt.XLogCtl->ThisTimeLineID; + t_thrd.xlog_cxt.expectedTLIs = list_make1_int((int)tli); + + tli = t_thrd.shemem_ptr_cxt.ControlFile->checkPointCopy.ThisTimeLineID; + /* Build list with newest item first */ + if (!list_member_int(t_thrd.xlog_cxt.expectedTLIs, (int)tli)) { + t_thrd.xlog_cxt.expectedTLIs = lcons_int((int)tli, t_thrd.xlog_cxt.expectedTLIs); + } + } + + errno_t rc = memset_s(&readprivate, sizeof(XLogPageReadPrivate), 0, sizeof(XLogPageReadPrivate)); + securec_check(rc, "\0", "\0"); + if (ENABLE_DMS && ENABLE_DSS) { + xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); + } else { + xlogreader = XLogReaderAllocate(&XLogPageRead, &readprivate); + } + xlogreader->xlogPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[SS_MY_INST_ID]; + xlogreader->instId = SS_MY_INST_ID; + XLogRecord *record = ReadRecord(xlogreader, redo, LOG, false); + if (record != NULL) { + logicLSN = record->logic_lsn; + } else { + logicLSN = InvalidXLogRecPtr; + } + ShutdownReadFileFacility(); + XLogReaderFree(xlogreader); + xlogreader = NULL; + return logicLSN; +} +#endif + /* * Perform a checkpoint --- either during shutdown, or on-the-fly * @@ -13463,6 +13571,10 @@ void CreateCheckPoint(int flags) recptr = XLogInsert(RM_XLOG_ID, shutdown ? XLOG_CHECKPOINT_SHUTDOWN : XLOG_CHECKPOINT_ONLINE); XLogWaitFlush(recptr); +#ifdef ENABLE_SS_MULTIMASTER + t_thrd.xlog_cxt.logicRedoRecPtr = xlogctl->Insert.logicRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr = GetCheckPointRedoLogicLSN(checkPoint.redo); + g_curr_lsn = 0; +#endif /* * We mustn't write any new WAL after a shutdown checkpoint, or it will be @@ -13540,15 +13652,6 @@ void CreateCheckPoint(int flags) */ END_CRIT_SECTION(); -#ifdef ENABLE_SS_MULTIMASTER - if (!IsInitdb) { - uint32 logic_lsn = GetXlogRecordLogicLSN(checkPoint.redo); - if (logic_lsn) { - t_thrd.xlog_cxt.RedoLogicLSN = xlogctl->Insert.RedoLogicLSN = logic_lsn; - } - } -#endif - SyncPostCheckpoint(); if (!t_thrd.cbm_cxt.XlogCbmSys->firstCPCreated) { @@ -13988,6 +14091,11 @@ bool CreateRestartPoint(int flags) */ StartSuspendWalInsert(&lastlrc); xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo; +#ifdef ENABLE_SS_MULTIMASTER + uint64 lastRedoLSN = GetCheckPointRedoLogicLSN(lastCheckPoint.redo); + xlogctl->Insert.logicRedoRecPtr = lastRedoLSN; + t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr = lastRedoLSN; +#endif #ifdef ENABLE_MOT CallCheckpointCallback(EVENT_CHECKPOINT_SNAPSHOT_READY, lastCheckPointRecPtr); #endif @@ -13996,6 +14104,9 @@ bool CreateRestartPoint(int flags) /* Also update the info_lck-protected copy */ SpinLockAcquire(&xlogctl->info_lck); xlogctl->RedoRecPtr = lastCheckPoint.redo; +#ifdef ENABLE_SS_MULTIMASTER + xlogctl->logicRedoRecPtr = lastRedoLSN; +#endif SpinLockRelease(&xlogctl->info_lck); /* @@ -14623,6 +14734,9 @@ void XLogPutNextOid(Oid nextOid) XLogRegisterData((char *)(&nextOid), sizeof(Oid)); (void)XLogInsert(RM_XLOG_ID, XLOG_NEXTOID); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * We need not flush the NEXTOID record immediately, because any of the @@ -14659,7 +14773,11 @@ XLogRecPtr RequestXLogSwitch(void) /* XLOG SWITCH has no data */ XLogBeginInsert(); - return XLogInsert(RM_XLOG_ID, XLOG_SWITCH); + XLogRecPtr recPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + return recPtr; } /* @@ -14679,6 +14797,9 @@ XLogRecPtr XLogRestorePoint(const char *rpName) XLogRegisterData((char *)&xlrec, sizeof(xl_restore_point)); RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif ereport(LOG, (errmsg("restore point \"%s\" created at %X/%X", rpName, (uint32)(RecPtr >> 32), (uint32)RecPtr))); @@ -14716,6 +14837,9 @@ static void XLogReportParameters(void) XLogRegisterData((char *)&xlrec, sizeof(xlrec)); recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); } @@ -14779,6 +14903,9 @@ void UpdateFullPageWrites(void) XLogRegisterData((char *)(&u_sess->attr.attr_storage.fullPageWrites), sizeof(bool)); (void)XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (!u_sess->attr.attr_storage.fullPageWrites) { @@ -16162,6 +16289,9 @@ XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive, unsigned long XLogBeginInsert(); XLogRegisterData((char *)(&startpoint), sizeof(startpoint)); stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (strcmp(u_sess->attr.attr_common.application_name, "gs_roach") == 0) { char *fileName = BACKUP_LABEL_FILE_ROACH; struct stat statbuf; @@ -19730,6 +19860,9 @@ void SetDelayXlogRecycle(bool toDelay, bool isRedo) XLogBeginInsert(); XLogRegisterData((char *)(&toDelay), sizeof(toDelay)); recptr = XLogInsert(RM_XLOG_ID, XLOG_DELAY_XLOG_RECYCLE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); END_CRIT_SECTION(); } @@ -21712,69 +21845,3 @@ void XlogArchUwal(XLogRecPtr archRqstPtr) return; } -#ifdef ENABLE_SS_MULTIMASTER -/* - * get the LogicLSN of the xlog record - */ -uint32 GetXlogRecordLogicLSN(XLogRecPtr RecPtr) -{ - XLogReaderState *xlogreader = NULL; - XLogPageReadPrivate readprivate; - uint32 res = 0; - TimeLineID tli = 0; - XLogRecord *rec = NULL; - char *errorMsg = NULL; - errno_t rc = EOK; - - if (!t_thrd.xlog_cxt.expectedTLIs && t_thrd.shemem_ptr_cxt.XLogCtl && t_thrd.shemem_ptr_cxt.ControlFile) { - tli = t_thrd.shemem_ptr_cxt.XLogCtl->ThisTimeLineID; - t_thrd.xlog_cxt.expectedTLIs = list_make1_int((int)tli); - - tli = t_thrd.shemem_ptr_cxt.ControlFile->checkPointCopy.ThisTimeLineID; - /* Build list with newest item first */ - if (!list_member_int(t_thrd.xlog_cxt.expectedTLIs, (int)tli)) { - t_thrd.xlog_cxt.expectedTLIs = lcons_int((int)tli, t_thrd.xlog_cxt.expectedTLIs); - } - } - - if (!XRecOffIsValid(RecPtr)) { - ereport(ERROR, (errcode(ERRCODE_CASE_NOT_FOUND), - errmsg("invalid record offset at %X/%X.", (uint32)(RecPtr >> 32), (uint32)RecPtr))); - } - - /* Set up XLOG reader facility */ - rc = memset_s(&readprivate, sizeof(XLogPageReadPrivate), 0, sizeof(XLogPageReadPrivate)); - securec_check(rc, "\0", "\0"); - - xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); - - if (xlogreader == NULL) { - ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), - errdetail("Failed while allocating an XLog reading processor"))); - } - xlogreader->system_identifier = GetSystemIdentifier(); - - if (dummyStandbyMode) { - rec = ReadRecord(xlogreader, RecPtr, LOG, false); - } else { - rec = XLogReadRecord(xlogreader, RecPtr, &errorMsg, true); - } - - if (rec != NULL) { - res = rec->logic_lsn; - - if (dummyStandbyMode) { - SetDummyStandbyEndRecPtr(xlogreader); - } - } - - /* Shut down readFile facility, free space. */ - ShutdownReadFileFacility(); - - /* Shut down the xlog reader facility. */ - XLogReaderFree(xlogreader); - xlogreader = NULL; - - return res; -} -#endif diff --git a/src/gausskernel/storage/access/transam/xloginsert.cpp b/src/gausskernel/storage/access/transam/xloginsert.cpp index 4e27cda69fcd4e24d0b52ca77da18a13f3a9f2b9..c5be1a0acb4d24cc3a2ee334ca84559605033315 100755 --- a/src/gausskernel/storage/access/transam/xloginsert.cpp +++ b/src/gausskernel/storage/access/transam/xloginsert.cpp @@ -765,7 +765,11 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogFPWInfo fpw_ */ XLogRecPtr page_lsn = PageGetLSN(regbuf->page); +#ifdef ENABLE_SS_MULTIMASTER + needs_backup = XLByteLE(page_lsn, fpw_info.logicRedoRecPtr); +#else needs_backup = XLByteLE(page_lsn, fpw_info.redoRecPtr); +#endif if (!needs_backup) { if (XLByteEQ(*fpw_lsn, InvalidXLogRecPtr) || XLByteLT(page_lsn, *fpw_lsn)) *fpw_lsn = page_lsn; @@ -1111,7 +1115,11 @@ XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std) /* * Update RedoRecPtr so that we can make the right decision */ +#ifdef ENABLE_SS_MULTIMASTER + RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else RedoRecPtr = GetRedoRecPtr(); +#endif /* * We assume page LSN is first data on *every* page that can be passed to diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index c8f317cef11d15004a201e35cef0586535b43a9f..6217b3e265c9367d90cf9d3465cc0966acd2fb82 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -6124,6 +6124,9 @@ void MarkBufferDirtyHint(Buffer buffer, bool buffer_std) t_thrd.vacuum_cxt.VacuumCostBalance += u_sess->attr.attr_storage.VacuumCostPageDirty; } } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED); diff --git a/src/gausskernel/storage/ipc/standby.cpp b/src/gausskernel/storage/ipc/standby.cpp index cbab224f79a4b89c6e22d3c36aec0ad3a7310259..beb5bb055fbbd2ccec4d754b716f895f3369a8d3 100755 --- a/src/gausskernel/storage/ipc/standby.cpp +++ b/src/gausskernel/storage/ipc/standby.cpp @@ -1200,6 +1200,9 @@ void CleanUpMakeCommitAbort(List* committingCsnList) XLogBeginInsert(); XLogRegisterData((char *)action, sizeof(TransactionId)); XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_CSN_ABORTED); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } MemoryContext oldCtx = NULL; if (IsExtremeRtoRunning()) { diff --git a/src/gausskernel/storage/lmgr/lwlock.cpp b/src/gausskernel/storage/lmgr/lwlock.cpp index f2b5a1abb9fa6bcff4333493d240281012ce9def..8070ab9192d52af4c94fe280e2b421d53fb99448 100644 --- a/src/gausskernel/storage/lmgr/lwlock.cpp +++ b/src/gausskernel/storage/lmgr/lwlock.cpp @@ -396,6 +396,12 @@ int NumLWLocks(void) /* clog.c needs one per CLOG buffer */ numLocks += DMS_MAX_INSTANCES * NUM_CLOG_PARTITIONS * CLOGShmemBuffers(); + + if (ENABLE_DSS) { + numLocks += DMS_MAX_INSTANCES * (DSS_MAX_MXACTOFFSET + DSS_MAX_MXACTMEMBER); + } else { + numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; + } #else /* clog.c needs one per CLOG buffer */ numLocks += CLOGShmemBuffers(); @@ -405,7 +411,6 @@ int NumLWLocks(void) /* clog.c needs one per CLOG buffer */ numLocks += NUM_CLOG_PARTITIONS * CLOGShmemBuffers(); -#endif /* multixact.c needs two SLRU areas */ if (ENABLE_DSS) { @@ -413,6 +418,7 @@ int NumLWLocks(void) } else { numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; } +#endif /* async.c needs one per Async buffer */ numLocks += NUM_ASYNC_BUFFERS; diff --git a/src/gausskernel/storage/replication/bcm.cpp b/src/gausskernel/storage/replication/bcm.cpp index 4c00338f0bb5f63360dcaa0040235806a8305dab..c14284fdba5ec5b4a3160b40653f7c8856b6bd29 100644 --- a/src/gausskernel/storage/replication/bcm.cpp +++ b/src/gausskernel/storage/replication/bcm.cpp @@ -193,6 +193,9 @@ void BCMLogCU(Relation rel, uint64 offset, int col, BCMBitStatus status, int cou cuBlock = cstore_offset_to_cstoreblock(offset, align_size); recptr = log_cu_bcm(&(rel->rd_node), col, cuBlock, status, count); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -427,6 +430,9 @@ void BCMSetStatusBit(Relation rel, uint64 heapBlk, Buffer buf, BCMBitStatus stat recptr = log_heap_bcm(&(rel->rd_node), 0, heapBlk, status); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); } diff --git a/src/gausskernel/storage/replication/logical/logical.cpp b/src/gausskernel/storage/replication/logical/logical.cpp index 21169017533ff13f057e6484824e25ccc45df2ed..1f1d3f7b01240429a2c23cc9d62318609c922933 100644 --- a/src/gausskernel/storage/replication/logical/logical.cpp +++ b/src/gausskernel/storage/replication/logical/logical.cpp @@ -377,15 +377,24 @@ LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, List *outp XLogRecPtr flushptr; /* start at current insert position */ +#ifdef ENABLE_SS_MULTIMASTER + slot->data.restart_lsn = GetXLogInsertLogicLSN(); +#else slot->data.restart_lsn = GetXLogInsertRecPtr(); +#endif /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); /* and make sure it's fsynced to disk */ XLogWaitFlush(flushptr); - } else + } else { +#ifdef ENABLE_SS_MULTIMASTER + slot->data.restart_lsn = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else slot->data.restart_lsn = GetRedoRecPtr(); +#endif + } /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(NULL); diff --git a/src/gausskernel/storage/replication/logical/snapbuild.cpp b/src/gausskernel/storage/replication/logical/snapbuild.cpp index 2efcda0676cb39fa68f7f6e38ebd1a306cb6695b..c97e169c7d6e2d835f855ba05228ae106e444a7e 100644 --- a/src/gausskernel/storage/replication/logical/snapbuild.cpp +++ b/src/gausskernel/storage/replication/logical/snapbuild.cpp @@ -1768,7 +1768,11 @@ void CheckPointSnapBuild(void) * We start of with a minimum of the last redo pointer. No new replication * slot will start before that, so that's a safe upper bound for removal. */ +#ifdef ENABLE_SS_MULTIMASTER + redo = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else redo = GetRedoRecPtr(); +#endif /* now check for the restart ptrs from existing slots */ cutoff = ReplicationSlotsComputeLogicalRestartLSN(); diff --git a/src/gausskernel/storage/replication/slotfuncs.cpp b/src/gausskernel/storage/replication/slotfuncs.cpp index e2651805409ed3532178ee0f07101d878239f109..db67cbb8124e5b4b99d030886689a6cba7c0e55e 100755 --- a/src/gausskernel/storage/replication/slotfuncs.cpp +++ b/src/gausskernel/storage/replication/slotfuncs.cpp @@ -65,6 +65,9 @@ void log_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_ XLogRegisterData(extra_content, strlen(extra_content) + 1); } recptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_CREATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -99,6 +102,9 @@ void log_slot_advance(const ReplicationSlotPersistentData *slotInfo, char* extra XLogRegisterData(extra_content, strlen(extra_content) + 1); } Ptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_ADVANCE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(Ptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -132,6 +138,9 @@ void log_slot_drop(const char *name) XLogRegisterData((char *)&xlrec, ReplicationSlotPersistentDataConstSize); Ptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_DROP); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(Ptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -160,6 +169,9 @@ void LogCheckSlot() XLogRegisterData((char *)LogicalSlot, size); recptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_CHECK); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -1154,6 +1166,9 @@ void write_term_log(uint32 term) XLogRegisterData((char *)&term, sizeof(uint32)); recptr = XLogInsert(RM_SLOT_ID, XLOG_TERM_LOG); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) { WalSndWakeup(); diff --git a/src/gausskernel/storage/smgr/cfs/cfs_md.cpp b/src/gausskernel/storage/smgr/cfs/cfs_md.cpp index 96568c9427cd11b30f1fa4378bc878e99b03f536..db2a88476f8e9a64d166cd09e6ec72dd038dbb09 100644 --- a/src/gausskernel/storage/smgr/cfs/cfs_md.cpp +++ b/src/gausskernel/storage/smgr/cfs/cfs_md.cpp @@ -773,6 +773,9 @@ void CfsShrinkRecord(const RelFileNode &node, ForkNumber forknum) XLogBeginInsert(); XLogRegisterData((char *)&data, sizeof(CfsShrink_t)); XLogRecPtr lsn = XLogInsert((RmgrId)RM_COMPRESSION_REL_ID, XLOG_CFS_SHRINK_OPERATION); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); XLogWaitFlush(lsn); diff --git a/src/gausskernel/storage/smgr/segment/extent_group.cpp b/src/gausskernel/storage/smgr/segment/extent_group.cpp index ddfb334a213d44566997268cae14c3800cf61dbb..4ffd3bcac088faea58f1c51860e33dbcf08eba5d 100644 --- a/src/gausskernel/storage/smgr/segment/extent_group.cpp +++ b/src/gausskernel/storage/smgr/segment/extent_group.cpp @@ -475,6 +475,9 @@ void eg_init_bitmap_page(SegExtentGroup *seg, BlockNumber pageno, BlockNumber fi XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT); XLogRecPtr rec_ptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_INIT_MAPPAGE, SegmentBktId); PageSetLSN(BufferGetPage(buffer), rec_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); @@ -495,6 +498,9 @@ void eg_init_invrsptr_page(SegExtentGroup *seg, BlockNumber pageno) XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT); XLogRecPtr rec_ptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_INIT_INVRSPTR_PAGE, SegmentBktId); PageSetLSN(page, rec_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); SegUnlockReleaseBuffer(buffer); @@ -562,6 +568,9 @@ void eg_add_map_group(SegExtentGroup *seg, BlockNumber pageno, uint8 group_size, XLogRecPtr recptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_ADD_NEW_GROUP, SegmentBktId); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); } SEGMENTTEST(EXTENT_GROUP_ADD_NEW_GROUP_XLOG, (errmsg("EXTENT_GROUP_ADD_NEW_GROUP_XLOG %s: add new group xlog success!\n", @@ -785,6 +794,10 @@ void eg_create_if_necessary(SegExtentGroup *seg) SEGMENTTEST(EXTENT_GROUP_CREATE_EXTENT, (errmsg("EXTENT_GROUP_CREATE_EXTENT %s: create segment file success!\n", g_instance.attr.attr_common.PGXCNodeName))); t_thrd.pgxact->delayChkpt = false; +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + END_CRIT_SECTION(); } } diff --git a/src/gausskernel/storage/smgr/segment/space.cpp b/src/gausskernel/storage/smgr/segment/space.cpp index c34489e3c39a76da2a3698936d42378ac3e3074a..46bcdeaf6e4b5182c68fe09cd3fb5fb9ba254f58 100644 --- a/src/gausskernel/storage/smgr/segment/space.cpp +++ b/src/gausskernel/storage/smgr/segment/space.cpp @@ -366,6 +366,9 @@ SegSpace *spc_drop(Oid spcNode, Oid dbNode, bool redo) XLogRegisterData((char *)&spcNode, sizeof(Oid)); XLogRegisterData((char *)&dbNode, sizeof(Oid)); XLogRecPtr lsn = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_SPACE_DROP); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); XLogWaitFlush(lsn); @@ -582,6 +585,9 @@ static void copy_extent(SegExtentGroup *seg, RelFileNode logic_rnode, uint32 log XLogRegisterData(pagedata, BLCKSZ); XLogRecPtr recptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_NEW_PAGE); PageSetLSN(pagedata, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* 2. double write */ if (dw_enabled() && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0) { @@ -1123,6 +1129,9 @@ void spc_shrink_files(SegExtentGroup *seg, BlockNumber target_size, bool redo) xlog_data.forknum = seg->forknum; XLogRegisterData((char *)&xlog_data, sizeof(XLogDataSpaceShrink)); XLogRecPtr lsn = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_SPACE_SHRINK); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Standby's log is reported in "redo_space_shrink" */ ereport(LOG, (errmsg("call space shrink files, filename: %s, xlog lsn: %lX", diff --git a/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp b/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp index c37285cdd00447fe97644e2fe86af754e745564f..40fc88c91bceef1a37ebd42ecdc25929ec2bad6d 100644 --- a/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp +++ b/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp @@ -352,6 +352,9 @@ void XLogAtomicOperation::XLogCommit(RmgrId rmid, uint8 info, int bucket_id) } } } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (critical_section) { END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 1a20f8ccb28a9f8aeab7be9e068d6c66e72aed3d..2b0ca9e2f0becd9e9491dd58c3e5a1e2c1445395 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -1497,6 +1497,9 @@ void seg_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, cha PageSetLSN(BufferGetPage(seg_buffer), xlog_rec); PageSetLSN(buffer, xlog_rec); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } SegUnlockReleaseBuffer(seg_buffer); diff --git a/src/include/access/multi_redo_api.h b/src/include/access/multi_redo_api.h index 3a827824b1f147ee3c30cb634b4f2f8441e1c6ab..aad30c1fd98a4a2046c113425f20d2d89e660373 100644 --- a/src/include/access/multi_redo_api.h +++ b/src/include/access/multi_redo_api.h @@ -126,6 +126,7 @@ void MultiRedoMain(); void StartUpMultiRedo(XLogReaderState* xlogreader, uint32 privateLen); #else void StartUpMultiRedo(XLogReaderState *xlogreader, XLogReaderState **xlogreaderList, int instanceNum, uint32 privateLen); +void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState); #endif void ProcTxnWorkLoad(bool force); diff --git a/src/include/access/multixact.h b/src/include/access/multixact.h index a9e0e0c898f6305c163b9cd9d084a74770e86493..90bb9764d56d4a42cc59cf26d92820ab358909a5 100644 --- a/src/include/access/multixact.h +++ b/src/include/access/multixact.h @@ -147,7 +147,7 @@ extern void SSMultiXactShmemClear(void); extern void SSGetMultiXactIdMembers(MultiXactId multi, int *nmembers, MultiXactMember **members, int owner); #ifdef ENABLE_SS_MULTIMASTER -extern Size SSMultiXactSize(void); +extern Size SSGetMultiXactSize(void); #endif #endif /* MULTIXACT_H */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 823c5aa266a036798f6e261905b50d27e1ba77a5..00578b47970b30caeb2f177043043a17f8ec1be5 100755 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -296,6 +296,9 @@ typedef struct CheckpointStatsData { */ typedef struct XLogFPWInfo { XLogRecPtr redoRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr logicRedoRecPtr; +#endif bool doPageWrites; bool forcePageWrites; } XLogFPWInfo; @@ -455,7 +458,7 @@ typedef struct XLogCtlInsert { */ XLogRecPtr RedoRecPtr; /* current redo point for insertions */ #ifdef ENABLE_SS_MULTIMASTER - uint32 RedoLogicLSN; + XLogRecPtr logicRedoRecPtr; #endif bool forcePageWrites; /* forcing full-page writes for PITR? */ bool fullPageWrites; @@ -593,6 +596,7 @@ typedef struct XLogCtlData { #ifdef ENABLE_SS_MULTIMASTER uint32 lastReplayedLogicLSN; uint32 RedoStartLogicLSN; + XLogRecPtr logicRedoRecPtr; #endif /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; diff --git a/src/include/access/xlog_basic.h b/src/include/access/xlog_basic.h index 03cdb88974ef71e76dd28cd6c3692e34d028aebf..4596c4bf5530454dac5474504d05c2c2c97be6c6 100644 --- a/src/include/access/xlog_basic.h +++ b/src/include/access/xlog_basic.h @@ -504,10 +504,5 @@ typedef struct ShareStorageOperateCtl_ { extern List *readTimeLineHistory(TimeLineID targetTLI); -#ifdef ENABLE_SS_MULTIMASTER -extern void NodeArgsSwitch(XLogReaderArgs *args); -extern void CurrentArgsCopy(XLogReaderArgs *args); -#endif - #endif /* XLOG_BASIC_H */ diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index b552b520789dc021b022195b30d454e153c23d4d..182944b908c232976a9a73153beb05dcec9801ac 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -180,6 +180,9 @@ typedef enum SSBroadcastOp { BCAST_CHECK_DB_BACKENDS, BCAST_SEND_SNAPSHOT, BCAST_RELOAD_REFORM_CTRL_PAGE, +#ifdef ENABLE_SS_MULTIMASTER + BCAST_REDO_DONE, +#endif BCAST_END } SSBroadcastOp; diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index bb50344447cb595af88066aef804a429a356b218..bc1619581baf5ac8720d5070be34b76d83eccbb6 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -59,6 +59,17 @@ g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status == PAUSE_FOR_PRUNE_TRXN_QUEUE) #define REFORM_CTRL_VERSION 1 + +#ifdef ENABLE_SS_MULTIMASTER +#define SHEMVARCACHE_SIZE 200 +#define MUXACT_SIZE 16 + +typedef struct reformArgs { + char ShmemVariableCache[SHEMVARCACHE_SIZE]; + char PartMultiXactState[MUXACT_SIZE]; +} reformArgs; +#endif + typedef struct st_reformer_ctrl { uint32 version; uint64 list_stable; // stable instances list @@ -66,6 +77,10 @@ typedef struct st_reformer_ctrl { int recoveryInstId; SSGlobalClusterState clusterStatus; ClusterRunMode clusterRunMode; +#ifdef ENABLE_SS_MULTIMASTER + reformArgs argsList[DMS_MAX_INSTANCE]; + +#endif pg_crc32c crc; } ss_reformer_ctrl_t; @@ -145,6 +160,9 @@ typedef struct ss_recovery_info { volatile ondemand_realtime_build_status_t ondemand_realtime_build_status; bool dorado_sharestorage_inited; // used in dorado mode volatile ondemand_recovery_pause_status_t ondemand_recovery_pause_status; +#ifdef ENABLE_SS_MULTIMASTER + bool redo_done; +#endif } ss_recovery_info_t; typedef struct ondemand_htab_ctrl { @@ -167,4 +185,4 @@ void StartupOndemandRecovery(); void OndemandRealtimeBuildHandleFailover(); -#endif \ No newline at end of file +#endif diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index ba4d62b1ca7e64e5b561207e439dfda6138debbe..222a04c48a017144818f6654db54776d2d50e7ef 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -127,6 +127,10 @@ void SSSendLatestSnapshotToStandby(TransactionId xmin, TransactionId xmax, Commi int SSUpdateLatestSnapshotOfStandby(char *data, uint32 len); int SSReloadReformCtrlPage(uint32 len); void SSRequestAllStandbyReloadReformCtrlPage(); +#ifdef ENABLE_SS_MULTIMASTER +void SSRequestAllNodeUpdateArgs(); +int SSUpdateArgs(uint32 len); +#endif bool SSCanFetchLocalSnapshotTxnRelatedInfo(); void SSGetMultiXactIdMembers(MultiXactId multi, int *nmembers, MultiXactMember **members, int owner); diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 30c3c3408fe1d23fbce148bf4fc19f164bec3df3..cedb76ff047c656acbd03b32ffd2d201db57ba5f 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -608,8 +608,9 @@ typedef struct knl_t_xlog_context { */ XLogRecPtr RedoRecPtr; #ifdef ENABLE_SS_MULTIMASTER - uint32 RedoLogicLSN; + XLogRecPtr logicRedoRecPtr; #endif + /* * doPageWrites is this backend's local copy of (forcePageWrites || * fullPageWrites). It is used together with RedoRecPtr to decide whether diff --git a/src/include/storage/buf/bufpage.h b/src/include/storage/buf/bufpage.h index ccebc6f3f464d3eeb1cd627be213ab3d619f8fa0..51e38f536bc0ba3bec334deffb8a6838ef8a5f13 100644 --- a/src/include/storage/buf/bufpage.h +++ b/src/include/storage/buf/bufpage.h @@ -403,7 +403,6 @@ inline OffsetNumber PageGetMaxOffsetNumber(char* pghr) #define PageGetLSN(page) (((uint64)((PageHeader)(page))->pd_lsn.xlogid << 32) | ((PageHeader)(page))->pd_lsn.xrecoff) #define PageSetLSNInternal(page, lsn) \ (((PageHeader)(page))->pd_lsn.xlogid = (uint32)((lsn) >> 32), ((PageHeader)(page))->pd_lsn.xrecoff = (uint32)(lsn)) - #ifndef FRONTEND extern thread_local uint32 g_curr_lsn; inline void PageSetLSN(Page page, XLogRecPtr LSN, bool check = true)