diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index e0d5ad77f0397ea48f3154cfe95cacb7fec27fb5..77adb4848c254ee5d94d4f94aeeadc463bb671d9 100755 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -5333,10 +5333,6 @@ void InitializePostmasterGUC() #endif g_instance.attr.attr_network.PoolerPort = g_instance.attr.attr_network.PostPortNumber + 1; parseDmsInstanceCount(); -#ifndef USE_ASSERT_CHECKING - /* in Release, this param is ON and undisclosed */ - g_instance.attr.attr_storage.dms_attr.enable_reform = true; -#endif } /* diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 08477451b3d0686584866eebc463f93921724a3f..6666e902da234dd6d3eb7334144c7ed0d489a4b2 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -1134,20 +1134,6 @@ static void InitStorageConfigureNamesBool() NULL, NULL, NULL}, -#ifdef USE_ASSERT_CHECKING - {{"ss_enable_reform", - PGC_POSTMASTER, - NODE_SINGLENODE, - SHARED_STORAGE_OPTIONS, - gettext_noop("Whether use dms reform"), - NULL, - GUC_SUPERUSER_ONLY}, - &g_instance.attr.attr_storage.dms_attr.enable_reform, - true, - NULL, - NULL, - NULL}, -#endif {{"ss_enable_ssl", PGC_POSTMASTER, NODE_SINGLENODE, diff --git a/src/common/backend/utils/time/snapmgr.cpp b/src/common/backend/utils/time/snapmgr.cpp index 46fcabbf2261807ba4d88e444a820e25ccf7f1cf..2a15302850da62f303ff75ed8a40b356ba995718 100644 --- a/src/common/backend/utils/time/snapmgr.cpp +++ b/src/common/backend/utils/time/snapmgr.cpp @@ -374,39 +374,39 @@ bool CommittedXidVisibleInSnapshot(TransactionId xid, Snapshot snapshot, Buffer * Make a quick range check to eliminate most XIDs without looking at the * CSN log. */ -#ifdef ENABLE_SS_MULTIMASTER - int owner = xid % DMS_MAX_INSTANCE; - if (owner == SS_MY_INST_ID) { - if (TransactionIdPrecedes(xid, snapshot->xmin)) - return true; - } else { - if (TransactionIdPrecedes(xid, snapshot->g_xmin[owner])) - return true; - } -#else +// #ifdef ENABLE_SS_MULTIMASTER +// int owner = xid % DMS_MAX_INSTANCE; +// if (owner == SS_MY_INST_ID) { +// if (TransactionIdPrecedes(xid, snapshot->xmin)) +// return true; +// } else { +// if (TransactionIdPrecedes(xid, snapshot->g_xmin[owner])) +// return true; +// } +// #else if (TransactionIdPrecedes(xid, snapshot->xmin)) return true; -#endif +// #endif } loop: if (ENABLE_DMS) { -#ifndef ENABLE_SS_MULTIMASTER - /* fetch TXN info locally if either reformer, original primary, or normal primary */ - if (SSCanFetchLocalSnapshotTxnRelatedInfo()) { - csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot); - } else { - csn = SSTransactionIdGetCommitSeqNo(xid, true, true, false, snapshot, NULL, SS_PRIMARY_ID); - } -#else +// #ifndef ENABLE_SS_MULTIMASTER + // /* fetch TXN info locally if either reformer, original primary, or normal primary */ + // if (SSCanFetchLocalSnapshotTxnRelatedInfo()) { + // csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot); + // } else { + // csn = SSTransactionIdGetCommitSeqNo(xid, true, true, false, snapshot, NULL, SS_PRIMARY_ID); + // } +// #else int owner = xid % DMS_MAX_INSTANCE; if (owner == SS_MY_INST_ID) { csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot); } else { csn = SSTransactionIdGetCommitSeqNo(xid, true, true, false, snapshot, NULL, owner); } -#endif +// #endif } else { csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot); } diff --git a/src/gausskernel/ddes/adapter/ss_dms.cpp b/src/gausskernel/ddes/adapter/ss_dms.cpp index 5d675a186e348e2c0c125b19a1f0ede8a382f762..c0effb24237acf243c1d744007e8ede3566e06e2 100644 --- a/src/gausskernel/ddes/adapter/ss_dms.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms.cpp @@ -240,9 +240,9 @@ int dms_request_opengauss_xid_csn(dms_context_t *dms_ctx, dms_opengauss_xid_csn_ { return g_ss_dms_func.dms_request_opengauss_xid_csn(dms_ctx, dms_txn_info, xid_csn_result); } -int dms_request_opengauss_csn(dms_context_t *dms_ctx, unsigned char request, unsigned long long *result) +int dms_request_opengauss_csn(dms_context_t *dms_ctx, unsigned char res_type, unsigned char is_query, unsigned long long *result) { - return g_ss_dms_func.dms_request_opengauss_csn(dms_ctx, request, result); + return g_ss_dms_func.dms_request_opengauss_csn(dms_ctx, res_type, is_query, result); } int dms_request_opengauss_txn_status(dms_context_t *dms_ctx, unsigned char request, unsigned char *result) { diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index e0c1f2b648b9808c909f1df4bc03049e47b1ba6d..145c1b33bca51a985afdbd16bca095b0926e63b0 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -69,9 +69,30 @@ static unsigned long long CBGetGlobalXID(void *db_handle) return ReadNewTransactionId(); } -static int CBGetGlobalCSN(void *db_handle, unsigned char request, unsigned long long *result) +static int CBProcessGetGlobalXID(unsigned char src_inst, unsigned char is_query, unsigned long long *result) { - if (request) { + if (!is_query) { + (void)LWLockAcquire(XidGenLock, LW_EXCLUSIVE); + if (SS_MY_INST_ID == (t_thrd.xact_cxt.ShmemVariableCache->nextXid % DMS_MAX_INSTANCE)) { + *result = t_thrd.xact_cxt.ShmemVariableCache->nextXid = t_thrd.xact_cxt.ShmemVariableCache->nextXid + src_inst; + } else { + *result = t_thrd.xact_cxt.ShmemVariableCache->nextXid = t_thrd.xact_cxt.ShmemVariableCache->nextXid + DMS_MAX_INSTANCE - + (t_thrd.xact_cxt.ShmemVariableCache->nextXid % DMS_MAX_INSTANCE) + src_inst; + } + LWLockRelease(XidGenLock); + } else { + (void)LWLockAcquire(XidGenLock, LW_SHARED); + *result = t_thrd.xact_cxt.ShmemVariableCache->nextXid; + LWLockRelease(XidGenLock); + } + + ereport(LOG, (errmsg("CB Current global xid is %llu", t_thrd.xact_cxt.ShmemVariableCache->nextXid))); + return DMS_SUCCESS; +} + +static int CBProcessGetGlobalCSN(unsigned char is_query, unsigned long long *result) +{ + if (!is_query) { *result = pg_atomic_fetch_add_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo, 1); } else { *result = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo); @@ -80,6 +101,49 @@ static int CBGetGlobalCSN(void *db_handle, unsigned char request, unsigned long return DMS_SUCCESS; } +static int CBProcessGetGlobalMultiXID(unsigned char src_inst, unsigned char is_query, unsigned long long *result) +{ + if (!is_query) { + (void)LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE); + if (SS_MY_INST_ID == (t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact % DMS_MAX_INSTANCE)) { + *result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact + src_inst; + } else { + *result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact + DMS_MAX_INSTANCE - + (t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact % DMS_MAX_INSTANCE) + src_inst; + } + LWLockRelease(MultiXactGenLock); + } else { + (void)LWLockAcquire(MultiXactGenLock, LW_SHARED); + *result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact; + LWLockRelease(MultiXactGenLock); + } + + ereport(LOG, (errmsg("CB Current global multixid is %llu", t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact))); + return DMS_SUCCESS; +} + +static int CBGetGlobalCSN(void *db_handle, unsigned char src_inst, unsigned char res_type, unsigned char is_query, unsigned long long *result) +{ + int32 ret = DMS_ERROR; + + switch ((SSGlobalValType)res_type) { + case SS_GLOBAL_XID: + ret = CBProcessGetGlobalXID(src_inst, is_query, result); + break; + case SS_GLOBAL_CSN: + ret = CBProcessGetGlobalCSN(is_query, result); + break; + case SS_GLOBAL_MULTIXID: + ret = CBProcessGetGlobalMultiXID(src_inst, is_query, result); + break; + default: + ereport(WARNING, (errmodule(MOD_DMS), errmsg("invalid global res type"))); + break; + } + + return ret; +} + /* * Wake up startup process to replay WAL, or to notice that * failover has been requested. @@ -178,13 +242,13 @@ static CommitSeqNo TransactionWaitCommittingCSN(dms_opengauss_xid_csn_t *xid_csn snapshotcsn, latestCSN, xid))); -#ifndef ENABLE_SS_MULTIMASTER +// #ifndef ENABLE_SS_MULTIMASTER /* in this case, SS tuple is not visible; to return ABORT is inappropriate, so let standby judge */ return latestCSN; -#else - /* return true csn (with status), letting client handle -- for multi-write(peer-nodes) */ - return csn; -#endif +// #else +// /* return true csn (with status), letting client handle -- for multi-write(peer-nodes) */ +// return csn; +// #endif } } else { parentXid = (TransactionId)GET_PARENTXID(csn); @@ -579,7 +643,7 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns g_instance.dms_cxt.SSReformerControl.primaryInstId = primary_id; g_instance.dms_cxt.SSReformerControl.list_stable = list_stable; int ret = DMS_ERROR; - SSLockReleaseAll(); + //SSLockReleaseAll(); SSSyncOldestXminWhenReform(reformer_id); if ((int)primary_id == SS_MY_INST_ID) { @@ -1658,7 +1722,7 @@ static int CBRecoveryPrimary(void *db_handle, int inst_id) g_instance.dms_cxt.SSReformerControl.primaryInstId))); /* Release my own lock before recovery */ - SSLockReleaseAll(); + //SSLockReleaseAll(); SSWakeupRecovery(); if (!SSRecoveryNodes()) { g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag = true; diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index ac58f583c4ad79856d45f764626d27e16e644cca..76d013195a9f52a302d0715341ff36316d5fd5c3 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -147,10 +147,6 @@ bool SSRecoveryNodes() bool SSRecoveryApplyDelay() { - if (!ENABLE_REFORM) { - return false; - } - if (SS_DISASTER_STANDBY_CLUSTER) { return true; } @@ -174,7 +170,7 @@ void SSInitReformerControlPages(void) struct stat st; if (stat(XLOG_CONTROL_FILE, &st) == 0 && S_ISREG(st.st_mode)) { SSReadControlFile(REFORM_CTRL_PAGE); -#ifndef ENABLE_SS_MULTIMASTER +// #ifndef ENABLE_SS_MULTIMASTER if (g_instance.dms_cxt.SSReformerControl.list_stable != 0 || g_instance.dms_cxt.SSReformerControl.primaryInstId == SS_MY_INST_ID) { (void)printf("[SS] ERROR: files from last install must be cleared.\n"); @@ -182,7 +178,7 @@ void SSInitReformerControlPages(void) } (void)printf("[SS] Current node:%d acknowledges cluster PRIMARY node:%d.\n", SS_MY_INST_ID, g_instance.dms_cxt.SSReformerControl.primaryInstId); -#endif +// #endif return; } diff --git a/src/gausskernel/ddes/adapter/ss_init.cpp b/src/gausskernel/ddes/adapter/ss_init.cpp index 51177ef492c251f3f54a0da0da00575431d61000..dfdd3addc3b92920f80a82e66fadbfa9bdbdfc6c 100644 --- a/src/gausskernel/ddes/adapter/ss_init.cpp +++ b/src/gausskernel/ddes/adapter/ss_init.cpp @@ -390,7 +390,6 @@ static void setDMSProfile(dms_profile_t* profile) setScrlConfig(profile); SetOckLogPath(dms_attr, profile->ock_log_path); profile->inst_map = 0; - profile->enable_reform = (unsigned char)dms_attr->enable_reform; profile->parallel_thread_num = dms_attr->parallel_thread_num; profile->max_wait_time = DMS_MSG_MAX_WAIT_TIME; @@ -459,12 +458,6 @@ void DMSInit() } rc = memset_s(g_instance.dms_cxt.conninfo, MAXCONNINFO, '\0', MAXCONNINFO); securec_check(rc, "", ""); - -#ifdef USE_ASSERT_CHECKING - if (!ENABLE_REFORM && SS_NORMAL_STANDBY) { - SSStandbySetLibpqswConninfo(); - } -#endif } void GetSSLogPath(char *sslog_path) diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index 3a085a0a0a2d4c1d761a5ab753ce59d12af8b487..a3a8e301307d495be39beaad216304d73b75b6cc 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -172,16 +172,17 @@ int SSUpdateLatestSnapshotOfStandby(char *data, uint32 len) int SSTransactionSendBoc() { - unsigned long long success_inst; - unsigned long long ruid; - dms_context_t dms_ctx; - InitDmsContext(&dms_ctx); + //return 1; + unsigned long long success_inst; + unsigned long long ruid; + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); - // CommitSeqNo csn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo; - TransactionId xmin = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->recentGlobalXmin); + // CommitSeqNo csn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo; + TransactionId xmin = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->recentGlobalXmin); - int ret = dms_send_boc(&dms_ctx, 0, xmin, &success_inst, &ruid); - return ret; + int ret = dms_send_boc(&dms_ctx, 0, xmin, &success_inst, &ruid); + return ret; } static int SSTransactionIdGetCSN(dms_opengauss_xid_csn_t *dms_txn_info, dms_opengauss_csn_result_t *xid_csn_result, int owner) @@ -240,30 +241,62 @@ static inline bool IsClogStatusDefinitive(CLogXidStatus status) return (status != CLOG_XID_STATUS_IN_PROGRESS && status != CLOG_XID_STATUS_SUB_COMMITTED); } -uint64 SSGetGlobalCSN(unsigned char request) +// uint64 SSGetGlobalCSN(unsigned char request) +// { +// unsigned long long csn = 0; + +// dms_context_t dms_ctx; +// InitDmsContext(&dms_ctx); +// dms_ctx.xid_ctx.inst_id = (unsigned char)CSN_OWNER_NODE; + +// do { +// if (dms_request_opengauss_csn(&dms_ctx, request, &csn) == DMS_SUCCESS) { +// ereport(DEBUG1, (errmsg("SS get csn success, csn=%llu.", csn))); +// pg_atomic_exchange_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo, csn); +// break; +// } else { +// if (SS_IN_REFORM && +// (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { +// ereport(FATAL, (errmsg("SSGetGlobalCSN failed during reform."))); +// } +// pg_usleep(USECS_PER_SEC); +// continue; +// } +// } while (true); + +// return (uint64)csn; +// } + +uint64 SSGetGlobalVal(SSGlobalValType val_type, bool is_query) { - unsigned long long csn = 0; + uint64 result = 0; dms_context_t dms_ctx; InitDmsContext(&dms_ctx); dms_ctx.xid_ctx.inst_id = (unsigned char)CSN_OWNER_NODE; do { - if (dms_request_opengauss_csn(&dms_ctx, request, &csn) == DMS_SUCCESS) { - ereport(DEBUG1, (errmsg("SS get csn success, csn=%llu.", csn))); - pg_atomic_exchange_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo, csn); + if (dms_request_opengauss_csn(&dms_ctx, val_type, is_query, (unsigned long long *)&result) == DMS_SUCCESS) { + ereport(DEBUG1, (errmsg("SS get global val success, result=%llu.", result))); + if (val_type == SS_GLOBAL_XID) { + pg_atomic_exchange_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextXid, result); + } else if (val_type == SS_GLOBAL_CSN) { + pg_atomic_exchange_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo, result); + } else if (val_type == SS_GLOBAL_MULTIXID) { + SSSetGlobalMultiXID(result); + } break; } else { if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { - ereport(FATAL, (errmsg("SSGetGlobalCSN failed during reform."))); + ereport(FATAL, (errmsg("SSGetGlobalVal failed during reform."))); } pg_usleep(USECS_PER_SEC); continue; } } while (true); - return (uint64)csn; + return result; } /* @@ -324,11 +357,11 @@ CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCo dms_txn_info.is_nest = (unsigned char)isNest; if (snapshot != NULL) { dms_txn_info.snapshotcsn = snapshot->snapshotcsn; -#ifdef ENABLE_SS_MULTIMASTER - dms_txn_info.snapshotxmin = snapshot->g_xmin[transactionId % DMS_MAX_INSTANCE]; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// dms_txn_info.snapshotxmin = snapshot->g_xmin[transactionId % DMS_MAX_INSTANCE]; +// #else dms_txn_info.snapshotxmin = snapshot->xmin; -#endif +// #endif } else { dms_txn_info.snapshotcsn = InvalidCommitSeqNo; dms_txn_info.snapshotxmin = InvalidTransactionId; @@ -419,13 +452,13 @@ CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCo * 2. HEAP_MAX_COMMITTED can only be set when ensured CLOG status is commited * 3. CLOG status and CSN status with order bound */ -#ifdef ENABLE_SS_MULTIMASTER - if (COMMITSEQNO_IS_COMMITTED(csn)) { - t_thrd.xact_cxt.cachedFetchXid = transactionId; - /* keep consistent with the HEAP_MAX_COMMITED set rule, check in HeapTupleSatisfiesMVCC-XidVisibleInSnapshot-setHintBIt HEAP_MAX_COMMITED */ - t_thrd.xact_cxt.cachedFetchXidStatus = CLOG_XID_STATUS_COMMITTED; - } -#endif +// #ifdef ENABLE_SS_MULTIMASTER +// if (COMMITSEQNO_IS_COMMITTED(csn)) { +// t_thrd.xact_cxt.cachedFetchXid = transactionId; +// /* keep consistent with the HEAP_MAX_COMMITED set rule, check in HeapTupleSatisfiesMVCC-XidVisibleInSnapshot-setHintBIt HEAP_MAX_COMMITED */ +// t_thrd.xact_cxt.cachedFetchXidStatus = CLOG_XID_STATUS_COMMITTED; +// } +// #endif } if (IsClogStatusDefinitive(clogstatus)) { @@ -738,8 +771,8 @@ void SSSendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) } int backup_output = t_thrd.postgres_cxt.whereToSendOutput; t_thrd.postgres_cxt.whereToSendOutput = DestNone; - int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastSI), - (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)SHARED_INVAL_MSG); + int ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastSI), + (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS); if (ret != DMS_SUCCESS) { ereport(DEBUG1, (errmsg("SS broadcast SI msg failed!"))); } diff --git a/src/gausskernel/optimizer/commands/vacuumlazy.cpp b/src/gausskernel/optimizer/commands/vacuumlazy.cpp index c34218a1f0fef2b177919b93ee755e1fea5c9f1a..e85d375b77c4dec4d5cc1f4763d052a855d8228b 100644 --- a/src/gausskernel/optimizer/commands/vacuumlazy.cpp +++ b/src/gausskernel/optimizer/commands/vacuumlazy.cpp @@ -1241,12 +1241,12 @@ static IndexBulkDeleteResult** lazy_scan_heap( ereport(WARNING, (errmsg("relation \"%s\" page %u is uninitialized --- fixing", relname, blkno))); HeapPageHeader phdr = (HeapPageHeader)page; PageInit(page, BufferGetPageSize(buf), 0, true); -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif phdr->pd_multi_base = 0; const char* algo = RelationGetAlgo(onerel); if (RelationisEncryptEnable(onerel) || (algo && *algo != '\0')) { diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 087023b5d1ac586042f302c46bab74dce6614e7c..ac0e30279c622ad13787e4b9027d95b21ca637fe 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -3062,7 +3062,7 @@ int PostmasterMain(int argc, char* argv[]) g_instance.attr.attr_storage.dms_attr.instance_id, src_id))); Assert(src_id >= 0 && src_id <= DMS_MAX_INSTANCE - 1); - if (!SS_OFFICIAL_PRIMARY && g_instance.attr.attr_storage.dms_attr.enable_reform) { + if (!SS_OFFICIAL_PRIMARY) { const long SLEEP_ONE_SEC = 1000000L; while (g_instance.dms_cxt.SSReformerControl.list_stable == 0) { pg_usleep(SLEEP_ONE_SEC); @@ -3135,7 +3135,7 @@ int PostmasterMain(int argc, char* argv[]) /* init sharestorge(dorado) */ ShareStorageInit(); exrto_standby_read_init(); - if (ENABLE_DMS && ENABLE_REFORM) { + if (ENABLE_DMS) { if (!DMSWaitInitStartup()) { if (g_instance.pid_cxt.StartupPID == 0) { ereport(LOG, (errmsg("[SS reform] Node:%d first startup fail and exit", SS_MY_INST_ID))); @@ -3898,7 +3898,7 @@ static int ServerLoop(void) gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL); (void)gs_signal_unblock_sigusr2(); - if (ENABLE_DMS && ENABLE_REFORM && g_instance.dms_cxt.SSRecoveryInfo.startup_reform + if (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.startup_reform && !startup_reform_finish) { ereport(LOG, (errmsg("[SS reform] Node:%d first-round reform start wait.", SS_MY_INST_ID))); if (!DMSWaitReform()) { diff --git a/src/gausskernel/storage/access/heap/heapam.cpp b/src/gausskernel/storage/access/heap/heapam.cpp index aa850e2f2c192dc7b63d8900c9011ba5a5403290..d213d9dbbfd223df47acef0e54aeb122f31941f3 100755 --- a/src/gausskernel/storage/access/heap/heapam.cpp +++ b/src/gausskernel/storage/access/heap/heapam.cpp @@ -3681,12 +3681,12 @@ bool heap_page_prepare_for_xid(Relation relation, Buffer buffer, TransactionId x } if (PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && !PageIsCompressed(page) && !multi) { -#ifdef ENABLE_SS_MULTIMASTER - TransactionId xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// TransactionId xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else TransactionId xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif ereport(LOG, (errmsg("new page, the xid base is not correct, base is %lu, reset the xid_base to %lu", base, xid_base))); @@ -7862,11 +7862,11 @@ TransactionId MultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, uin { if (ENABLE_DMS) { /* fetch TXN info locally if either reformer, original primary, or normal primary */ -#ifndef ENABLE_SS_MULTIMASTER +// #ifndef ENABLE_SS_MULTIMASTER bool local_fetch = SSCanFetchLocalSnapshotTxnRelatedInfo(); -#else - bool local_fetch = ((int)xmax % DMS_MAX_INSTANCE) == SS_MY_INST_ID ? true : false; -#endif +// #else +// bool local_fetch = ((int)xmax % DMS_MAX_INSTANCE) == SS_MY_INST_ID ? true : false; +// #endif if (!local_fetch) { return SSMultiXactIdGetUpdateXid(xmax, t_infomask, t_infomask2); } diff --git a/src/gausskernel/storage/access/heap/heapam_visibility.cpp b/src/gausskernel/storage/access/heap/heapam_visibility.cpp index 6b0e7e675b07d823a6e9cb3a24cfb517ca9a42dc..013b04083453d55ddfee399b8f550f1a77b60527 100644 --- a/src/gausskernel/storage/access/heap/heapam_visibility.cpp +++ b/src/gausskernel/storage/access/heap/heapam_visibility.cpp @@ -183,7 +183,7 @@ static inline void SetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 info if (TransactionIdIsValid(xid)) { #ifdef ENABLE_SS_MULTIMASTER - if (ENABLE_DMS && (xid % DMS_MAX_INSTANCE != SS_MY_INST_ID)) return; + if (ENABLE_DMS && (xid % DMS_MAX_INSTANCE == SS_MY_INST_ID)) { #endif /* NB: xid must be known committed here! */ XLogRecPtr commitLSN = TransactionIdGetCommitLSN(xid); @@ -192,6 +192,9 @@ static inline void SetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 info /* not flushed and no LSN interlock, so don't set hint */ return; } +#ifdef ENABLE_SS_MULTIMASTER + } +#endif } tuple->t_infomask |= infomask; diff --git a/src/gausskernel/storage/access/heap/hio.cpp b/src/gausskernel/storage/access/heap/hio.cpp index 5504a85f82e5c80479ec0653f1806d86a4c0bc9f..558e62c805b51538c683300ec056815e2fd94d1e 100644 --- a/src/gausskernel/storage/access/heap/hio.cpp +++ b/src/gausskernel/storage/access/heap/hio.cpp @@ -190,12 +190,12 @@ void RelationAddExtraBlocks(Relation relation, BulkInsertState bistate) } else { HeapPageHeader phdr = (HeapPageHeader)page; PageInit(page, BufferGetPageSize(buffer), 0, true); -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif phdr->pd_multi_base = 0; const char* algo = RelationGetAlgo(relation); if (RelationisEncryptEnable(relation) || (algo && *algo != '\0')) { @@ -676,12 +676,12 @@ loop: phdr = (HeapPageHeader)page; PageInit(page, BufferGetPageSize(buffer), 0, true); -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif phdr->pd_multi_base = 0; const char* algo = RelationGetAlgo(relation); if (RelationisEncryptEnable(relation) || (algo && *algo != '\0')) { @@ -799,12 +799,12 @@ Buffer RelationGetNewBufferForBulkInsert(Relation relation, Size len, Size dict_ phdr = (HeapPageHeader)page; PageInit(page, BufferGetPageSize(buffer), 0, true); -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif phdr->pd_multi_base = 0; RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer)); diff --git a/src/gausskernel/storage/access/heap/rewriteheap.cpp b/src/gausskernel/storage/access/heap/rewriteheap.cpp index 5dbfce0d93e84ce3b025d7f77910dd58a32a0caa..61a2e54d7eb028b9c69a7e9b2c601b681ab0688b 100644 --- a/src/gausskernel/storage/access/heap/rewriteheap.cpp +++ b/src/gausskernel/storage/access/heap/rewriteheap.cpp @@ -747,12 +747,12 @@ static void prepare_cmpr_buffer(RewriteState state, Size meta_size, const char * Assert(meta_size > 0 && meta_size < BLCKSZ); PageInit(page, BLCKSZ, 0, true); phdr = (HeapPageHeader)page; -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif phdr->pd_multi_base = 0; PageReinitWithDict(page, meta_size); @@ -1253,12 +1253,12 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup) HeapPageHeader phdr = (HeapPageHeader)page; /* Initialize a new empty page */ PageInit(page, BLCKSZ, 0, true); -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif phdr->pd_multi_base = 0; state->rs_buffer_valid = true; const char* algo = RelationGetAlgo(state->rs_new_rel); diff --git a/src/gausskernel/storage/access/redo/redo_sequence.cpp b/src/gausskernel/storage/access/redo/redo_sequence.cpp index a56c56ae3ea73caa87d6055457403dda5bf1e8f1..fcc173fa1a190400e32897c544395fdcce91ec28 100644 --- a/src/gausskernel/storage/access/redo/redo_sequence.cpp +++ b/src/gausskernel/storage/access/redo/redo_sequence.cpp @@ -91,12 +91,12 @@ void seqRedoOperatorPage(RedoBufferInfo *buffer, void *itmedata, Size itemsz) sm->magic = SEQ_MAGIC; phdr = (HeapPageHeader)localpage; -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId +// : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; +// #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif +// #endif phdr->pd_multi_base = 0; if (PageAddItem(localpage, (Item)item, itemsz, FirstOffsetNumber, false, false) == InvalidOffsetNumber) diff --git a/src/gausskernel/storage/access/transam/double_write.cpp b/src/gausskernel/storage/access/transam/double_write.cpp index 27558a4bf8fc7d0fb475eb2e1499f0703313a418..4127372142d51e29912be0dd823cc4161b019f4f 100644 --- a/src/gausskernel/storage/access/transam/double_write.cpp +++ b/src/gausskernel/storage/access/transam/double_write.cpp @@ -2078,7 +2078,7 @@ void dw_transfer_phybuffer_addr(const BufferDesc *buf_desc, BufferTag *buf_tag) Assert(buf_desc->extra->seg_fileno <= EXTENT_TYPES && buf_desc->extra->seg_fileno > EXTENT_INVALID); buf_tag->rnode.relNode = buf_desc->extra->seg_fileno; buf_tag->blockNum = buf_desc->extra->seg_blockno; - } else if (ENABLE_REFORM && SS_BEFORE_RECOVERY) { + } else if (SS_BEFORE_RECOVERY) { buf_tag->rnode.relNode = buf_desc->extra->seg_fileno; buf_tag->blockNum = buf_desc->extra->seg_blockno; } else { diff --git a/src/gausskernel/storage/access/transam/multixact.cpp b/src/gausskernel/storage/access/transam/multixact.cpp index ad9d16d519a948f5dd77206014d6ce97a0e09826..5642c15a285946ae49b5da10ca7c5485d61d3893 100644 --- a/src/gausskernel/storage/access/transam/multixact.cpp +++ b/src/gausskernel/storage/access/transam/multixact.cpp @@ -71,7 +71,7 @@ #include "storage/procarray.h" #include "utils/builtins.h" #include "utils/memutils.h" - +#include "ddes/dms/ss_transaction.h" /* * MultiXact state shared across all backends. All this state is protected * by MultiXactGenLock. (We also use MultiXactOffsetControlLock and @@ -862,18 +862,20 @@ static MultiXactId GetNewMultiXactId(int nmembers, MultiXactOffset *offset) * Assign the MXID, and make sure there is room for it in the file. */ #ifdef ENABLE_SS_MULTIMASTER - if (ENABLE_DMS) { - /* multixid do leaping with the partition fixed allocation routine */ - MultiXactId& tmpXactId = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact; - int owner_inst_id = tmpXactId % DMS_MAX_INSTANCE; - if (owner_inst_id != SS_MY_INST_ID) { - tmpXactId = owner_inst_id < SS_MY_INST_ID ? (tmpXactId + (SS_MY_INST_ID - owner_inst_id)) : - (tmpXactId + (DMS_MAX_INSTANCE - owner_inst_id) + SS_MY_INST_ID); + if (SS_MY_INST_ID == 0) { + if (SS_MY_INST_ID == (t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact % DMS_MAX_INSTANCE)) { + result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact; + } else { + /* need adjust xid first */ + result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact + DMS_MAX_INSTANCE - + (t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact % DMS_MAX_INSTANCE); } + } else { + result = SSGetGlobalVal(SS_GLOBAL_MULTIXID); } -#endif - +#else result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact; +#endif ExtendMultiXactOffset(result); @@ -909,7 +911,13 @@ static MultiXactId GetNewMultiXactId(int nmembers, MultiXactOffset *offset) * Similarly, nextOffset may be zero, but we won't use that as the * actual start offset of the next multixact. */ +#ifdef ENABLE_SS_MULTIMASTER + if (SS_MY_INST_ID == 0) { + t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact += DMS_MAX_INSTANCE; + } +#else (t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact)++; +#endif t_thrd.shemem_ptr_cxt.MultiXactState->nextOffset += (unsigned)nmembers; @@ -2225,3 +2233,29 @@ void SSMultiXactShmemClear(void) GetBuiltInTrancheName(LWTRANCHE_MULTIXACTMEMBER_CTL), LWTRANCHE_MULTIXACTMEMBER_CTL, DSS_MAX_MXACTMEMBER, 0, MultiXactMemberControlLock, path); } + +int CBProcessGetGlobalMultiXID(unsigned char src_inst, unsigned char is_query, unsigned long long *result) +{ + if (!is_query) { + (void)LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE); + if (SS_MY_INST_ID == (t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact % DMS_MAX_INSTANCE)) { + *result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact + src_inst; + } else { + *result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact + DMS_MAX_INSTANCE - + (t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact % DMS_MAX_INSTANCE) + src_inst; + } + LWLockRelease(MultiXactGenLock); + } else { + (void)LWLockAcquire(MultiXactGenLock, LW_SHARED); + *result = t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact; + LWLockRelease(MultiXactGenLock); + } + + ereport(LOG, (errmsg("CB Current global multixid is %llu", t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact))); + return DMS_SUCCESS; +} + +void SSSetGlobalMultiXID(uint64 result) +{ + pg_atomic_exchange_u64(&t_thrd.shemem_ptr_cxt.MultiXactState->nextMXact, result); +} diff --git a/src/gausskernel/storage/access/transam/transam.cpp b/src/gausskernel/storage/access/transam/transam.cpp index e8ec58368b14afe1c395bec4d34659b2b12475b3..ab4c2882685f1e402acfe355acaea118ed7eb330 100644 --- a/src/gausskernel/storage/access/transam/transam.cpp +++ b/src/gausskernel/storage/access/transam/transam.cpp @@ -141,6 +141,8 @@ RETRY: */ if (snapshot != NULL && snapshot->satisfies == SNAPSHOT_DECODE_MVCC) { xid = GetReplicationSlotCatalogXmin(); + } else if (ENABLE_DMS && t_thrd.role == DMS_WORKER && isMvcc) { + xid = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->xmin); } else if (!isMvcc || GTM_LITE_MODE) { TransactionId recentGlobalXmin = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->recentGlobalXmin); if (!TransactionIdIsValid(recentGlobalXmin)) { @@ -405,7 +407,7 @@ bool TransactionIdDidCommit(TransactionId transactionId) /* true if given transa } #else int owner = transactionId % DMS_MAX_INSTANCE; - if (SS_MY_INST_ID != owner) { + if (SS_MY_INST_ID != owner && t_thrd.role != DMS_WORKER) { bool didCommit; SSTransactionIdDidCommit(transactionId, &didCommit, owner); return didCommit; @@ -552,9 +554,9 @@ bool TransactionIdDidAbort(TransactionId transactionId) /* true if given transac */ bool LatestFetchTransactionIdDidAbort(TransactionId transactionId) /* true if given transaction aborted */ { -#ifdef ENABLE_SS_MULTIMASTER - if (ENABLE_DMS) return true; -#endif +// #ifdef ENABLE_SS_MULTIMASTER +// if (ENABLE_DMS) return true; +// #endif CLogXidStatus xidstatus; @@ -587,9 +589,9 @@ bool LatestFetchTransactionIdDidAbort(TransactionId transactionId) /* true if gi bool LatestFetchCSNDidAbort(TransactionId transactionId) /* true if given transaction aborted */ { -#ifdef ENABLE_SS_MULTIMASTER - if (ENABLE_DMS) return true; -#endif +// #ifdef ENABLE_SS_MULTIMASTER +// if (ENABLE_DMS) return true; +// #endif CommitSeqNo csn; diff --git a/src/gausskernel/storage/access/transam/varsup.cpp b/src/gausskernel/storage/access/transam/varsup.cpp index fba6a2751708c4bc71c808ca4c3d30bab1233996..1ff55174a1d8836f573f1fc7dcca03b655281791 100644 --- a/src/gausskernel/storage/access/transam/varsup.cpp +++ b/src/gausskernel/storage/access/transam/varsup.cpp @@ -32,6 +32,7 @@ #include "utils/builtins.h" #include "utils/syscache.h" #include "utils/distribute_test.h" +#include "ddes/dms/ss_transaction.h" #ifdef PGXC #include "pgxc/pgxc.h" #include "access/gtm.h" @@ -145,13 +146,20 @@ TransactionId GetNewTransactionId(bool isSubXact) (void)LWLockAcquire(XidGenLock, LW_EXCLUSIVE); #ifdef ENABLE_SS_MULTIMASTER - uint64& curr_next_xid = t_thrd.xact_cxt.ShmemVariableCache->nextXid; - int owner_inst_id = curr_next_xid % DMS_MAX_INSTANCE; - if (owner_inst_id != SS_MY_INST_ID) { - curr_next_xid = (curr_next_xid + (DMS_MAX_INSTANCE - owner_inst_id) + SS_MY_INST_ID); + if (SS_MY_INST_ID == 0) { + if (SS_MY_INST_ID == (t_thrd.xact_cxt.ShmemVariableCache->nextXid % DMS_MAX_INSTANCE)) { + xid = t_thrd.xact_cxt.ShmemVariableCache->nextXid; + } else { + /* need adjust xid first */ + xid = t_thrd.xact_cxt.ShmemVariableCache->nextXid = t_thrd.xact_cxt.ShmemVariableCache->nextXid + DMS_MAX_INSTANCE - + (t_thrd.xact_cxt.ShmemVariableCache->nextXid % DMS_MAX_INSTANCE); + } + } else { + xid = SSGetGlobalVal(SS_GLOBAL_XID); } -#endif +#else xid = t_thrd.xact_cxt.ShmemVariableCache->nextXid; +#endif /* * Check to see if it's safe to assign another XID. @@ -192,7 +200,14 @@ TransactionId GetNewTransactionId(bool isSubXact) ExtendCLOG(xid); ExtendCSNLOG(xid); +#ifdef ENABLE_SS_MULTIMASTER + if (SS_MY_INST_ID == 0) { + t_thrd.xact_cxt.ShmemVariableCache->nextXid = t_thrd.xact_cxt.ShmemVariableCache->nextXid + DMS_MAX_INSTANCE; + } +#else TransactionIdAdvance(t_thrd.xact_cxt.ShmemVariableCache->nextXid); +#endif + #ifdef DEBUG FastAdvanceXid(); #endif @@ -276,6 +291,7 @@ TransactionId GetNewTransactionId(bool isSubXact) t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid, xid))); LWLockRelease(XidGenLock); + ereport(LOG, (errmsg("Set Current global xid is %llu", t_thrd.xact_cxt.ShmemVariableCache->nextXid))); return xid; } diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 758f2c659e465ca1c14aa29e9b0c5ae7836a534d..26cb4cc9f4f973a2809cab733282d2b97d5d950c 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -1550,7 +1550,7 @@ static TransactionId RecordTransactionCommit(void) #endif #ifdef ENABLE_SS_MULTIMASTER if (ENABLE_DMS && (SS_MY_INST_ID != 0)) { - setCommitCsn(SSGetGlobalCSN(1)); + setCommitCsn(SSGetGlobalVal(SS_GLOBAL_CSN)); } else { setCommitCsn(getLocalNextCSN()); } diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index dddd6c7b170deb5ec920344d1afaf19d13b526f5..c995c3fe52c9f2286b43ae9245bb2d09743d3a6b 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -549,6 +549,7 @@ static void XLogInsertRecordGroupLeader(PGPROC *leader, uint64 *end_byte_pos_ptr XLogBytePosToRecPtr(prev_byte_pos), ¤t_lrc); #ifdef ENABLE_SS_MULTIMASTER ((XLogRecord *)leader->xlogGrouprdata->data)->logic_lsn = logic_lsn; + g_curr_lsn = logic_lsn; #endif } @@ -638,6 +639,7 @@ static void XLogInsertRecordGroupFollowers(PGPROC *leader, const uint32 head, ui XLogBytePosToRecPtr(prev_byte_pos), ¤t_lrc); #ifdef ENABLE_SS_MULTIMASTER ((XLogRecord *)follower->xlogGrouprdata->data)->logic_lsn = logic_lsn; + follower->xlogGroupLogicLSN = logic_lsn; logic_lsn++; #endif prev_byte_pos = start_byte_pos; @@ -782,6 +784,11 @@ static XLogRecPtr XLogInsertRecordGroup(XLogRecData *rdata, XLogRecPtr fpw_lsn) */ pg_memory_barrier(); Assert(pg_atomic_read_u32(&proc->xlogGroupNext) == INVALID_PGPROCNO); + +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = proc->xlogGroupLogicLSN; +#endif + return proc->xlogGroupReturntRecPtr; } @@ -10211,8 +10218,8 @@ void StartupXLOG(void) * in SS Switchover, skip dw init since we didn't do ShutdownXLOG */ - if ((ENABLE_REFORM && SS_REFORM_REFORMER && !SS_STANDBY_FAILOVER && !SS_PERFORMING_SWITCHOVER) || - !ENABLE_DMS || !ENABLE_REFORM) { + if ((SS_REFORM_REFORMER && !SS_STANDBY_FAILOVER && !SS_PERFORMING_SWITCHOVER) || + !ENABLE_DMS) { /* process assist file of chunk recycling */ dw_ext_init(); dw_init(); @@ -11580,7 +11587,7 @@ void StartupXLOG(void) } #endif - if (ENABLE_DMS && ENABLE_REFORM && !SS_PRIMARY_DEMOTED && !SS_DISASTER_STANDBY_CLUSTER) { + if (ENABLE_DMS && !SS_PRIMARY_DEMOTED && !SS_DISASTER_STANDBY_CLUSTER) { StartupWaitReform(); } diff --git a/src/gausskernel/storage/ipc/procarray.cpp b/src/gausskernel/storage/ipc/procarray.cpp index 0a8e93166d3fae5b37467e3f193e8431e7926b1b..d922a958b44c4a880c938d8f59f0f3d88f0c10f2 100755 --- a/src/gausskernel/storage/ipc/procarray.cpp +++ b/src/gausskernel/storage/ipc/procarray.cpp @@ -1329,24 +1329,24 @@ data consistency. bool TransactionIdIsInProgress(TransactionId xid, uint32* needSync, bool shortcutByRecentXmin, bool bCareNextxid, bool isTopXact, bool checkLatestCompletedXid) { -#ifdef ENABLE_SS_MULTIMASTER - if (ENABLE_DMS) { - int owner = xid % DMS_MAX_INSTANCE; - /* fetch TXN info locally if either reformer, original primary, or normal primary */ - bool local_fetch = (SS_MY_INST_ID == owner); - if (!local_fetch) { - if (TransactionIdIsKnownCompleted(xid)) { - ereport(DEBUG1, (errmsg("xid(%llu) in progress test by local fetched cachedFetchXid.", xid))); - xc_by_known_xact_inc(); - return false; - } - - bool in_progress = true; - SSTransactionIdIsInProgress(xid, &in_progress, owner); - return in_progress; - } - } -#endif +// #ifdef ENABLE_SS_MULTIMASTER +// if (ENABLE_DMS && t_thrd.role != DMS_WORKER) { +// int owner = xid % DMS_MAX_INSTANCE; +// /* fetch TXN info locally if either reformer, original primary, or normal primary */ +// bool local_fetch = (SS_MY_INST_ID == owner); +// if (!local_fetch) { +// if (TransactionIdIsKnownCompleted(xid)) { +// ereport(DEBUG1, (errmsg("xid(%llu) in progress test by local fetched cachedFetchXid.", xid))); +// xc_by_known_xact_inc(); +// return false; +// } + +// bool in_progress = true; +// SSTransactionIdIsInProgress(xid, &in_progress, owner); +// return in_progress; +// } +// } +// #endif ProcArrayStruct* arrayP = g_instance.proc_array_idx; #ifdef USE_ASSERT_CHECKING @@ -1406,7 +1406,7 @@ bool TransactionIdIsInProgress(TransactionId xid, uint32* needSync, bool shortcu return false; } -#ifndef ENABLE_SS_MULTIMASTER +// #ifndef ENABLE_SS_MULTIMASTER if (ENABLE_DMS) { /* fetch TXN info locally if either reformer, original primary, or normal primary */ bool local_fetch = SSCanFetchLocalSnapshotTxnRelatedInfo(); @@ -1416,18 +1416,18 @@ bool TransactionIdIsInProgress(TransactionId xid, uint32* needSync, bool shortcu return in_progress; } } -#endif +// #endif /* * Also, we can handle our own transaction (and subtransactions) without * any access to shared memory. */ -#ifndef ENABLE_SS_MULTIMASTER +// #ifndef ENABLE_SS_MULTIMASTER if (TransactionIdIsCurrentTransactionId(xid)) { xc_by_my_xact_inc(); Assert(shortCutCheckRes == true); return true; } -#endif +// #endif if (!RecoveryInProgress()) { LWLockAcquire(ProcArrayLock, LW_SHARED); @@ -1624,22 +1624,22 @@ TransactionId GetOldestXmin(Relation rel, bool bFixRecentGlobalXmin, bool bRecen TransactionId replication_slot_xmin; volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId; -#ifdef ENABLE_SS_MULTIMASTER - TransactionId g_result = MaxTransactionId; - if (ENABLE_DMS) { - TransactionId g_result_array[DMS_MAX_INSTANCE]{0}; - dms_get_min_scn(g_result_array); - for (auto min : g_result_array) { - if (min && min < g_result) - g_result = min; - } - } - if (!bFixRecentGlobalXmin && TransactionIdIsNormal(u_sess->utils_cxt.RecentGlobalXmin) && !bRecentGlobalXminNoCheck) - return g_result < u_sess->utils_cxt.RecentGlobalXmin ? g_result : u_sess->utils_cxt.RecentGlobalXmin; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// TransactionId g_result = MaxTransactionId; +// if (ENABLE_DMS) { +// TransactionId g_result_array[DMS_MAX_INSTANCE]{0}; +// dms_get_min_scn(g_result_array); +// for (auto min : g_result_array) { +// if (min && min < g_result) +// g_result = min; +// } +// } +// if (!bFixRecentGlobalXmin && TransactionIdIsNormal(u_sess->utils_cxt.RecentGlobalXmin) && !bRecentGlobalXminNoCheck) +// return g_result < u_sess->utils_cxt.RecentGlobalXmin ? g_result : u_sess->utils_cxt.RecentGlobalXmin; +// #else if (!bFixRecentGlobalXmin && TransactionIdIsNormal(u_sess->utils_cxt.RecentGlobalXmin) && !bRecentGlobalXminNoCheck) return u_sess->utils_cxt.RecentGlobalXmin; -#endif +// #endif /* Fetch into local variable, don't need to hold ProcArrayLock */ replication_slot_xmin = g_instance.proc_array_idx->replication_slot_xmin; @@ -1706,11 +1706,11 @@ TransactionId GetOldestXmin(Relation rel, bool bFixRecentGlobalXmin, bool bRecen NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result)) result = replication_slot_catalog_xmin; -#ifdef ENABLE_SS_MULTIMASTER - return g_result < result ? g_result : result; -#else +// #ifdef ENABLE_SS_MULTIMASTER +// return g_result < result ? g_result : result; +// #else return result; -#endif +// #endif } TransactionId GetGlobalOldestXmin() @@ -2131,7 +2131,7 @@ RETRY: if (SS_MY_INST_ID == 0) snapshot->snapshotcsn = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo); else - snapshot->snapshotcsn = SSGetGlobalCSN(0); + snapshot->snapshotcsn = SSGetGlobalVal(SS_GLOBAL_CSN, true); uint64 oldestxmin = dms_get_min_scn(snapshot->g_xmin); snapshot->g_oldestxmin = (oldestxmin && oldestxmin < u_sess->utils_cxt.RecentXmin) ? oldestxmin: u_sess->utils_cxt.RecentXmin; ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[##DEBUGGING## g_oldestxmin(%llu), recentxmin(%llu)", snapshot->g_oldestxmin, u_sess->utils_cxt.RecentXmin))); @@ -5241,7 +5241,8 @@ void CalculateLocalLatestSnapshot(bool forceCalc) pg_write_barrier(); - ereport(DEBUG1, (errmsg("Generated snapshot in ring buffer slot %lu\n", SNAPXID_INDEX(snapxid)))); + //ereport(DEBUG1, (errmsg("Generated snapshot in ring buffer slot %lu\n", SNAPXID_INDEX(snapxid)))); + ereport(LOG, (errmsg("Snapshot %lu/%lu/%lu/%lu", snapxid->xmin, snapxid->xmax, snapxid->localxmin, snapxid->snapshotcsn))); SetNextSnapXid(); } diff --git a/src/gausskernel/storage/lmgr/lock.cpp b/src/gausskernel/storage/lmgr/lock.cpp index e2e6d1b54aa969982cb057daa80e6b85c6aa7352..6da028a05a2cde341e50fbaf66824768d2fe7da8 100644 --- a/src/gausskernel/storage/lmgr/lock.cpp +++ b/src/gausskernel/storage/lmgr/lock.cpp @@ -780,13 +780,13 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE } /* First we try to get dms lock in shared storage mode */ -#ifndef ENABLE_SS_MULTIMASTER +//#ifndef ENABLE_SS_MULTIMASTER if (ENABLE_DMS && (locktag->locktag_type < (uint8)LOCKTAG_PAGE || locktag->locktag_type == (uint8)LOCKTAG_OBJECT) && !RecoveryInProgress()) { -#else - if (ENABLE_DMS && ((locktag->locktag_type <= (uint8)LOCKTAG_TRANSACTION && locktag->locktag_type != (uint8)LOCKTAG_PAGE) - || locktag->locktag_type == (uint8)LOCKTAG_OBJECT) && !RecoveryInProgress()) { -#endif +// #else +// if (ENABLE_DMS && ((locktag->locktag_type <= (uint8)LOCKTAG_TRANSACTION && locktag->locktag_type != (uint8)LOCKTAG_PAGE) +// || locktag->locktag_type == (uint8)LOCKTAG_OBJECT) && !RecoveryInProgress()) { +// #endif bool ret = SSDmsLockAcquire(locallock, dontWait, waitSec); if (!ret) { instr_stmt_report_lock(LOCK_END, NoLock); diff --git a/src/include/access/multixact.h b/src/include/access/multixact.h index 5e3d8d11d224f080dc493766de04b155313d605f..3f48114547ec89c01a667e011d45e6040c7f0547 100644 --- a/src/include/access/multixact.h +++ b/src/include/access/multixact.h @@ -145,5 +145,6 @@ extern void get_multixact_pageno(uint8 info, int64 *pageno, XLogReaderState *rec extern void SSMultiXactShmemClear(void); extern void SSGetMultiXactIdMembers(MultiXactId multi, int *nmembers, MultiXactMember **members, int owner); - +extern int CBProcessGetGlobalMultiXID(unsigned char src_inst, unsigned char is_query, unsigned long long *result); +extern void SSSetGlobalMultiXID(uint64 result); #endif /* MULTIXACT_H */ diff --git a/src/include/access/transam.h b/src/include/access/transam.h index ff634d1178058537ade5ca4e6b57c6583f144bd2..1c86b24540c0103efdfdcb874fdfc83ffb6e7574 100755 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -63,11 +63,20 @@ : (ShortTransactionId)(xid)) /* advance a transaction ID variable */ +#ifdef ENABLE_SS_MULTIMASTER +#define TransactionIdAdvance(dest) \ + do { \ + if (SS_MY_INST_ID == 0) { \ + t_thrd.xact_cxt.ShmemVariableCache->nextXid = t_thrd.xact_cxt.ShmemVariableCache->nextXid + DMS_MAX_INSTANCE; \ + } \ + } while (0) +#else #define TransactionIdAdvance(dest) \ do { \ (dest)++; \ Assert((dest) > FirstNormalTransactionId); \ } while (0) +#endif /* back up a transaction ID variable */ #define TransactionIdRetreat(dest) \ diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index 8312baccad35aff4cbb4f9375721e66fcedab51d..b9ed9ff2b527f7231f049d73c7fd52459f252f33 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -52,7 +52,7 @@ extern "C" { #ifndef ENABLE_SS_MULTIMASTER #define DMS_MAX_INSTANCES 64 #else -#define DMS_MAX_INSTANCES 16 +#define DMS_MAX_INSTANCES 3 #endif #define DMS_MAX_NAME_LEN 64 @@ -762,7 +762,7 @@ typedef unsigned long long (*dms_get_page_lfn)(dms_buf_ctrl_t *buf_ctrl); typedef unsigned long long(*dms_get_global_lfn)(void *db_handle); typedef unsigned long long(*dms_get_global_scn)(void *db_handle); typedef unsigned long long(*dms_get_global_lsn)(void *db_handle); -typedef int(*dms_get_global_csn)(void *db_handle, unsigned char request, unsigned long long *result); +typedef int(*dms_get_global_csn)(void *db_handle, unsigned char src_inst, unsigned char res_type, unsigned char is_query, unsigned long long *result); typedef unsigned long long(*dms_get_global_flushed_lfn)(void *db_handle); typedef int(*dms_read_local_page4transfer)(void *db_handle, char pageid[DMS_PAGEID_SIZE], dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl); @@ -1082,7 +1082,6 @@ typedef struct st_dms_profile { unsigned char rdma_rpc_bind_core_start; unsigned char rdma_rpc_bind_core_end; char ock_log_path[DMS_OCK_LOG_PATH_LEN]; - unsigned char enable_reform; // ock scrlock configs unsigned char enable_scrlock; unsigned int primary_inst_id; diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index b552b520789dc021b022195b30d454e153c23d4d..ca42e670d04d209eb33af1e60be9ce79a642f34d 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -33,14 +33,12 @@ #ifdef ENABLE_LITE_MODE #define ENABLE_DMS false -#define ENABLE_REFORM false #define ENABLE_VERIFY_PAGE_VERSION false #define ENABLE_SS_TXNSTATUS_CACHE false #define ENABLE_SS_BCAST_SNAPSHOT false #define SS_SINGLE_CLUSTER false #else #define ENABLE_DMS (g_instance.attr.attr_storage.dms_attr.enable_dms && !IsInitdb) -#define ENABLE_REFORM (g_instance.attr.attr_storage.dms_attr.enable_reform) #define ENABLE_VERIFY_PAGE_VERSION (g_instance.attr.attr_storage.dms_attr.enable_verify_page) #define ENABLE_SS_TXNSTATUS_CACHE (ENABLE_DMS && g_instance.attr.attr_storage.dms_attr.txnstatus_cache_size > 0) #define ENABLE_SS_BCAST_SNAPSHOT (ENABLE_DMS && g_instance.attr.attr_storage.dms_attr.enable_bcast_snapshot) diff --git a/src/include/ddes/dms/ss_dms.h b/src/include/ddes/dms/ss_dms.h index 57bd30fcbb0be8973f50701890b1db2744b8f3d4..2ddb1111ba90158d59bbac2b0fe262da2f0ace24 100644 --- a/src/include/ddes/dms/ss_dms.h +++ b/src/include/ddes/dms/ss_dms.h @@ -48,7 +48,7 @@ typedef struct st_ss_dms_func { int (*dms_send_boc)(dms_context_t *dms_ctx, unsigned long long commit_scn, unsigned long long min_scn, unsigned long long *success_inst, unsigned long long *ruid); unsigned long long(*dms_get_min_scn)(unsigned long *min_scn); - int (*dms_request_opengauss_csn)(dms_context_t *dms_ctx, unsigned char request, unsigned long long *result); + int (*dms_request_opengauss_csn)(dms_context_t *dms_ctx, unsigned char res_type, unsigned char is_query, unsigned long long *result); int (*dms_request_opengauss_update_xid)(dms_context_t *dms_ctx, unsigned short t_infomask, unsigned short t_infomask2, unsigned long long *uxid); int (*dms_request_opengauss_xid_csn)(dms_context_t *dms_ctx, dms_opengauss_xid_csn_t *dms_txn_info, @@ -115,7 +115,7 @@ int dms_request_opengauss_update_xid(dms_context_t *dms_ctx, unsigned short t_infomask, unsigned short t_infomask2, unsigned long long *uxid); int dms_request_opengauss_xid_csn(dms_context_t *dms_ctx, dms_opengauss_xid_csn_t *dms_txn_info, dms_opengauss_csn_result_t *xid_csn_result); -int dms_request_opengauss_csn(dms_context_t *dms_ctx, unsigned char request, unsigned long long *result); +int dms_request_opengauss_csn(dms_context_t *dms_ctx, unsigned char res_type, unsigned char is_query, unsigned long long *result); int dms_request_opengauss_txn_status(dms_context_t *dms_ctx, unsigned char request, unsigned char *result); int dms_request_opengauss_multixact_members(dms_context_t *dms_ctx, int membersize, int *nmembers, char **result); int dms_request_opengauss_txn_snapshot(dms_context_t *dms_ctx, diff --git a/src/include/ddes/dms/ss_init.h b/src/include/ddes/dms/ss_init.h index 25c69a9d10c92863fd3e1428d08ce5beaa64e85a..1c5269d8fea3226b62e4b78b3919d444e9388275 100644 --- a/src/include/ddes/dms/ss_init.h +++ b/src/include/ddes/dms/ss_init.h @@ -30,7 +30,7 @@ #ifndef ENABLE_SS_MULTIMASTER #define DMS_MAX_INSTANCE 64 #else -#define DMS_MAX_INSTANCE 16 +#define DMS_MAX_INSTANCE 3 #endif #define DMS_MAX_SESSIONS (uint32)16320 #define DMS_MAX_CONNECTIONS (int32)16000 diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index e245b59731504daa6d48335b08444d906848cc4d..cb21e04fe80fe2d0bb7caf17bd13a207ae0245de 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -94,8 +94,15 @@ typedef struct SSBroadcasDbBackendsAck { int count; } SSBroadcastDbBackendsAck; +typedef enum { + SS_GLOBAL_XID, + SS_GLOBAL_CSN, + SS_GLOBAL_MULTIXID +} SSGlobalValType; + Snapshot SSGetSnapshotData(Snapshot snapshot); CommitSeqNo SSGetGlobalCSN(unsigned char request); +uint64 SSGetGlobalVal(SSGlobalValType val_type, bool is_query = false); CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCommit, bool isMvcc, bool isNest, Snapshot snapshot, bool* sync, int owner); void SSTransactionIdDidCommit(TransactionId transactionId, bool *ret_did_commit, int owner); diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index 0ea91341a06c9cbcd25a7ba10b968ae111496b67..e7c7976e543341fa7d491c5c6fa3d0257d6f8f5a 100755 --- a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -111,7 +111,6 @@ typedef struct knl_instance_attr_dms { char* ock_log_path; int channel_count; int work_thread_count; - bool enable_reform; bool enable_ssl; int inst_count; bool enable_log_level; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 08ea46778f3e1cef4054d7374a1619d544c719f1..70b3c547157059944203712fbc3354cd5000b3f7 100755 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -273,6 +273,9 @@ struct PGPROC { bool* xlogGroupDoPageWrites; bool xlogGroupIsFPW; uint64 snap_refcnt_bitmap; +#ifdef ENABLE_SS_MULTIMASTER + uint32 xlogGroupLogicLSN; +#endif #endif XLogRecPtr exrto_min; /* calculate recycle lsn for read on standby in extreme rto */