diff --git a/src/common/dss_param.h b/src/common/dss_param.h index 6da8891402fdb51136f1ea0cf8875c659a08ed3f..d5a869269fc1b541b58135a270a6d9e39d2a6c15 100644 --- a/src/common/dss_param.h +++ b/src/common/dss_param.h @@ -62,14 +62,12 @@ typedef struct st_dss_params { uint64 mes_pool_size; uint32 inst_cnt; + uint64 inst_map; char nodes[DSS_MAX_INSTANCES][CM_MAX_IP_LEN]; uint16 ports[DSS_MAX_INSTANCES]; uint32 channel_num; uint32 work_thread_cnt; cs_pipe_type_t pipe_type; - uint64 inst_map; - uint64 inst_work_status_map; // one bit , on inst, if 1 inst be ok, 0 inst not ok - uint64 inst_out_of_work_cnt; // count of inst, whose status is ok, if not the inst_cnt, the brocast will not send bool32 elapsed_switch; uint32 shm_key; #ifdef ENABLE_GLOBAL_CACHE @@ -103,26 +101,6 @@ static inline char *dss_get_cfg_dir(dss_config_t *inst_cfg) return inst_cfg->home; } -static inline uint64 dss_get_inst_work_status(void) -{ - return (uint64)cm_atomic_get((atomic_t *)&g_inst_cfg->params.inst_work_status_map); -} - -static inline void dss_set_inst_work_status(uint64 cur_inst_map) -{ - (void)cm_atomic_set((atomic_t *)&g_inst_cfg->params.inst_work_status_map, (int64)cur_inst_map); -} - -static inline uint64 dss_get_inst_out_of_work_cnt(void) -{ - return (uint64)cm_atomic_get((atomic_t *)&g_inst_cfg->params.inst_out_of_work_cnt); -} - -static inline void dss_set_inst_out_of_work_cnt(uint64 inst_out_of_work_cnt) -{ - (void)cm_atomic_set((atomic_t *)&g_inst_cfg->params.inst_out_of_work_cnt, (int64)inst_out_of_work_cnt); -} - /* * @brief set ssl relevant param * @[in] param name(SSL_CA、SSL_KEY、SSL_PWD_PLAINTEXT、SSL_CERT). 
diff --git a/src/service/dss_instance.c b/src/service/dss_instance.c index 1c283a58be63f024d30466e01ee65b09fe6e87df..cd6d8a47713751c3ba4c206805cc13cfa4dbec3f 100644 --- a/src/service/dss_instance.c +++ b/src/service/dss_instance.c @@ -373,7 +373,7 @@ status_t dss_start_lsnr(dss_instance_t *inst) status_t dss_init_cm(dss_instance_t *inst) { inst->cm_res.is_valid = CM_FALSE; - inst->inst_cfg.params.inst_work_status_map = 0; + inst->inst_work_status_map = 0; dss_config_t *inst_cfg = dss_get_inst_cfg(); char *value = cm_get_config_value(&inst_cfg->config, "DSS_CM_SO_NAME"); if (value == NULL || strlen(value) == 0) { @@ -406,6 +406,29 @@ void dss_free_log_ctrl(dss_instance_t *inst) } } +void dss_check_peer_by_inst(dss_instance_t *inst, uint64 inst_id) +{ + dss_config_t *inst_cfg = dss_get_inst_cfg(); + // Skip the local instance + if (inst_id == (uint64)inst_cfg->params.inst_id) { + return; + } + + // Skip instances that are not in the configured instance map + uint64 inst_mask = ((uint64)0x1 << inst_id); + if ((inst_cfg->params.inst_map & inst_mask) == 0) { + return; + } + + uint64 cur_inst_map = dss_get_inst_work_status(); + // Already has a connection + if ((cur_inst_map & inst_mask) != 0) { + return; + } + + dss_check_peer_inst(inst, inst_id); +} + static void dss_check_peer_by_cm(dss_instance_t *inst) { cm_res_stat_ptr_t res = cm_res_get_stat(&inst->cm_res.mgr); @@ -413,7 +436,6 @@ static void dss_check_peer_by_cm(dss_instance_t *inst) return; } dss_config_t *inst_cfg = dss_get_inst_cfg(); - uint32 inst_work_stats_ok_cnt = 0; uint64 cur_inst_map = 0; int instance_count = cm_res_get_instance_count(&inst->cm_res.mgr, res); for (int32_t idx = 0; idx < instance_count; idx++) { @@ -437,22 +459,19 @@ static void dss_check_peer_by_cm(dss_instance_t *inst) } int stat = cm_res_get_inst_stat(&inst->cm_res.mgr, inst_res); - if (stat == CM_RES_STATUS_ONLINE) { - inst_work_stats_ok_cnt++; - } else { + if (stat != CM_RES_STATUS_ONLINE) { LOG_RUN_INF("dss instance [%d] work stat [%d] not online.", res_instance_id, stat); } cur_inst_map |= 
((uint64)0x1 << res_instance_id); } - dss_set_inst_out_of_work_cnt(inst_cfg->params.inst_cnt - inst_work_stats_ok_cnt); + dss_check_mes_conn(cur_inst_map); cm_res_free_stat(&inst->cm_res.mgr, res); } static void dss_check_peer_default(void) { - dss_set_inst_out_of_work_cnt(0); - dss_check_mes_conn(DSS_ULL_MAX); + dss_check_mes_conn(DSS_INVALID_64); } void dss_init_cm_res(dss_instance_t *inst) @@ -471,12 +490,8 @@ void dss_init_cm_res(dss_instance_t *inst) return; } -void dss_check_peer_inst(dss_instance_t *inst) +static void dss_check_peer_inst_inner(dss_instance_t *inst) { - dss_config_t *inst_cfg = dss_get_inst_cfg(); - if (inst_cfg->params.inst_cnt <= 1) { - return; - } /** * During installation initialization, db_init depends on the DSS server. However, the CMS is not started. * Therefore, cm_init cannot be invoked during the DSS server startup. @@ -491,3 +506,35 @@ void dss_check_peer_inst(dss_instance_t *inst) } dss_check_peer_default(); } + +void dss_check_peer_inst(dss_instance_t *inst, uint64 inst_id) +{ + dss_config_t *inst_cfg = dss_get_inst_cfg(); + if (inst_cfg->params.inst_cnt <= 1) { + return; + } + + uint64 inst_mask = ((uint64)0x1 << inst_id); + cm_spin_lock(&inst->inst_work_lock, NULL); + + // after taking the lock, check again: another thread may have held the lock and initialized the map first + uint64 cur_inst_map = dss_get_inst_work_status(); + // has connection + if (inst_id != DSS_INVALID_64 && (cur_inst_map & inst_mask) != 0) { + cm_spin_unlock(&inst->inst_work_lock); + return; + } + + dss_check_peer_inst_inner(inst); + cm_spin_unlock(&inst->inst_work_lock); +} + +uint64 dss_get_inst_work_status(void) +{ + return (uint64)cm_atomic_get((atomic_t *)&g_dss_instance.inst_work_status_map); +} + +void dss_set_inst_work_status(uint64 cur_inst_map) +{ + (void)cm_atomic_set((atomic_t *)&g_dss_instance.inst_work_status_map, (int64)cur_inst_map); +} \ No newline at end of file diff --git a/src/service/dss_instance.h b/src/service/dss_instance.h index 
c6427255b8bfb62bd8fbfa4716ed4f3cc875a924..f24c15c717e5370152b57acc520d346773acadab 100644 --- a/src/service/dss_instance.h +++ b/src/service/dss_instance.h @@ -69,6 +69,8 @@ typedef struct st_dss_instance { int64 thread_cnt; bool32 abort_status; dss_cm_res cm_res; + uint64 inst_work_status_map; // one bit per inst: 1 means the inst is ok, 0 means the inst is not ok + spinlock_t inst_work_lock; dss_kernel_instance_t *kernel_instance; } dss_instance_t; @@ -81,12 +83,15 @@ extern dss_instance_t g_dss_instance; status_t dss_start_lsnr(dss_instance_t *inst); void dss_uninit_cm(dss_instance_t *inst); -void dss_check_peer_inst(dss_instance_t *inst); +void dss_check_peer_inst(dss_instance_t *inst, uint64 inst_id); void dss_free_log_ctrl(dss_instance_t *inst); status_t dss_get_instance_log_buf_and_recover(dss_instance_t *inst); status_t dss_load_log_buffer(dss_redo_batch_t *batch); status_t dss_alloc_instance_log_buf(dss_instance_t *inst); status_t dss_recover_from_instance(dss_instance_t *inst); +void dss_check_peer_by_inst(dss_instance_t *inst, uint64 inst_id); +uint64 dss_get_inst_work_status(void); +void dss_set_inst_work_status(uint64 cur_inst_map); #ifdef __cplusplus } diff --git a/src/service/dss_mes.c b/src/service/dss_mes.c index 2ce53bca2e3bbe4a16fbbd5d063b58348f8b31ea..2cd1ecd9fff3ffd10c5be825fbb5eb44da5b4560 100644 --- a/src/service/dss_mes.c +++ b/src/service/dss_mes.c @@ -342,11 +342,12 @@ static void dss_process_message(uint32 work_idx, mes_message_t *msg) LOG_DEBUG_ERR("Invalid request received,cmd is %c.", msg->head->cmd); return; } + // ready the ack connection + dss_check_peer_by_inst(&g_dss_instance, msg->head->src_inst); dss_processor_t *processor = &g_dss_processors[msg->head->cmd]; dss_session_ctrl_t *session_ctrl = dss_get_session_ctrl(); dss_session_t *session = &session_ctrl->sessions[work_idx]; processor->proc(session, msg); - return; } // add function diff --git a/src/service/dss_mes.h b/src/service/dss_mes.h index 
ff28e9500e094acc460dc71c91688663c1bed78d..7c715f7b1d19f2b69a7b7a7402571cf5fbff1d31 100644 --- a/src/service/dss_mes.h +++ b/src/service/dss_mes.h @@ -168,6 +168,7 @@ status_t dss_read_volume_remote(const char *vg_name, dss_volume_t *volume, int64 status_t dss_send2standy( dss_session_t *session, mes_message_head_t *reqhead, big_packets_ctrl_t *ctrl, const char *buf, uint16 size); status_t dss_batch_load(dss_session_t *session, dss_loaddisk_req_t *req, mes_message_head_t *reqhead); +status_t dss_notify_online(dss_session_t *session); #ifdef __cplusplus } diff --git a/src/service/dssserver.c b/src/service/dssserver.c index a1430193c2690754ce9b190b8e7a8f6243712982..98b26a1d19227401030288be81b6ef60a3aeaabd 100644 --- a/src/service/dssserver.c +++ b/src/service/dssserver.c @@ -111,7 +111,7 @@ static void handle_main_wait(void) if (g_dss_instance.abort_status == CM_TRUE) { break; } - dss_check_peer_inst(&g_dss_instance); + dss_check_peer_inst(&g_dss_instance, DSS_INVALID_64); dss_ssl_ca_cert_expire(); cm_sleep(500); } while (CM_TRUE);