diff --git a/src/cmd/dsscmd.c b/src/cmd/dsscmd.c index b6b11134b28750e8c4cd3c90b768037c005b42b6..79c48118796f0d789f78e65a2882f3c0fc70d71e 100644 --- a/src/cmd/dsscmd.c +++ b/src/cmd/dsscmd.c @@ -2678,6 +2678,43 @@ static status_t getcfg_proc(void) return status; } +static dss_args_t cmd_getstatus_args[] = { + {'U', "UDS", CM_FALSE, CM_TRUE, cmd_check_uds, cmd_check_convert_uds_home, cmd_clean_check_convert, 0, NULL, NULL, + 0}, +}; + +static dss_args_set_t cmd_getstatus_args_set = { + cmd_getstatus_args, + sizeof(cmd_getstatus_args) / sizeof(dss_args_t), + NULL, +}; + +static void getstatus_help(char *prog_name) +{ + (void)printf("\nUsage:%s getstatus <-n name> [-U UDS:socket_domain]\n", prog_name); + (void)printf("[client command] get dss server status\n"); + (void)printf("-U/--UDS , [optional], the unix socket path of dssserver, " + "default vaule is UDS:/tmp/.dss_unix_d_socket\n"); +} + +static status_t getstatus_proc(void) +{ + dss_conn_t connection; + status_t status = get_connection_by_input_args(cmd_getstatus_args[DSS_ARG_IDX_1].input_args, &connection); + if (status != CM_SUCCESS) { + return status; + } + int server_status = 0; + status = dss_get_inst_status_on_server(&connection, &server_status); + if (status != CM_SUCCESS) { + DSS_PRINT_ERROR("Failed to get server status.\n"); + } else { + DSS_PRINT_INF("Server status is %d.\n", server_status); + } + dss_disconnect_ex(&connection); + return status; +} + static dss_args_t cmd_stopdss_args[] = { {'U', "UDS", CM_FALSE, CM_TRUE, cmd_check_uds, cmd_check_convert_uds_home, cmd_clean_check_convert, 0, NULL, NULL, 0}, @@ -2847,6 +2884,7 @@ dss_admin_cmd_t g_dss_admin_cmd[] = { {"cv", cv_help, cv_proc, &cmd_cv_args_set} {"encrypt", encrypt_help, encrypt_proc, &cmd_encrypt_args_set}, {"setcfg", setcfg_help, setcfg_proc, &cmd_setcfg_args_set}, {"getcfg", getcfg_help, getcfg_proc, &cmd_getcfg_args_set}, + {"getstatus", getstatus_help, getstatus_proc, &cmd_getstatus_args_set}, {"stopdss", stopdss_help, stopdss_proc, &cmd_stopdss_args_set}, {"scandisk", scandisk_help, scandisk_proc, &cmd_scandisk_args_set}, }; diff --git a/src/common/dss_defs.c b/src/common/dss_defs.c index 15ca7ebfc349a16e069ea713420b747363592231..a36bfbbcb86f18f8e5068b3d6b5cbb89b7c85dec 100644 --- a/src/common/dss_defs.c +++ b/src/common/dss_defs.c @@ -55,9 +55,10 @@ static char *g_dss_cmd_desc[DSS_CMD_TYPE_OFFSET(DSS_CMD_END)] = { [DSS_CMD_TYPE_OFFSET(DSS_CMD_UPDATE_WRITTEN_SIZE)] = "update written size", [DSS_CMD_TYPE_OFFSET(DSS_CMD_STOP_SERVER)] = "stopserver", [DSS_CMD_TYPE_OFFSET(DSS_CMD_SETCFG)] = "setcfg", - [DSS_CMD_TYPE_OFFSET(DSS_CMD_SET_STATUS)] = "setstatus", [DSS_CMD_TYPE_OFFSET(DSS_CMD_SYMLINK)] = "symlink", [DSS_CMD_TYPE_OFFSET(DSS_CMD_UNLINK)] = "unlink", + [DSS_CMD_TYPE_OFFSET(DSS_CMD_SET_MAIN_INST)] = "set main inst", + [DSS_CMD_TYPE_OFFSET(DSS_CMD_SWITCH_LOCK)] = "switch cm lock", [DSS_CMD_TYPE_OFFSET(DSS_CMD_GET_HOME)] = "get home", [DSS_CMD_TYPE_OFFSET(DSS_CMD_EXIST_FILE)] = "exist file", [DSS_CMD_TYPE_OFFSET(DSS_CMD_EXIST_DIR)] = "exist dir", @@ -65,6 +66,7 @@ static char *g_dss_cmd_desc[DSS_CMD_TYPE_OFFSET(DSS_CMD_END)] = { [DSS_CMD_TYPE_OFFSET(DSS_CMD_READLINK)] = "readlink", [DSS_CMD_TYPE_OFFSET(DSS_CMD_GET_FTID_BY_PATH)] = "get ftid by path", [DSS_CMD_TYPE_OFFSET(DSS_CMD_GETCFG)] = "getcfg", + [DSS_CMD_TYPE_OFFSET(DSS_CMD_GET_INST_STATUS)] = "get inst status", [DSS_CMD_TYPE_OFFSET(DSS_CMD_EXEC_REMOTE)] = "exec remote", }; diff --git a/src/common/dss_defs.h b/src/common/dss_defs.h index 4b8b6885b49cd1ddb69cb73186edf77c699056ab..6f12e832f85e74728ac83ddc98d063d5c4a1216f 100644 --- a/src/common/dss_defs.h +++ b/src/common/dss_defs.h @@ -99,9 +99,10 @@ typedef enum { DSS_CMD_UPDATE_WRITTEN_SIZE, DSS_CMD_STOP_SERVER, DSS_CMD_SETCFG, - DSS_CMD_SET_STATUS, DSS_CMD_SYMLINK, DSS_CMD_UNLINK, + DSS_CMD_SET_MAIN_INST, + DSS_CMD_SWITCH_LOCK, DSS_CMD_MODIFY_END, DSS_CMD_QUERY_BEGIN = DSS_CMD_MODIFY_END, DSS_CMD_GET_HOME = DSS_CMD_QUERY_BEGIN, @@ -111,11 +112,18 @@ typedef enum { DSS_CMD_READLINK, DSS_CMD_GET_FTID_BY_PATH, DSS_CMD_GETCFG, + DSS_CMD_GET_INST_STATUS, DSS_CMD_QUERY_END, DSS_CMD_EXEC_REMOTE = DSS_CMD_QUERY_END, DSS_CMD_END // must be the last item } dss_cmd_type_e; +static inline bool32 dss_can_cmd_type_no_open(dss_cmd_type_e type) +{ + return ((type == DSS_CMD_GET_INST_STATUS) || (type == DSS_CMD_GET_HOME) || (type == DSS_CMD_SET_SESSIONID) || + (type == DSS_CMD_STOP_SERVER)); +} + #define DSS_DEFAULT_AU_SIZE SIZE_M(8) #define DSS_MAX_AU_SIZE SIZE_M(64) #define DSS_MIN_AU_SIZE SIZE_M(2) diff --git a/src/common/dss_diskgroup.c b/src/common/dss_diskgroup.c index c0073e9745c902f8f99291a34b9c4e3da4ece589..e2dedf952035ebf17fa51d7d811635c4e4c6c6df 100644 --- a/src/common/dss_diskgroup.c +++ b/src/common/dss_diskgroup.c @@ -101,12 +101,6 @@ int32 dss_get_server_status_flag() void dss_set_server_status_flag(int32 dss_status) { g_is_dss_readwrite = dss_status; - if (dss_status == DSS_STATUS_READWRITE) { - dss_config_t *inst_cfg = dss_get_inst_cfg(); - g_master_instance_id = (uint32)(inst_cfg->params.inst_id); - } else { - g_master_instance_id = DSS_INVALID_ID32; - } } void dss_checksum_vg_ctrl(dss_vg_info_item_t *vg_item); @@ -210,7 +204,7 @@ dss_vg_info_item_t *dss_find_vg_item(const char *vg_name) return NULL; } -status_t dg_check_and_recover(uint32 index) +status_t dss_load_ctrlinfo(uint32 index) { dss_vg_info_item_t *vg_item = &g_vgs_info->volume_group[index]; dss_config_t *inst_cfg = dss_get_inst_cfg(); @@ -219,18 +213,11 @@ status_t dg_check_and_recover(uint32 index) LOG_DEBUG_ERR("Failed to lock vg:%s.", vg_item->entry_path); return status; } -#ifdef OPENGAUSS if (dss_recover_ctrlinfo(vg_item) != CM_SUCCESS) { dss_unlock_vg_storage(vg_item, vg_item->entry_path, inst_cfg); LOG_DEBUG_ERR("dss ctrl of %s is invalid when instance init.", vg_item->vg_name); return CM_ERROR; } -#endif - status = dss_check_redo_and_recover(vg_item); - if (status != CM_SUCCESS) { - LOG_DEBUG_ERR("Failed to check redo and recover vg:%s.", vg_item->entry_path); - } - dss_unlock_vg_storage(vg_item, vg_item->entry_path, inst_cfg); return status; } @@ -318,9 +305,9 @@ static status_t dss_get_vg_info_core(uint32 i, dss_share_vg_info_t *share_vg_inf return status; } - status = dg_check_and_recover(i); + status = dss_load_ctrlinfo(i); if (status != CM_SUCCESS) { - LOG_RUN_ERR("DSS instance failed to recover vg:%s!", g_vgs_info->volume_group[i].vg_name); + LOG_RUN_ERR("DSS instance failed to load dss ctrl of vg:%s!", g_vgs_info->volume_group[i].vg_name); return status; } @@ -1347,12 +1334,10 @@ status_t dss_load_volume_ctrl(dss_vg_info_item_t *vg_item, dss_volume_ctrl_t *vo LOG_DEBUG_ERR("Failed to load vg:%s volume ctrl.", vg_item->vg_name); return status; } - if (remote == CM_FALSE) { uint32 checksum = dss_get_checksum(volume_ctrl, DSS_VOLUME_CTRL_SIZE); dss_check_checksum(checksum, volume_ctrl->checksum); } - return CM_SUCCESS; } @@ -1537,7 +1522,13 @@ static inline bool32 dss_need_load_remote(int size) return ((remote_read_proc != NULL) && (dss_is_readonly()) && (size <= (int32)DSS_LOADDISK_BUFFER_SIZE)); } -#define DSS_READ_VOLUME_TRY_MAX 5 +/* + 1、when the node is standby, just send message to primary to read volume + 2、if the primary is just in recovery or switch, may wait the read request + 3、if read failed, just retry. + 4、may be standby switch to primary, just read volume from self; + 5、may be primary just change to standby, just read volume from new primary; +*/ #define DSS_READ_REMOTE_INTERVAL 50 static bool32 dss_read_remote_checksum(void *buf, int32 size) @@ -1547,6 +1538,14 @@ static bool32 dss_read_remote_checksum(void *buf, int32 size) return sum1 == sum2; } +bool32 dss_need_exec_local() +{ + dss_config_t *cfg = dss_get_inst_cfg(); + uint32 master_id = dss_get_master_id(); + uint32 curr_id = (uint32)(cfg->params.inst_id); + return ((curr_id == master_id)); +} + status_t dss_read_volume_inst( dss_vg_info_item_t *vg_item, dss_volume_t *volume, int64 offset, void *buf, int32 size, bool32 *remote) { @@ -1556,19 +1555,11 @@ status_t dss_read_volume_inst( CM_ASSERT(size % DSS_DISK_UNIT_SIZE == 0); CM_ASSERT(((uint64)buf) % DSS_DISK_UNIT_SIZE == 0); - uint8 i = 0; while (dss_need_load_remote(size) == CM_TRUE && status != CM_SUCCESS) { status = remote_read_proc(vg_item->vg_name, volume, offset, buf, size); - i++; if (status != CM_SUCCESS) { - int32 errcode = cm_get_error_code(); - if (errcode == ERR_DSS_GET_MASTER_ID) { - LOG_DEBUG_INF("Read volume from local disk when dss get master id failed"); - cm_reset_error(); - break; - } LOG_RUN_ERR("Failed to load disk(%s) data from the active node, result:%d", volume->name_p, status); - if (i > DSS_READ_VOLUME_TRY_MAX) { + if (dss_need_exec_local()) { break; } cm_sleep(DSS_READ_REMOTE_INTERVAL); @@ -1577,9 +1568,6 @@ status_t dss_read_volume_inst( if (dss_read_remote_checksum(buf, size) != CM_TRUE) { LOG_RUN_ERR("Failed to load disk(%s) data from the active node, checksum error", volume->name_p); - if (i > DSS_READ_VOLUME_TRY_MAX) { - break; - } continue; } @@ -1607,7 +1595,6 @@ status_t dss_read_volume_4standby(const char *vg_name, uint32 volumeid, int64 of LOG_RUN_ERR("Read volume for standby fialed, vg(%s) voiume id[%u] error.", vg_name, volumeid); return CM_ERROR; } - dss_volume_t *volume = &vg_item->volume_handle[volumeid]; if (volume->handle == DSS_INVALID_HANDLE) { diff --git a/src/common/dss_diskgroup.h b/src/common/dss_diskgroup.h index c9cb4ebb8b30757a96a464a5c833e76c613ab035..98acfcbe4fde3db0430852a5b9b873a65586b28f 100644 --- a/src/common/dss_diskgroup.h +++ b/src/common/dss_diskgroup.h @@ -107,7 +107,6 @@ status_t dss_write_volume_inst( dss_vg_info_item_t *vg_item, dss_volume_t *volume, int64 offset, const void *buf, uint32 size); status_t dss_read_volume_inst( dss_vg_info_item_t *vg_item, dss_volume_t *volume, int64 offset, void *buf, int32 size, bool32 *remote); - status_t dss_init_vol_handle(dss_vg_info_item_t *vg_item, int32 flags, dss_vol_handles_t *vol_handles); void dss_destroy_vol_handle(dss_vg_info_item_t *vg_item, dss_vol_handles_t *vol_handles, uint32 size); extern dss_vg_info_t *g_vgs_info; @@ -166,8 +165,10 @@ bool32 dss_is_server(void); bool32 dss_is_readwrite(void); bool32 dss_is_readonly(void); void dss_set_server_flag(void); -void dss_set_server_status_flag(int32 dss_status); +bool32 dss_need_exec_local(); int32 dss_get_server_status_flag(); +void dss_set_server_status_flag(int32 dss_status); +status_t dss_load_ctrlinfo(uint32 index); status_t dss_init_volume(dss_vg_info_item_t *vg_item, dss_volume_ctrl_t *volume); status_t dss_check_write_volume(dss_vg_info_item_t *vg_item, uint32 volumeid, int64 offset, void *buf, uint32 size); diff --git a/src/common/dss_file_def.h b/src/common/dss_file_def.h index 58c10dbb3965899c0ad8d62a6d3a32934cfceee6..2d060fcf5dc1a450591bd5ecc734e51f84861a82 100644 --- a/src/common/dss_file_def.h +++ b/src/common/dss_file_def.h @@ -53,7 +53,7 @@ #define DSS_VG_CONF_NAME "dss_vg_conf.ini" #define DSS_RECYLE_DIR_NAME ".recycle" -#define DSS_CTRL_RESERVE_SIZE (SIZE_K(230) + 512) +#define DSS_CTRL_RESERVE_SIZE (SIZE_K(742) + 512) #define DSS_CTRL_CORE_OFFSET OFFSET_OF(dss_ctrl_t, core_data) #define DSS_CTRL_VOLUME_OFFSET OFFSET_OF(dss_ctrl_t, volume_data) @@ -230,7 +230,6 @@ typedef struct st_dss_ctrl { }; char root[DSS_ROOT_FT_DISK_SIZE]; // dss_root_ft_block_t, 8KB - char log_buf[DSS_LOG_BUFFER_SIZE]; char reserve[DSS_CTRL_RESERVE_SIZE]; } dss_ctrl_t; diff --git a/src/common/dss_log.c b/src/common/dss_log.c index 345645640ac9b5505a2b3d0b3d414789005bceb0..b5294c70cc864fa8395d542d6cf28d332518ca2f 100644 --- a/src/common/dss_log.c +++ b/src/common/dss_log.c @@ -98,7 +98,6 @@ const char *g_dss_error_desc[DSS_ERROR_COUNT] = { Eg:server_locator=\"UDS:UNIX_emserver.domain\"", [ERR_DSS_RECV_MSG_FAILED] = "Recv msg failed, errcode:%d, inst:%u.", [ERR_DSS_LINK_NOT_EXIST] = "The link %s of %s does not exist.", - [ERR_DSS_GET_MASTER_ID] = "DSS get master id failed", }; static status_t dss_init_log_file(log_param_t *log_param, dss_config_t *inst_cfg) diff --git a/src/common/dss_protocol.c b/src/common/dss_protocol.c index 28b308e6ac1d62718c4dd30cb8426fb8ba10b497..ccae8a96dfdec276b099861cb6e503a7ff30f18d 100644 --- a/src/common/dss_protocol.c +++ b/src/common/dss_protocol.c @@ -131,11 +131,9 @@ status_t dss_write_packet(cs_pipe_t *pipe, dss_packet_t *pack) if (pack->head->size > DSS_MAX_PACKET_SIZE) { DSS_RETURN_IFERR2(CM_ERROR, CM_THROW_ERROR(ERR_BUFFER_OVERFLOW, "PACKET BUFFER OVERFLOW")); } - status_t status = VIO_SEND_TIMED(pipe, pack->buf, pack->head->size, DSS_DEFAULT_NULL_VALUE); DSS_RETURN_IFERR2( status, CM_THROW_ERROR(ERR_PACKET_SEND, pack->buf_size, pack->head->size, DSS_DEFAULT_NULL_VALUE)); - return CM_SUCCESS; } @@ -207,10 +205,12 @@ static status_t dss_call_base(cs_pipe_t *pipe, dss_packet_t *req, dss_packet_t * bool32 ready = CM_FALSE; if (dss_write(pipe, req) != CM_SUCCESS) { + LOG_RUN_ERR("dss write failed."); return CM_ERROR; } if (cs_wait(pipe, CS_WAIT_FOR_READ, pipe->socket_timeout, &ready) != CM_SUCCESS) { + LOG_RUN_ERR("cs wait failed."); return CM_ERROR; } diff --git a/src/common/dss_redo.c b/src/common/dss_redo.c index 87b7a738656bcdc5adbbf1a535cb3ac16e70da87..032646d2fddd9a35efb4fef48728afd85acb1c02 100644 --- a/src/common/dss_redo.c +++ b/src/common/dss_redo.c @@ -71,9 +71,6 @@ status_t dss_set_log_buf_for_first_vg(const char *vg_name, dss_vg_info_item_t *v status_t dss_set_log_buf(const char *vg_name, dss_vg_info_item_t *vg_item, dss_volume_t *volume) { -#ifndef OPENGAUSS - return CM_SUCCESS; -#endif if (!is_first_vg(vg_name)) { return CM_SUCCESS; } @@ -159,12 +156,7 @@ char *dss_get_log_buf_from_instance(dss_session_t *session, dss_vg_info_item_t * char *dss_get_total_log_buf(dss_session_t *session, dss_vg_info_item_t *vg_item, dss_redo_type_t type) { - char *log_buf = NULL; -#ifdef OPENGAUSS - log_buf = dss_get_log_buf_from_instance(session, vg_item, type); -#else - log_buf = vg_item->dss_ctrl->log_buf; -#endif + char *log_buf = dss_get_log_buf_from_instance(session, vg_item, type); return log_buf; } @@ -209,17 +201,12 @@ status_t dss_write_redolog_to_disk(dss_vg_info_item_t *item, int64 offset, char return dss_write_ctrl_to_disk(item, offset, buf, size); } -status_t dss_flush_log_inner(int32_t log_split, dss_vg_info_item_t *vg_item, char *log_buf, uint32 flush_size) +status_t dss_flush_log_inner(int32_t log_split, char *log_buf, uint32 flush_size) { - int64 offset; -#ifdef OPENGAUSS - vg_item = dss_get_first_vg_item(); + dss_vg_info_item_t *vg_item = dss_get_first_vg_item(); dss_ctrl_t *dss_ctrl = vg_item->dss_ctrl; uint64 au_size = dss_get_vg_au_size(dss_ctrl); - offset = au_size + log_split * DSS_INSTANCE_LOG_SPLIT_SIZE; -#else - offset = (int64)DSS_LOG_OFFSET; -#endif + int64 offset = au_size + log_split * DSS_INSTANCE_LOG_SPLIT_SIZE; status_t status = dss_write_redolog_to_disk(vg_item, offset, log_buf, flush_size); return status; } @@ -241,24 +228,10 @@ status_t dss_flush_log(int32_t log_split, dss_vg_info_item_t *vg_item, char *log // tail // tail errcode = memcpy_s(log_buf + batch->size, DSS_LOG_BUFFER_SIZE - batch->size, batch, sizeof(dss_redo_batch_t)); securec_check_ret(errcode); - status_t status = dss_flush_log_inner(log_split, vg_item, log_buf, flush_size); + status_t status = dss_flush_log_inner(log_split, log_buf, flush_size); return status; } -status_t dss_reset_log(dss_vg_info_item_t *vg_item) -{ - errno_t errcode = memset_s(vg_item->dss_ctrl->log_buf, DSS_LOG_BUFFER_SIZE, 0, DSS_DISK_UNIT_SIZE); - securec_check_ret(errcode); - - status_t status = - dss_write_redolog_to_disk(vg_item, (int64)DSS_LOG_OFFSET, vg_item->dss_ctrl->log_buf, DSS_DISK_UNIT_SIZE); - if (status != CM_SUCCESS) { - return status; - } - - return CM_SUCCESS; -} - static status_t rp_redo_update_volhead(dss_vg_info_item_t *vg_item, dss_redo_entry_t *entry) { #ifndef WIN32 @@ -1363,49 +1336,16 @@ bool32 dss_check_redo_log_available(dss_redo_batch_t *batch, dss_vg_info_item_t } } while (0); if (!is_complete) { -#ifdef OPENGAUSS if (slot == DSS_LOG_BUF_SLOT_COUNT) { dss_reset_all_log_slot(); } else { (void)dss_reset_log_slot_head(slot); } -#else - (void)dss_reset_log(vg_item); -#endif return CM_FALSE; } return CM_TRUE; } -status_t dss_recover(dss_vg_info_item_t *vg_item) -{ - dss_redo_entry_t *entry = NULL; - dss_redo_batch_t *batch = NULL; - uint32 data_size, offset; - batch = (dss_redo_batch_t *)vg_item->dss_ctrl->log_buf; - if (!dss_check_redo_log_available(batch, vg_item, DSS_LOG_BUF_SLOT_COUNT)) { - LOG_RUN_INF("Reset log when find uncompleted redo data."); - return CM_SUCCESS; - } - data_size = batch->size - DSS_REDO_BATCH_HEAD_SIZE; - LOG_RUN_INF("Begin recovering."); - vg_item->status = DSS_STATUS_RECOVERY; - if (dss_recover_ctrlinfo(vg_item) != CM_SUCCESS) { - DSS_RETURN_IFERR2(CM_ERROR, LOG_DEBUG_ERR("dss ctrl is invalid.")); - } - offset = 0; - while (offset < data_size) { - entry = (dss_redo_entry_t *)(batch->data + offset); - status_t status = dss_replay(vg_item, entry); - DSS_RETURN_IFERR2(status, LOG_DEBUG_ERR("Failed to replay redo log.")); - offset += entry->size; - } - (void)dss_reset_log(vg_item); - vg_item->status = DSS_STATUS_OPEN; - LOG_RUN_INF("Complete recovering."); - return CM_SUCCESS; -} - static int32 lsn_compare(const void *pa, const void *pb) { const dss_sort_handle_t *a = (const dss_sort_handle_t *)pa; @@ -1477,26 +1417,18 @@ status_t dss_recover_when_instance_start(dss_redo_batch_t *batch, bool32 need_ch char *dss_get_log_buf(dss_session_t *session, dss_vg_info_item_t *vg_item) { -#ifdef OPENGAUSS if (session->log_split == DSS_INVALID_SLOT) { return NULL; } dss_log_file_ctrl_t *log_ctrl = dss_get_kernel_instance_log_ctrl(); char *log_buf = (char *)(log_ctrl->log_buf + session->log_split * DSS_LOG_BUFFER_SIZE); -#else - char *log_buf = vg_item->dss_ctrl->log_buf; -#endif return log_buf; } void dss_reset_log_buf(dss_session_t *session, dss_vg_info_item_t *vg_item) { -#ifdef OPENGAUSS (void)dss_reset_log_slot_head(session->log_split); dss_free_log_slot(session); -#else - (void)dss_reset_log(vg_item); -#endif } status_t dss_process_redo_log(dss_session_t *session, dss_vg_info_item_t *vg_item) @@ -1514,8 +1446,7 @@ status_t dss_process_redo_log(dss_session_t *session, dss_vg_info_item_t *vg_ite return CM_SUCCESS; } - status_t status; - status = dss_flush_log(session->log_split, vg_item, log_buf); + status_t status = dss_flush_log(session->log_split, vg_item, log_buf); DSS_RETURN_IFERR2(status, LOG_DEBUG_ERR("Failed to flush log,errcode:%d.", cm_get_error_code())); // #ifndef WIN32 @@ -1531,46 +1462,6 @@ status_t dss_process_redo_log(dss_session_t *session, dss_vg_info_item_t *vg_ite return CM_SUCCESS; } -status_t dss_check_redo_and_recover(dss_vg_info_item_t *vg_item) -{ -#ifdef OPENGAUSS - return CM_SUCCESS; -#else - bool32 remote = CM_FALSE; - CM_ASSERT(dss_is_server()); - LOG_DEBUG_INF("Begin to check redo and recover"); - status_t status = - dss_load_vg_ctrl_part(vg_item, (int64)DSS_LOG_OFFSET, vg_item->dss_ctrl->log_buf, DSS_DISK_UNIT_SIZE, &remote); - DSS_RETURN_IFERR2(status, LOG_DEBUG_ERR("Failed to load redo log.")); - - dss_redo_batch_t *batch = (dss_redo_batch_t *)vg_item->dss_ctrl->log_buf; - if (batch->size == 0) { - LOG_DEBUG_INF("Redo log batch is empty, check dssctrl before check_redo_and_recover ends."); - if (dss_recover_ctrlinfo(vg_item) != CM_SUCCESS) { - DSS_RETURN_IFERR2(CM_ERROR, LOG_DEBUG_ERR("dss ctrl is invalid.")); - } - return CM_SUCCESS; - } - - DSS_LOG_DEBUG_OP("There are redo logs, size:%u.", batch->size); - uint32 load_size = CM_CALC_ALIGN(batch->size + sizeof(dss_redo_batch_t), DSS_DISK_UNIT_SIZE); - if (load_size > DSS_LOG_BUFFER_SIZE) { - // invalid log ,ignored it. - (void)dss_reset_log(vg_item); - LOG_DEBUG_INF("Redo log is invalid, ignored it."); - return CM_SUCCESS; - } - - if (load_size > DSS_DISK_UNIT_SIZE) { - status = dss_load_vg_ctrl_part( - vg_item, (int64)DSS_LOG_OFFSET, vg_item->dss_ctrl->log_buf, (int32)load_size, &remote); - DSS_RETURN_IFERR2(status, LOG_DEBUG_ERR("Failed to load redo log.")); - } - - return dss_recover(vg_item); -#endif -} - static status_t dss_rollback(dss_vg_info_item_t *vg_item, dss_redo_entry_t *entry) { DSS_LOG_DEBUG_OP("rollback redo, type:%u.", entry->type); @@ -1616,15 +1507,11 @@ status_t dss_rollback_log(dss_vg_info_item_t *vg_item, char *log_buf) void dss_rollback_mem_update(int32_t log_split, dss_vg_info_item_t *vg_item) { char *log_buf = NULL; -#ifdef OPENGAUSS if (log_split == DSS_INVALID_SLOT) { return; } dss_log_file_ctrl_t *log_ctrl = dss_get_kernel_instance_log_ctrl(); log_buf = (char *)(log_ctrl->log_buf + log_split * DSS_INSTANCE_LOG_SPLIT_SIZE); -#else - log_buf = vg_item->dss_ctrl->log_buf; -#endif dss_redo_batch_t *batch = (dss_redo_batch_t *)log_buf; if (batch->size == 0) { return; @@ -1634,16 +1521,11 @@ void dss_rollback_mem_update(int32_t log_split, dss_vg_info_item_t *vg_item) return; } LOG_RUN_INF("Try to rollback!!!"); - status_t status; vg_item->status = DSS_STATUS_ROLLBACK; status = dss_rollback_log(vg_item, log_buf); CM_ASSERT(status == CM_SUCCESS); -#ifdef OPENGAUSS (void)dss_reset_log_slot_head(log_split); -#else - (void)dss_reset_log(vg_item); -#endif vg_item->status = DSS_STATUS_OPEN; return; } diff --git a/src/common/dss_redo.h b/src/common/dss_redo.h index b8c35a07ae3a77df9b1fcd019b4144007ef8156d..51dc24c0a8039e5c86ee9f0178855ceb039c57fc 100644 --- a/src/common/dss_redo.h +++ b/src/common/dss_redo.h @@ -56,11 +56,6 @@ typedef enum en_dss_redo_type { DSS_RT_SET_FILE_FS_BLOCK, } dss_redo_type_t; -static inline bool32 dss_imediate_flush_redo_type(dss_redo_type_t type) -{ - return ((type >= DSS_RT_ALLOC_FILE_TABLE_NODE) && (type <= DSS_RT_RENAME_FILE)); -} - typedef struct st_dss_redo_entry { dss_redo_type_t type; uint32 vg_id; // exist operation multi vg @@ -113,8 +108,6 @@ bool32 rp_check_block_addr(const dss_block_addr_his_t *addr_his, const void *blo status_t dss_write_redolog_to_disk(dss_vg_info_item_t *item, int64 offset, char *buf, uint32 size); void dss_put_log(dss_session_t *session, dss_vg_info_item_t *vg_item, dss_redo_type_t type, void *data, uint32 size); status_t dss_flush_log(int32_t log_split, dss_vg_info_item_t *vg_item, char *log_buf); -status_t dss_reset_log(dss_vg_info_item_t *vg_item); -status_t dss_recover(dss_vg_info_item_t *vg_item); status_t dss_recover_when_instance_start(dss_redo_batch_t *batch, bool32 need_check); status_t dss_recover_ctrlinfo(dss_vg_info_item_t *vg_item); status_t dss_apply_log(dss_vg_info_item_t *vg_item, char *log_buf); @@ -124,13 +117,11 @@ bool32 dss_check_redo_log_available(dss_redo_batch_t *batch, dss_vg_info_item_t void dss_rollback_mem_update(int32_t log_split, dss_vg_info_item_t *vg_item); void dss_free_log_slot(dss_session_t *session); void dss_reset_log_buf(dss_session_t *session, dss_vg_info_item_t *vg_item); -status_t dss_check_redo_and_recover(dss_vg_info_item_t *vg_item); char *dss_get_log_buf_from_instance(dss_session_t *session, dss_vg_info_item_t *vg_item, dss_redo_type_t type); char *dss_get_total_log_buf(dss_session_t *session, dss_vg_info_item_t *vg_item, dss_redo_type_t type); status_t dss_set_log_buf_for_first_vg(const char *vg_name, dss_vg_info_item_t *vg_item, dss_volume_t *volume); status_t dss_set_log_buf(const char *vg_name, dss_vg_info_item_t *vg_item, dss_volume_t *volume); char *dss_get_log_buf(dss_session_t *session, dss_vg_info_item_t *vg_item); -status_t dss_flush_log_inner(int32_t log_split, dss_vg_info_item_t *vg_item, char *log_buf, uint32 flush_size); #ifdef __cplusplus } diff --git a/src/common/dss_session.h b/src/common/dss_session.h index 36783fd1d1304fc2a54bdf41de218c2fbd36a3d6..778568d3873f760f933ad2a581d8f9c18e3da2b0 100644 --- a/src/common/dss_session.h +++ b/src/common/dss_session.h @@ -72,6 +72,13 @@ typedef enum en_protocol_type { PROTO_TYPE_GS = 1, } protocol_type_t; +typedef enum en_dss_thread_status { + DSS_THREAD_STATUS_IDLE = 0, + DSS_THREAD_STATUS_RUNNING, + DSS_THREAD_STATUS_PAUSING, + DSS_THREAD_STATUS_PAUSED, +} dss_thread_status_t; + typedef struct st_dss_session { uint32 id; bool32 is_closed; @@ -90,6 +97,7 @@ typedef struct st_dss_session { dss_kernel_instance_t *kernel_instance; // global unique volatile uint64 curr_lsn; // latest lsn generated by current session dss_audit_info_t audit_info; + dss_thread_status_t status; } dss_session_t; static inline char *dss_init_sendinfo_buf(char *input) diff --git a/src/common_api/dss_interaction.c b/src/common_api/dss_interaction.c index a51833dd55aeab1608207f93a9123a59eed14d11..1809a8b265cdebe350ec647252ce4a6483572d8d 100644 --- a/src/common_api/dss_interaction.c +++ b/src/common_api/dss_interaction.c @@ -62,7 +62,7 @@ status_t dss_open_file_on_server(dss_conn_t *conn, const char *file_path, int fl return CM_SUCCESS; } -status_t dss_set_status_on_server(dss_conn_t *conn, int status) +status_t dss_get_inst_status_on_server(dss_conn_t *conn, int *status) { int32 errcode; char *errmsg = NULL; @@ -70,9 +70,44 @@ status_t dss_set_status_on_server(dss_conn_t *conn, int status) dss_init_packet(&conn->pack, conn->pipe.options); dss_init_set(&conn->pack); dss_packet_t *send_pack = &conn->pack; - send_pack->head->cmd = DSS_CMD_SET_STATUS; + send_pack->head->cmd = DSS_CMD_GET_INST_STATUS; + send_pack->head->flags = 0; + + dss_packet_t *ack_pack = &conn->pack; + DSS_RETURN_IF_ERROR(dss_call_ex(&conn->pipe, send_pack, ack_pack)); + + if (ack_pack->head->result != CM_SUCCESS) { + dss_cli_get_err(ack_pack, &errcode, &errmsg); + DSS_THROW_ERROR_EX(errcode, "%s", errmsg); + return CM_ERROR; + } + text_t extra_info = CM_NULL_TEXT; + dss_init_get(ack_pack); + if (dss_get_text(ack_pack, &extra_info) != CM_SUCCESS) { + DSS_THROW_ERROR(ERR_DSS_CLI_EXEC_FAIL, dss_get_cmd_desc(DSS_CMD_GET_INST_STATUS), "get inst status error"); + LOG_DEBUG_ERR("get inst status error"); + return CM_ERROR; + } + if (extra_info.len != sizeof(uint32)) { + DSS_THROW_ERROR( + ERR_DSS_CLI_EXEC_FAIL, dss_get_cmd_desc(DSS_CMD_GET_INST_STATUS), "get inst status length error"); + LOG_DEBUG_ERR("get inst status length error"); + return CM_ERROR; + } + *status = *(int *)extra_info.str; + return CM_SUCCESS; +} + +status_t dss_set_main_inst_on_server(dss_conn_t *conn) +{ + int32 errcode; + char *errmsg = NULL; + + dss_init_packet(&conn->pack, conn->pipe.options); + dss_init_set(&conn->pack); + dss_packet_t *send_pack = &conn->pack; + send_pack->head->cmd = DSS_CMD_SET_MAIN_INST; send_pack->head->flags = 0; - DSS_RETURN_IF_ERROR(dss_put_int32(send_pack, (uint32)status)); dss_packet_t *ack_pack = &conn->pack; DSS_RETURN_IF_ERROR(dss_call_ex(&conn->pipe, send_pack, ack_pack)); diff --git a/src/common_api/dss_interaction.h b/src/common_api/dss_interaction.h index 1f96cb5a2bb6ec53ccf6d76e3f62bc971df631cf..89324867222c9242ab1e8a8f5f1500def22ce2d6 100644 --- a/src/common_api/dss_interaction.h +++ b/src/common_api/dss_interaction.h @@ -44,9 +44,9 @@ typedef struct st_dss_conn { void dss_cli_get_err(dss_packet_t *pack, int32 *errcode, char **errmsg); status_t dss_open_file_on_server(dss_conn_t *conn, const char *file_path, int flag); -status_t dss_set_status_on_server(dss_conn_t *conn, int status); status_t dss_close_file_on_server(dss_conn_t *conn, dss_vg_info_item_t *vg_item, uint64 fid, ftid_t ftid); - +status_t dss_get_inst_status_on_server(dss_conn_t *conn, int *status); +status_t dss_set_main_inst_on_server(dss_conn_t *conn); #ifdef __cplusplus } #endif diff --git a/src/interface/dss_api.c b/src/interface/dss_api.c index e12e893401e1e377dfa6c24396a4629abc57b685..f3f5bd2f3c66181ddff216db7dd921c0733a3d57 100644 --- a/src/interface/dss_api.c +++ b/src/interface/dss_api.c @@ -401,17 +401,20 @@ int dss_fopen(const char *file, int flag, int *handle) return (int)ret; } -int dss_set_server_status(dss_server_status_t status) +int dss_get_inst_status(int *status) { - if (status < DSS_STATUS_READONLY || status > DSS_STATUS_READWRITE) { - DSS_THROW_ERROR(ERR_DSS_INVALID_PARAM, "status"); - return CM_ERROR; - } dss_conn_t *conn = NULL; status_t ret = dss_get_conn(&conn); - DSS_RETURN_IFERR2(ret, LOG_DEBUG_ERR("set sever status get conn error")); + DSS_RETURN_IFERR2(ret, LOG_DEBUG_ERR("get conn error when get inst status")); + return (int)dss_get_inst_status_on_server(conn, status); +} - return (int)dss_set_status_on_server(conn, (int)status); +int dss_set_main_inst() +{ + dss_conn_t *conn = NULL; + status_t ret = dss_get_conn(&conn); + DSS_RETURN_IFERR2(ret, LOG_DEBUG_ERR("get conn error when set main inst")); + return (int)dss_set_main_inst_on_server(conn); } int dss_fclose(int handle) diff --git a/src/interface/dss_api.h b/src/interface/dss_api.h index 84ccfc3dc71b27cae2279df00db9c74c7bf1f873..d13c54309e01e380c50c297bb9e991743ec7cf9b 100644 --- a/src/interface/dss_api.h +++ b/src/interface/dss_api.h @@ -97,7 +97,7 @@ typedef struct st_dss_stat { #define DSS_LOCAL_MINOR_VER_WEIGHT 1000 #define DSS_LOCAL_MAJOR_VERSION 0 #define DSS_LOCAL_MINOR_VERSION 0 -#define DSS_LOCAL_VERSION 2 +#define DSS_LOCAL_VERSION 3 typedef struct st_dss_dirent *dss_dir_item_t; typedef struct st_dss_stat *dss_stat_info_t; @@ -149,7 +149,8 @@ DSS_DECLARE void dss_refresh_logger(char *log_field, unsigned long long *value); // connection DSS_DECLARE int dss_set_svr_path(const char *conn_path); // instance param -DSS_DECLARE int dss_set_server_status(dss_server_status_t status); +DSS_DECLARE int dss_set_main_inst(); +DSS_DECLARE int dss_get_inst_status(); DSS_DECLARE int dss_stat(const char *path, dss_stat_info_t item); DSS_DECLARE int dss_lstat(const char *path, dss_stat_info_t item); diff --git a/src/interface/dss_errno.h b/src/interface/dss_errno.h index 93b5601464598d36a67365186ed0f2782463f24d..71b02adc55c7d44f19d1f8bf4971b012dbf91099 100644 --- a/src/interface/dss_errno.h +++ b/src/interface/dss_errno.h @@ -118,9 +118,7 @@ extern "C" { #define ERR_DSS_UDS_INVALID_URL 2411 #define ERR_DSS_RECV_MSG_FAILED 2412 #define ERR_DSS_LINK_NOT_EXIST 2413 -#define ERR_DSS_GET_MASTER_ID 2414 -#define ERR_DSS_INIT_LOGGER_FAILED 2415 - +#define ERR_DSS_INIT_LOGGER_FAILED 2414 #define ERR_DSS_CEIL 2500 #ifdef __cplusplus diff --git a/src/service/dss_instance.c b/src/service/dss_instance.c index cbe3d5ddd5656a7c7fd62600067ea70b7e6a6008..4e048d70730c7d31f96b6e8d238fa8b7ebe953e8 100644 --- a/src/service/dss_instance.c +++ b/src/service/dss_instance.c @@ -28,6 +28,7 @@ #include "cm_error.h" #include "dss_errno.h" #include "dss_defs.h" +#include "dss_api.h" #include "dss_file.h" #include "dss_malloc.h" #include "dss_mes.h" @@ -220,6 +221,7 @@ status_t dss_recover_from_instance(dss_instance_t *inst) char *log_buf = inst->kernel_instance->log_ctrl.log_buf; dss_redo_batch_t *batch = (dss_redo_batch_t *)log_buf; batch->size = sizeof(dss_redo_batch_t); + batch->count = 0; dss_vg_info_item_t *vg_item = &g_vgs_info->volume_group[0]; if (dss_check_vg_ctrl_valid(vg_item) != CM_SUCCESS) { DSS_FREE_POINT(batch); @@ -253,21 +255,91 @@ status_t dss_recover_from_instance(dss_instance_t *inst) return status; } -/* - 1、when create first vg, init global log buffer; - 2、when dss_server start up, load log_buf, if nouse, ignore; if inuse, sort by lsn and recover; - 3、when session execute, allocate log split and record redo log; +status_t dss_recover_when_change_status(dss_instance_t *inst) +{ + return dss_recover_from_instance(inst); +} + +bool32 dss_config_cm() +{ + dss_config_t *inst_cfg = dss_get_inst_cfg(); + char *value = cm_get_config_value(&inst_cfg->config, "DSS_CM_SO_NAME"); + if (value == NULL || strlen(value) == 0) { + LOG_RUN_INF("dss cm config of DSS_CM_SO_NAME is empty."); + return CM_FALSE; + } + return CM_TRUE; +} + +/* For startup: + (1) if master_id == cur_id and status is recovery,try to recover, then set status open; + (2) if master_id != cur_id and status is recovery, just set status open. When metadata needs to be modified, + just send messages to master_id, if master_id is in recovery, just reject; */ -status_t dss_get_instance_log_buf_and_recover(dss_instance_t *inst) +status_t dss_change_instance_status_to_open(dss_instance_t *cur_inst, uint32 curr_id, uint32 master_id) { -#ifndef OPENGAUSS + status_t ret; + if (curr_id == master_id) { + LOG_RUN_INF("instance [%u] begin to recover.", master_id); + ret = dss_recover_when_change_status(cur_inst); + if (ret != CM_SUCCESS) { + return ret; + } + } + cur_inst->status = ZFS_STATUS_OPEN; return CM_SUCCESS; +} + +/* + 1、NO CM:every node can do readwrite + 2、CM:get cm lock to be master + 3、ENABLE_DSSTEST: for test, select min id as master +*/ +status_t dss_get_instance_log_buf_no_cm(dss_instance_t *inst) +{ + dss_config_t *inst_cfg = dss_get_inst_cfg(); + uint32 curr_id = (uint32)inst_cfg->params.inst_id; + if (inst_cfg->params.inst_cnt <= 1) { + dss_set_master_id(curr_id); + dss_set_server_status_flag(DSS_STATUS_READWRITE); + return dss_change_instance_status_to_open(inst, curr_id, curr_id); + } +#ifdef ENABLE_DSSTEST + uint32 i; + for (i = 0; i < DSS_MAX_INSTANCES; i++) { + if (inst_cfg->params.ports[i] != 0) { + dss_set_master_id(i); + LOG_RUN_INF("Set min id %u as master id.", i); + break; + } + } + if (i == curr_id) { + dss_set_server_status_flag(DSS_STATUS_READWRITE); + return dss_change_instance_status_to_open(inst, curr_id, curr_id); + } else { + dss_set_server_status_flag(DSS_STATUS_READONLY); + inst->status = ZFS_STATUS_OPEN; + } +#else + if (!dss_config_cm()) { + dss_set_master_id(curr_id); + dss_set_server_status_flag(DSS_STATUS_READWRITE); + return dss_change_instance_status_to_open(inst, curr_id, curr_id); + } #endif + return CM_SUCCESS; +} +/* + 1、when create first vg, init global log buffer; + 2、when dss_server start up, init memory log buf; +*/ +status_t dss_get_instance_log_buf(dss_instance_t *inst) +{ status_t ret = dss_alloc_instance_log_buf(inst); if (ret != CM_SUCCESS) { return ret; } - return dss_recover_from_instance(inst); + return dss_get_instance_log_buf_no_cm(inst); } static status_t instance_init_core(dss_instance_t *inst, uint32 objectid) @@ -283,7 +355,7 @@ static status_t instance_init_core(dss_instance_t *inst, uint32 objectid) errno_t errcode = memset_s(&g_dss_kernel_instance, sizeof(g_dss_kernel_instance), 0, sizeof(g_dss_kernel_instance)); securec_check_ret(errcode); inst->kernel_instance = &g_dss_kernel_instance; - status = dss_get_instance_log_buf_and_recover(inst); + status = dss_get_instance_log_buf(inst); DSS_RETURN_IFERR2(status, DSS_THROW_ERROR(ERR_DSS_GA_INIT, "DSS instance failed to get log buf")); uint32 sess_cnt = inst->inst_cfg.params.cfg_session_num + inst->inst_cfg.params.work_thread_cnt + inst->inst_cfg.params.channel_num; @@ -431,48 +503,6 @@ void dss_check_peer_by_inst(dss_instance_t *inst, uint64 inst_id) dss_check_peer_inst(inst, inst_id); } -bool32 dss_check_inst_workstatus(uint32 instid) -{ - dss_instance_t *inst = &g_dss_instance; - cm_spin_lock(&inst->inst_work_lock, NULL); - cm_res_stat_ptr_t res = cm_res_get_stat(&inst->cm_res.mgr); - if (res == NULL) { - cm_spin_unlock(&inst->inst_work_lock); - return CM_FALSE; - } - int insttotal = cm_res_get_instance_count(&inst->cm_res.mgr, res); - for (int idx = 0; idx < insttotal; idx++) { - const cm_res_inst_info_ptr_t inst_res = cm_res_get_instance_info(&inst->cm_res.mgr, res, (unsigned int)idx); - if (inst_res == NULL) { - cm_res_free_stat(&inst->cm_res.mgr, res); - cm_spin_unlock(&inst->inst_work_lock); - return CM_FALSE; - } - - int resid = cm_res_get_inst_instance_id(&inst->cm_res.mgr, inst_res); - int workstatus = cm_res_get_inst_is_work_member(&inst->cm_res.mgr, inst_res); - if ((workstatus != 0) && ((uint32)resid == instid)) { - cm_res_free_stat(&inst->cm_res.mgr, res); - cm_spin_unlock(&inst->inst_work_lock); - return CM_TRUE; - } - - if (workstatus == 0) { - LOG_RUN_INF("dss instance [%d] is not work member. May be kicked off by cm.", resid); - if ((uint32)resid == instid) { - cm_res_free_stat(&inst->cm_res.mgr, res); - cm_spin_unlock(&inst->inst_work_lock); - return CM_FALSE; - } - } - } - - LOG_RUN_INF("dss instance [%d] is not work member. May be kicked off by cm.", instid); - cm_res_free_stat(&inst->cm_res.mgr, res); - cm_spin_unlock(&inst->inst_work_lock); - return CM_FALSE; -} - static void dss_check_peer_by_cm(dss_instance_t *inst) { cm_res_stat_ptr_t res = cm_res_get_stat(&inst->cm_res.mgr); @@ -534,6 +564,116 @@ void dss_init_cm_res(dss_instance_t *inst) return; } +#ifdef ENABLE_DSSTEST +status_t dss_get_cm_res_lock_owner(dss_cm_res *cm_res, uint32 *master_id) +{ + dss_config_t *inst_cfg = dss_get_inst_cfg(); + for (int i = 0; i < DSS_MAX_INSTANCES; i++) { + if (inst_cfg->params.ports[i] != 0) { + *master_id = i; + LOG_RUN_INF("Set min id %u as master id.", i); + break; + } + } + LOG_RUN_INF("master id is %u when get cm lock.", *master_id); + return CM_SUCCESS; +} +#else +status_t dss_get_cm_res_lock_owner(dss_cm_res *cm_res, uint32 *master_id) +{ + int ret = cm_res_get_lock_owner(&cm_res->mgr, DSS_CM_LOCK, master_id); + if (ret == CM_RES_TIMEOUT) { + return CM_ERROR; + } else if (ret == CM_RES_SUCCESS) { + return CM_SUCCESS; + } else { + *master_id = CM_INVALID_ID32; + } + return CM_SUCCESS; +} +#endif + +// get cm lock owner, if no owner, try to become.master_id can not be DSS_INVALID_ID32. +uint32 dss_get_cm_lock_owner(dss_instance_t *inst) +{ + dss_cm_res *cm_res = &inst->cm_res; + uint32 master_id = DSS_INVALID_ID32; + status_t ret = CM_SUCCESS; + date_t time_start = g_timer()->now; + date_t time_now = 0; + while (CM_TRUE) { + time_now = g_timer()->now; + if (time_now - time_start > DSS_MAX_FAIL_TIME_WITH_CM * MICROSECS_PER_SECOND) { + LOG_RUN_ERR("[DSS] ABORT INFO: Fail to get lock owner for %d seconds, exit.", DSS_MAX_FAIL_TIME_WITH_CM); + cm_fync_logfile(); + _exit(1); + } + ret = dss_get_cm_res_lock_owner(cm_res, &master_id); + if (ret != CM_SUCCESS) { + DSS_GET_CM_LOCK_LONG_SLEEP; + continue; + } + if (master_id == DSS_INVALID_ID32) { + (void)cm_res_lock(&cm_res->mgr, DSS_CM_LOCK); + continue; + } + break; + } + return master_id; +} + +/* + 1、old_master_id == master_id, just return; + 2、old_master_id != master_id, just indicates that the master has been reselected.so to juge whether recover. +*/ +void dss_get_cm_lock_and_recover(dss_instance_t *inst) +{ + uint32 old_master_id = dss_get_master_id(); + uint32 master_id = dss_get_cm_lock_owner(inst); + if (old_master_id == master_id) { + return; + } + dss_set_master_id(master_id); + dss_config_t *inst_cfg = dss_get_inst_cfg(); + uint32 curr_id = (uint32)inst_cfg->params.inst_id; + if (master_id != curr_id) { + dss_set_server_status_flag(DSS_STATUS_READONLY); + inst->status = ZFS_STATUS_OPEN; + return; + } + if (inst->status == ZFS_STATUS_SWITCH) { + return; + } + dss_wait_session_pause(inst); + cm_spin_lock(&inst->switch_lock, NULL); + inst->status = ZFS_STATUS_RECOVERY; + LOG_RUN_INF("master_id is %u when get cm lock to do recovery.", master_id); + status_t ret = dss_change_instance_status_to_open(inst, curr_id, master_id); + if (ret != CM_SUCCESS) { + cm_spin_unlock(&inst->switch_lock); + LOG_RUN_ERR("[DSS] ABORT INFO: Recover failed when get cm lock."); + cm_fync_logfile(); + _exit(1); + } + dss_session_t *session = NULL; + if (dss_create_session(NULL, &session) != CM_SUCCESS) { + cm_spin_unlock(&inst->switch_lock); + LOG_RUN_ERR("[DSS] ABORT INFO: Refresh meta info failed when create session."); + cm_fync_logfile(); + _exit(1); + } + if (dss_refresh_meta_info(session) != CM_SUCCESS) { + cm_spin_unlock(&inst->switch_lock); + LOG_RUN_ERR("[DSS] ABORT INFO: Refresh meta info failed after recovery."); + cm_fync_logfile(); + _exit(1); + } + dss_destroy_session(session); + dss_set_session_running(inst); + dss_set_server_status_flag(DSS_STATUS_READWRITE); + cm_spin_unlock(&inst->switch_lock); +} + static void dss_check_peer_inst_inner(dss_instance_t *inst) { /** diff --git a/src/service/dss_instance.h b/src/service/dss_instance.h index c75b08c13ecd5c3c6bd8e1196daa5a10767cffeb..5a50d6f78b897a01bc7e8f85a96adade03404a8d 100644 --- a/src/service/dss_instance.h +++ b/src/service/dss_instance.h @@ -47,10 +47,25 @@ extern "C" { #define DSS_INS_SIZE (sizeof(dss_share_vg_info_t)) typedef enum en_zfs_instance_status { - ZFS_STATUS_OPEN = 1, - ZFS_STATUS_RECOVERY, + ZFS_STATUS_RECOVERY = 0, + ZFS_STATUS_SWITCH, + ZFS_STATUS_OPEN, } dss_instance_status_t; +typedef enum { + CM_RES_SUCCESS = 0, + CM_RES_CANNOT_DO = 1, + CM_RES_DDB_FAILED = 2, + CM_RES_VERSION_WRONG = 3, + CM_RES_CONNECT_ERROR = 4, + CM_RES_TIMEOUT = 5, + CM_RES_NO_LOCK_OWNER = 6, +} cm_err_code; + +#define DSS_CM_LOCK "dss cm lock" +#define DSS_MAX_FAIL_TIME_WITH_CM 30 +#define DSS_GET_CM_LOCK_LONG_SLEEP cm_sleep(500) + typedef struct st_dss_cm_res { spinlock_t init_lock; bool8 is_init; @@ -60,8 +75,8 @@ typedef struct st_dss_cm_res { typedef struct st_dss_instance { int32 lock_fd; - spinlock_t lock; - dss_config_t inst_cfg; // instance config + spinlock_t switch_lock; + dss_config_t inst_cfg; // instance config_ dss_instance_status_t status; uds_lsnr_t lsnr; // HYJ: reform_ctx_t rf_ctx; @@ -85,14 +100,18 @@ status_t dss_start_lsnr(dss_instance_t *inst); void dss_uninit_cm(dss_instance_t *inst); void dss_check_peer_inst(dss_instance_t *inst, uint64 inst_id); void dss_free_log_ctrl(dss_instance_t *inst); -status_t dss_get_instance_log_buf_and_recover(dss_instance_t *inst); +status_t dss_get_instance_log_buf(dss_instance_t *inst); status_t dss_load_log_buffer(dss_redo_batch_t *batch); status_t dss_alloc_instance_log_buf(dss_instance_t *inst); status_t dss_recover_from_instance(dss_instance_t *inst); void dss_check_peer_by_inst(dss_instance_t *inst, uint64 inst_id); uint64 dss_get_inst_work_status(void); void dss_set_inst_work_status(uint64 cur_inst_map); -bool32 dss_check_inst_workstatus(uint32 instid); +status_t dss_recover_when_change_status(dss_instance_t *inst); +uint32 dss_get_cm_lock_owner(dss_instance_t *inst); +status_t dss_get_cm_res_lock_owner(dss_cm_res *cm_res, uint32 *master_id); +void dss_get_cm_lock_and_recover(dss_instance_t *inst); + #ifdef __cplusplus } #endif diff --git a/src/service/dss_lsnr.c b/src/service/dss_lsnr.c index 3530f86b927925de0f64cee769434a695fa0453f..82e76e88099f01333571eee76adb100391601431 100644 --- a/src/service/dss_lsnr.c +++ b/src/service/dss_lsnr.c @@ -86,7 +86,7 @@ static void cs_try_uds_accept(uds_lsnr_t *lsnr, cs_pipe_t *pipe) if (!cs_uds_create_link(sock_ready, pipe)) { continue; } - if (lsnr->status != LSNR_STATUS_RUNNING) { + if (lsnr->status != LSNR_STATUS_RUNNING && lsnr->status != LSNR_STATUS_PAUSING) { LOG_RUN_ERR("cs_try_uds_accept error :%u\n", lsnr->status); cs_uds_disconnect(&pipe->link.uds); continue; diff --git a/src/service/dss_mes.c b/src/service/dss_mes.c index 603713f8a25b1f8a328d03f7cb05fb28d9eca8ab..71fa37f412d02363b3f61253bc827ce4ad506a8b 100644 --- a/src/service/dss_mes.c +++ b/src/service/dss_mes.c @@ -28,9 +28,12 @@ #include "dss_file.h" #include "dss_service.h" #include "dss_instance.h" +#include "dss_api.h" #include "dss_mes.h" +void dss_proc_broadcast_req(dss_session_t *session, mes_message_t *msg); void dss_proc_broadcast_ack(dss_session_t *session, mes_message_t *msg); +void dss_proc_broadcast_ack2(dss_session_t *session, mes_message_t *msg); void dss_proc_syb2active_req(dss_session_t *session, mes_message_t *msg); void dss_proc_syb2active_ack(dss_session_t *session, mes_message_t *msg); void dss_proc_loaddisk_req(dss_session_t *session, mes_message_t *msg); @@ -85,26 +88,6 @@ static dss_bcast_ack_cmd_t dss_get_ack_cmd(dss_bcast_req_cmd_t bcast_op) return BCAST_ACK_END; } -static void dss_ask_server_status(dss_session_t *se, mes_message_t *msg) -{ - dss_mes_ack_with_data_t ack; - unsigned short size = sizeof(dss_mes_ack_with_data_t); - mes_init_ack_head(msg->head, &ack.head, DSS_CMD_ACK_BROADCAST_WITH_MSG, size, se->id); - uint32 id = dss_get_master_id(); - *(uint32 *)(ack.data) = id; - ack.type = BCAST_ACK_ASK_STATUS; - int ret = mes_send_data(&ack.head); - if (ret != CM_SUCCESS) { - LOG_RUN_ERR("send ask server status ack failed, src inst(%u), dst inst(%u), ret(%d) ", - (uint32)msg->head->src_inst, (uint32)msg->head->dst_inst, ret); - return; - } - DSS_LOG_DEBUG_OP( - "send ask server status ack[%u] success. cmd=%hhu, rsn=%u, src_inst=%hhu, dst_inst=%hhu, src_sid=%hu, dst_sid=%hu.", - id, msg->head->cmd, msg->head->rsn, msg->head->src_inst, msg->head->dst_inst, msg->head->src_sid, - msg->head->dst_sid); -} - static void dss_check_file_open(dss_session_t *se, mes_message_t *msg) { bool32 is_open = CM_FALSE; @@ -156,23 +139,11 @@ static void dss_check_file_open(dss_session_t *se, mes_message_t *msg) DSS_FREE_POINT(send_msg); } -static bool32 dss_mes_check_masterid(uint32 instid) -{ - if (instid > CM_MAX_INSTANCES) { - return CM_FALSE; - } - - return dss_check_inst_workstatus(instid); -} - -int32 dss_process_broadcast_ack( - dss_session_t *session, char *data, unsigned int len, dss_recv_msg_t *recv_msg_output) +int32 dss_process_broadcast_ack(dss_session_t *session, char *data, unsigned int len, dss_recv_msg_t *recv_msg_output) { int32 ret = ERR_DSS_MES_ILL; dss_bcast_ack_cmd_t bcast_op = *(dss_bcast_ack_cmd_t *)data; - uint32 id = 0; - switch (bcast_op) { case BCAST_ACK_RENAME: case BCAST_ACK_DEL_FILE: @@ -180,16 +151,6 @@ int32 dss_process_broadcast_ack( ret = *(int32 *)(data + sizeof(dss_bcast_ack_cmd_t)); recv_msg_output->open_flag = *(bool32 *)(data + sizeof(dss_bcast_ack_cmd_t) + sizeof(int32)); break; - case BCAST_ACK_ASK_STATUS: - id = *(uint32 *)(data + sizeof(dss_bcast_ack_cmd_t)); - ret = DSS_SUCCESS; - if (dss_mes_check_masterid(id) == CM_TRUE) { - dss_set_master_id(id); - DSS_LOG_DEBUG_OP("Get master instance id success, master instance id(%u)", id); - } else { - LOG_RUN_ERR("Get master instance id error, master instance id(%u)", id); - } - break; default: LOG_DEBUG_ERR("invalid broadcast ack type"); break; @@ -209,9 +170,6 @@ void dss_proc_broadcast_req(dss_session_t *session, mes_message_t *msg) case BCAST_REQ_TRUNCATE_FILE: dss_check_file_open(session, msg); break; - case BCAST_REQ_ASK_STATUS: - dss_ask_server_status(session, msg); - break; default: LOG_DEBUG_ERR("invalid broadcast req type"); break; @@ -340,6 +298,24 @@ static status_t dss_broadcast_msg( return dss_broadcast_msg_with_try(session, &head, buffer, recv_msg, timeout); } +static bool32 dss_check_srv_status(mes_message_t *msg) +{ + date_t time_start = g_timer()->now; + date_t time_now = 0; + mes_message_head_t head = *(msg->head); + while (g_dss_instance.status != ZFS_STATUS_OPEN) { + LOG_DEBUG_INF( + "Could not exec remote req for the dssserver is not open, src node(%u).", (uint32)(head.src_inst)); + DSS_GET_CM_LOCK_LONG_SLEEP; + time_now = g_timer()->now; + if (time_now - time_start > DSS_MAX_FAIL_TIME_WITH_CM * MICROSECS_PER_SECOND) { + LOG_RUN_ERR("Fail to change status open for %d seconds when exec remote req.", DSS_MAX_FAIL_TIME_WITH_CM); + return CM_FALSE; + } + } + return CM_TRUE; +} + static void dss_process_message(uint32 work_idx, mes_message_t *msg) { dss_config_t *inst_cfg = dss_get_inst_cfg(); @@ -349,14 +325,19 @@ static void dss_process_message(uint32 work_idx, mes_message_t *msg) cm_panic(0); } if (msg->head->cmd >= DSS_CMD_CEIL) { - LOG_DEBUG_ERR("Invalid request received,cmd is %c.", msg->head->cmd); + LOG_DEBUG_ERR("Invalid request received,cmd is %u.", (uint8)msg->head->cmd); return; } // ready the ack connection dss_check_peer_by_inst(&g_dss_instance, msg->head->src_inst); + LOG_DEBUG_INF("dss process message, cmd is %u.", (uint8)msg->head->cmd); dss_processor_t *processor = &g_dss_processors[msg->head->cmd]; dss_session_ctrl_t *session_ctrl = dss_get_session_ctrl(); dss_session_t *session = &session_ctrl->sessions[work_idx]; + if (dss_check_srv_status(msg) != CM_TRUE) { + mes_release_message_buf(msg); + return; + } processor->proc(session, msg); } @@ -570,7 +551,7 @@ void dss_check_mes_conn(uint64 cur_inst_map) dss_set_inst_work_status(cur_inst_map); } -status_t dss_exec_sync(dss_session_t *session, uint32 remoteid, uint32 currtid) +status_t dss_exec_sync(dss_session_t *session, uint32 remoteid, uint32 currtid, status_t *remote_result) { status_t ret = CM_ERROR; mes_message_head_t head; @@ -591,7 +572,7 @@ status_t dss_exec_sync(dss_session_t *session, uint32 remoteid, uint32 currtid) LOG_RUN_ERR("dss server receive msg from remote node failed, src node(%u), dst node(%u).", currtid, remoteid)); // 4. attach remote execution result uint16 cpsize = msg.head->size - (sizeof(mes_message_head_t) + sizeof(int32)); - ret = *(int32 *)(msg.buffer + sizeof(mes_message_head_t)); + *remote_result = *(int32 *)(msg.buffer + sizeof(mes_message_head_t)); if (cpsize) { session->send_info.str = dss_init_sendinfo_buf(session->recv_pack.init_buf); session->send_info.len = 0; @@ -604,16 +585,6 @@ status_t dss_exec_sync(dss_session_t *session, uint32 remoteid, uint32 currtid) return ret; } -status_t dss_polling_master_id(dss_session_t *session) -{ - dss_bcast_req_t req; - dss_recv_msg_t recv_msg = {CM_TRUE, CM_FALSE}; - errno_t errcode = memset_s(&req, sizeof(dss_bcast_req_t), 0, sizeof(dss_bcast_req_t)); - securec_check_panic(errcode); - req.type = BCAST_REQ_ASK_STATUS; - return dss_broadcast_msg(session, (void *)&req, sizeof(dss_bcast_req_t), &recv_msg, DSS_MES_WAIT_TIMEOUT); -} - void dss_proc_syb2active_req(dss_session_t *session, mes_message_t *msg) { uint32 size = msg->head->size - sizeof(mes_message_head_t); @@ -621,9 +592,11 @@ void dss_proc_syb2active_req(dss_session_t *session, mes_message_t *msg) uint32 srcid = (uint32)(head.src_inst); uint32 dstid = (uint32)(head.dst_inst); if (size > DSS_MAX_PACKET_SIZE) { - LOG_DEBUG_ERR("The dss server receive msg from remote failed, src node(%u), dst node(%u).", srcid, dstid); + LOG_DEBUG_ERR("The dss server receive msg from remote failed, src node(%u), dst node(%u), size is %u.", srcid, + dstid, size); return; } + LOG_DEBUG_INF("The dss server receive msg from remote node, src node(%u), dst node(%u).", srcid, dstid); dss_init_packet(&session->recv_pack, CM_FALSE); dss_init_packet(&session->send_pack, CM_FALSE); errno_t errcode = memcpy_s(session->recv_pack.buf, size, (msg->buffer + sizeof(mes_message_head_t)), size); @@ -647,7 +620,7 @@ void dss_proc_syb2active_req(dss_session_t *session, mes_message_t *msg) (uint32)(head.src_inst), (uint32)(head.dst_inst)); } -status_t dss_send2standy( +status_t dss_send2standby( dss_session_t *session, mes_message_head_t *reqhead, big_packets_ctrl_t *ctrl, const char *buf, uint16 size) { mes_message_head_t ack; @@ -655,12 +628,12 @@ status_t dss_send2standy( mes_init_ack_head(reqhead, &ack, DSS_CMD_ACK_LOAD_DISK, ack.size, session->id); status_t ret = mes_send_data4(&ack, sizeof(mes_message_head_t), ctrl, sizeof(big_packets_ctrl_t), buf, size); if (ret != CM_SUCCESS) { - LOG_RUN_ERR("The dssserver fils to send messages to th remote node, src node(%u), dst node(%u).", + LOG_RUN_ERR("The dssserver fails to send messages to the remote node, src node(%u), dst node(%u).", (uint32)(reqhead->src_inst), (uint32)(reqhead->dst_inst)); return ret; } - LOG_DEBUG_INF("The dssserver send messages to th remote node success, src node(%u), dst node(%u).", + LOG_DEBUG_INF("The dssserver send messages to the remote node success, src node(%u), dst node(%u).", (uint32)(reqhead->src_inst), (uint32)(reqhead->dst_inst)); return ret; } @@ -709,7 +682,7 @@ status_t dss_batch_load(dss_session_t *session, dss_loaddisk_req_t *req, mes_mes ctrl.cursize = (uint32)readsize; ctrl.endflag = (remain == 0) ? CM_TRUE : CM_FALSE; - if (dss_send2standy(session, reqhead, &ctrl, readbuff, (uint16)readsize) != CM_SUCCESS) { + if (dss_send2standby(session, reqhead, &ctrl, readbuff, (uint16)readsize) != CM_SUCCESS) { LOG_RUN_ERR("read volume for standby send msg failed, vg name[%s], volume id[%u].", req->vg_name, req->volumeid); dss_loaddisk_unlock(req->vg_name); return CM_ERROR; @@ -760,9 +733,14 @@ void dss_proc_loaddisk_req(dss_session_t *session, mes_message_t *msg) dss_loaddisk_req_t req = *(dss_loaddisk_req_t *)(msg->buffer + sizeof(mes_message_head_t)); LOG_DEBUG_INF("Exec load disk req, src node(%u), volume id:%u, offset:%llu, size:%u.", (uint32)(head.src_inst), req.volumeid, req.offset, req.size); + if (dss_check_srv_status(msg) != CM_TRUE) { + dss_send_diskload_err_ack(session, &head, ret); + mes_release_message_buf(msg); + return; + } ret = dss_batch_load(session, &req, &head); if (ret != CM_SUCCESS) { - LOG_DEBUG_INF("Exec load disk req failed, src node(%u), volume id:%u, offset:%llu, size:%u.", (uint32)(head.src_inst), + LOG_RUN_ERR("Exec load disk req failed, src node(%u), volume id:%u, offset:%llu, size:%u.", (uint32)(head.src_inst), req.volumeid, req.offset, req.size); dss_send_diskload_err_ack(session, &head, ret); } @@ -777,25 +755,11 @@ static status_t dss_init_readvlm_remote_params( securec_check_ret(errcode); errcode = memcpy_s(req->vg_name, DSS_MAX_NAME_LEN, entry, DSS_MAX_NAME_LEN); securec_check_ret(errcode); - - if (dss_get_exec_nodeid(session, currid, remoteid) != CM_SUCCESS) { - LOG_RUN_ERR("read volume from active node get eec node id failed."); - if (*remoteid == DSS_INVALID_ID32) { - cm_reset_error(); - DSS_THROW_ERROR(ERR_DSS_GET_MASTER_ID); - dss_set_master_id(DSS_INVALID_ID32); - } - return CM_ERROR; - } - + dss_get_exec_nodeid(session, currid, remoteid); if (*currid == *remoteid) { - cm_reset_error(); - DSS_THROW_ERROR(ERR_DSS_GET_MASTER_ID); - dss_set_master_id(DSS_INVALID_ID32); + LOG_DEBUG_ERR("read from current node %u no need to send message.", *currid); return CM_ERROR; } - - return CM_SUCCESS; } @@ -817,13 +781,13 @@ static bool32 dss_packets_verify(bool32 bfirst, big_packets_ctrl_t *lastctrl, bi } if (ctrl->cursize > ctrl->totalsize) { - LOG_RUN_ERR( - "msg verfy failed, cursize error, cursize(%u) totalsize(%u).", (uint32)(ctrl->cursize), (uint32)(ctrl->totalsize)); + LOG_RUN_ERR("msg verfy failed, cursize error, cursize(%u) totalsize(%u).", + (uint32)(ctrl->cursize), (uint32)(ctrl->totalsize)); return CM_FALSE; } if ((lastctrl->offset + lastctrl->cursize) != ctrl->offset) { - LOG_RUN_ERR("msg verfy failed, offset error, last cursize(%u) last offset(%u) cur offset(%u).", + LOG_RUN_ERR("msg verfy failed, offset error, last cursize(%u) last offset(%u) cur offset(%u).", lastctrl->cursize, lastctrl->offset, ctrl->offset); return CM_FALSE; } @@ -873,6 +837,7 @@ static status_t dss_rec_msgs(dss_session_t *session, void *buf, int32 size) ctrl = *(big_packets_ctrl_t *)(msg.buffer + sizeof(mes_message_head_t)); if (dss_packets_verify(bfirst, &lastctrl, &ctrl) == CM_FALSE) { mes_release_message_buf(&msg); + LOG_RUN_ERR("dss server receive msg verify failed."); return CM_ERROR; } @@ -902,9 +867,9 @@ status_t dss_read_volume_remote(const char *vg_name, dss_volume_t *volume, int64 } ret = dss_init_readvlm_remote_params(&req, vg_name, &currid, &remoteid, session); - if (ret != CM_SUCCESS) { + if (ret != CM_SUCCESS || currid == remoteid) { dss_destroy_session(session); - return ret; + return CM_ERROR; } LOG_DEBUG_INF( @@ -920,7 +885,7 @@ status_t dss_read_volume_remote(const char *vg_name, dss_volume_t *volume, int64 ret = mes_send_data2(&head, &req); if (ret != CM_SUCCESS) { LOG_RUN_ERR( - "The dssserver fails to send msssages to the remote node, src node (%u) dst node(%u).", currid, remoteid); + "The dssserver fails to send messages to the remote node, src node (%u), dst node(%u).", currid, remoteid); dss_destroy_session(session); dss_set_master_id(DSS_INVALID_ID32); return ret; @@ -930,7 +895,7 @@ status_t dss_read_volume_remote(const char *vg_name, dss_volume_t *volume, int64 dss_destroy_session(session); if (ret != CM_SUCCESS) { LOG_RUN_ERR( - "The dssserver receive msssages from remote node failed, src node (%u) dst node(%u).", currid, remoteid); + "The dssserver receive messages from remote node failed, src node (%u) dst node(%u).", currid, remoteid); return ret; } diff --git a/src/service/dss_mes.h b/src/service/dss_mes.h index 7c715f7b1d19f2b69a7b7a7402571cf5fbff1d31..cd6ec248eb12f9bae9c5124a50d130e04a47e499 100644 --- a/src/service/dss_mes.h +++ b/src/service/dss_mes.h @@ -93,7 +93,6 @@ typedef enum st_dss_bcast_req_cmd { BCAST_REQ_RENAME = 0, BCAST_REQ_DEL_DIR_FILE, BCAST_REQ_TRUNCATE_FILE, - BCAST_REQ_ASK_STATUS, BCAST_REQ_END } dss_bcast_req_cmd_t; @@ -101,7 +100,6 @@ typedef enum st_dss_bcast_ack_cmd { BCAST_ACK_RENAME = 0, BCAST_ACK_DEL_FILE, BCAST_ACK_TRUNCATE_FILE, - BCAST_ACK_ASK_STATUS, BCAST_ACK_END } dss_bcast_ack_cmd_t; @@ -155,17 +153,16 @@ typedef struct st_loaddisk_req { status_t dss_notify_sync( dss_session_t *session, dss_bcast_req_cmd_t cmd, const char *buffer, uint32 size, dss_recv_msg_t *recv_msg); -status_t dss_polling_master_id(dss_session_t *session); -status_t dss_exec_sync(dss_session_t *session, uint32 remoteid, uint32 currtid); +status_t dss_exec_sync(dss_session_t *session, uint32 remoteid, uint32 currtid, status_t *remote_result); void dss_check_mes_conn(uint64 cur_inst_map); status_t dss_startup_mes(void); void dss_stop_mes(void); int32 dss_process_broadcast_ack( dss_session_t *session, char *data, unsigned int len, dss_recv_msg_t *recv_msg_output); -void dss_proc_broadcast_req(dss_session_t *session, mes_message_t *receive_msg); +void dss_proc_broadcast_req(dss_session_t *session, mes_message_t *msg); void dss_proc_broadcast_ack2(dss_session_t *session, mes_message_t *msg); status_t dss_read_volume_remote(const char *vg_name, dss_volume_t *volume, int64 offset, void *buf, int32 size); -status_t dss_send2standy( +status_t dss_send2standby( dss_session_t *session, mes_message_head_t *reqhead, big_packets_ctrl_t *ctrl, const char *buf, uint16 size); status_t dss_batch_load(dss_session_t *session, dss_loaddisk_req_t *req, mes_message_head_t *reqhead); status_t dss_notify_online(dss_session_t *session); diff --git a/src/service/dss_service.c b/src/service/dss_service.c index f540598000acf765df34cc13fe600e6828f6251d..d11bff1dcb163a9f966e915ff230d3b266b88544 100644 --- a/src/service/dss_service.c +++ b/src/service/dss_service.c @@ -33,57 +33,60 @@ #include "dss_api.h" #include "dss_service.h" +#ifdef __cplusplus +extern "C" { +#endif + static inline bool32 dss_need_exec_remote(bool32 exec_on_active, bool32 local_req) { - return ((dss_is_readonly() == CM_TRUE) && (exec_on_active) && (local_req == CM_TRUE)); + dss_config_t *cfg = dss_get_inst_cfg(); + uint32 master_id = dss_get_master_id(); + uint32 curr_id = (uint32)(cfg->params.inst_id); + return ((curr_id != master_id) && (exec_on_active) && (local_req == CM_TRUE)); } -status_t dss_get_exec_nodeid(dss_session_t *session, uint32 *srcid, uint32 *dstid) +#define DSS_PROCESS_GET_MASTER_ID 50 +void dss_get_exec_nodeid(dss_session_t *session, uint32 *currid, uint32 *remoteid) { dss_config_t *cfg = dss_get_inst_cfg(); - uint32 remoteid = dss_get_master_id(); - uint32 currid = (uint32)(cfg->params.inst_id); - status_t ret = CM_ERROR; - if (remoteid == DSS_INVALID_ID32) { - ret = dss_polling_master_id(session); - if (ret != CM_SUCCESS) { - LOG_RUN_ERR("dss server polling master dss server id failed, current dss node(%u).", currid); - return ret; - } - remoteid = dss_get_master_id(); - if (remoteid == DSS_INVALID_ID32) { - LOG_RUN_ERR("dss server polling master dss server id error."); - return CM_ERROR; - } + *currid = (uint32)(cfg->params.inst_id); + *remoteid = dss_get_master_id(); + while (*remoteid == DSS_INVALID_ID32) { + cm_sleep(DSS_PROCESS_GET_MASTER_ID); + *remoteid = dss_get_master_id(); } - - *dstid = remoteid; - *srcid = currid; - return CM_SUCCESS; + LOG_DEBUG_INF("Start processing remote requests(%d), remote node(%u),current node(%u).", + (session->recv_pack.head == NULL) ? -1 : session->recv_pack.head->cmd, *remoteid, *currid); + return; } +#define DSS_PROCESS_REMOTE_INTERVAL 50 static status_t dss_process_remote(dss_session_t *session) { uint32 remoteid = DSS_INVALID_ID32; uint32 currid = DSS_INVALID_ID32; status_t ret = CM_ERROR; - - if (dss_get_exec_nodeid(session, &currid, &remoteid)) { - return CM_ERROR; - } + uint32 i = 0; + dss_get_exec_nodeid(session, &currid, &remoteid); LOG_DEBUG_INF("Start processing remote requests(%d), remote node(%u),current node(%u).", session->recv_pack.head->cmd, remoteid, currid); - ret = dss_exec_sync(session, remoteid, currid); - if (ret != CM_SUCCESS) { - LOG_DEBUG_ERR( - "End of processing the remote request(%d) failed, remote node(%u),current node(%u), result code(%d).", - session->recv_pack.head->cmd, remoteid, currid, ret); - return ret; + status_t remote_result = CM_ERROR; + while (CM_TRUE) { + ret = dss_exec_sync(session, remoteid, currid, &remote_result); + if (ret != CM_SUCCESS) { + LOG_DEBUG_ERR( + "End of processing the remote request(%d) failed, remote node(%u),current node(%u), result code(%d).", + session->recv_pack.head->cmd, remoteid, currid, ret); + cm_sleep(DSS_PROCESS_REMOTE_INTERVAL); + dss_get_exec_nodeid(session, &currid, &remoteid); + continue; + } + break; } LOG_DEBUG_INF("The remote request(%d) is processed successfully, remote node(%u),current node(%u).", session->recv_pack.head->cmd, remoteid, currid); - return ret; + return remote_result; } static status_t dss_diag_proto_type(dss_session_t *session) @@ -125,6 +128,7 @@ static void dss_clean_open_files(dss_session_t *session) LOG_RUN_INF("Clean open files for pid:%llu.", session->cli_info.cli_pid); } +#define DSS_SESSION_PAUSED_WAIT 50 void dss_session_entry(thread_t *thread) { dss_session_t *session = (dss_session_t *)thread->argument; @@ -134,7 +138,6 @@ void dss_session_entry(thread_t *thread) dss_init_packet(&session->send_pack, CM_FALSE); cm_set_thread_name("DSS_SERVER"); - session->pipe.socket_timeout = (int32)CM_SOCKET_TIMEOUT; /* fetch protocol type */ @@ -144,14 +147,24 @@ void dss_session_entry(thread_t *thread) LOG_RUN_ERR("Failed to get protocol type!"); return; } + session->status = DSS_THREAD_STATUS_RUNNING; session->curr_lsn = cm_get_curr_lsn(); (void)cm_atomic_inc(&g_dss_instance.thread_cnt); while (!thread->closed) { + if (session->status == DSS_THREAD_STATUS_PAUSED) { + cm_sleep(DSS_SESSION_PAUSED_WAIT); + continue; + } /* process request command */ if ((dss_process_command(session) != CM_SUCCESS) && (session->is_closed == CM_TRUE)) { break; } + if (session->status == DSS_THREAD_STATUS_PAUSING) { + session->status = DSS_THREAD_STATUS_PAUSED; + LOG_DEBUG_INF("Set Session:%u paused.", session->id); + } } + session->status = DSS_THREAD_STATUS_IDLE; LOG_RUN_INF("Session:%u end to do service.", session->id); session->is_closed = CM_TRUE; @@ -185,7 +198,7 @@ static void dss_return_error(dss_session_t *session) (void)dss_put_str_with_cutoff(send_pack, message); status_t status = dss_write(&session->pipe, send_pack); if (status != CM_SUCCESS) { - LOG_DEBUG_ERR("Failed to reply,size:%u.", send_pack->head->size); + LOG_DEBUG_ERR("Failed to reply,size:%u, cmd:%u.", send_pack->head->size, send_pack->head->cmd); } cm_reset_error(); } @@ -209,7 +222,7 @@ static void dss_return_success(dss_session_t *session) } status = dss_write(&session->pipe, send_pack); if (status != CM_SUCCESS) { - LOG_DEBUG_ERR("Failed to reply message,size:%u.", send_pack->head->size); + LOG_DEBUG_ERR("Failed to reply message,size:%u, cmd:%u.", send_pack->head->size, send_pack->head->cmd); } } @@ -246,9 +259,7 @@ static status_t dss_process_rmdir(dss_session_t *session) dss_init_get(&session->recv_pack); DSS_RETURN_IF_ERROR(dss_get_str(&session->recv_pack, &dir)); DSS_RETURN_IF_ERROR(dss_get_int32(&session->recv_pack, &recursive)); - int32 ret = - snprintf_s(session->audit_info.resource, DSS_FILE_PATH_MAX_LENGTH, DSS_FILE_PATH_MAX_LENGTH - 1, "%s", dir); - DSS_SECUREC_SS_RETURN_IF_ERROR(ret, CM_ERROR); + DSS_RETURN_IF_ERROR(dss_set_audit_resource(session->audit_info.resource, DSS_AUDIT_MODIFY, "%s", dir)); return dss_remove_dir(session, (const char *)dir, (bool)recursive); } @@ -729,24 +740,65 @@ static status_t dss_process_get_ftid_by_path(dss_session_t *session) return CM_SUCCESS; } -static status_t dss_process_set_status(dss_session_t *session) +// get dssserver status:open, recovery or switch +static status_t dss_process_get_inst_status(dss_session_t *session) { - int32 dss_status; - dss_init_get(&session->recv_pack); - DSS_RETURN_IF_ERROR(dss_get_int32(&session->recv_pack, &dss_status)); - DSS_RETURN_IF_ERROR(dss_set_audit_resource(session->audit_info.resource, DSS_AUDIT_MODIFY, "%d", dss_status)); - LOG_DEBUG_INF("dss server current status(%d), set status(%d).", dss_get_server_status_flag(), dss_status); - bool32 need_refresh = (dss_status == DSS_STATUS_READWRITE) && (!dss_is_readwrite()); - dss_set_server_status_flag(dss_status); - if (need_refresh == CM_TRUE) { - status_t status = dss_refresh_meta_info(session); - DSS_RETURN_IFERR2( - status, LOG_DEBUG_ERR("dss server set status(%d) refresh meta fialed, result(%d).", dss_status, status)); - } - LOG_RUN_INF("Dss set server status %d.", dss_status); + dss_instance_status_t status = g_dss_instance.status; + session->send_info.str = dss_init_sendinfo_buf(session->recv_pack.init_buf); + *(uint32 *)session->send_info.str = (uint32)status; + session->send_info.len = sizeof(uint32); + DSS_RETURN_IF_ERROR( + dss_set_audit_resource(session->audit_info.resource, DSS_AUDIT_MODIFY, "status:%u", (uint32)status)); + DSS_LOG_DEBUG_OP("Server status is %u.", (uint32)status); return CM_SUCCESS; } +static void dss_wait_session_pause_inner(dss_session_t *session) +{ + if (session != NULL && session->status == DSS_THREAD_STATUS_RUNNING) { + session->status = DSS_THREAD_STATUS_PAUSING; + LOG_DEBUG_INF("Succeed to pause session: %d.", session->id); + while (session->status != DSS_THREAD_STATUS_PAUSED && !session->is_closed) { + cm_sleep(1); + } + } +} + +void dss_wait_session_pause(dss_instance_t *inst) +{ + uds_lsnr_t *lsnr = &inst->lsnr; + LOG_DEBUG_INF("Begin to set session paused."); + cs_pause_uds_lsnr(lsnr); + uint32 start_sid = dss_get_udssession_startid(); + uint32 end_sid = start_sid + inst->inst_cfg.params.cfg_session_num; + if (inst->threads != NULL) { + for (uint32 i = start_sid; i < end_sid; i++) { + dss_session_t *session = (dss_session_t *)inst->threads[i].argument; + dss_wait_session_pause_inner(session); + } + } + LOG_DEBUG_INF("Succeed to pause all session."); +} + +void dss_set_session_running(dss_instance_t *inst) +{ + LOG_DEBUG_INF("Begin to set session running."); + uds_lsnr_t *lsnr = &inst->lsnr; + uint32 start_sid = dss_get_udssession_startid(); + uint32 end_sid = start_sid + inst->inst_cfg.params.cfg_session_num; + if (inst->threads != NULL) { + for (uint32 i = start_sid; i < end_sid; i++) { + dss_session_t *session = (dss_session_t *)inst->threads[i].argument; + if (session != NULL && session->status == DSS_THREAD_STATUS_PAUSED) { + session->status = DSS_THREAD_STATUS_RUNNING; + LOG_DEBUG_INF("Succeed to run session: %d.", session->id); + } + } + } + lsnr->status = LSNR_STATUS_RUNNING; + LOG_DEBUG_INF("Succeed to run all sessions."); +} + static status_t dss_process_setcfg(dss_session_t *session) { char *name = NULL; @@ -788,6 +840,96 @@ static status_t dss_process_stop_server(dss_session_t *session) return CM_SUCCESS; } +// process switch lock,just master id can do +static status_t dss_process_switch_lock(dss_session_t *session) +{ + uint32 master_id = dss_get_master_id(); + dss_config_t *cfg = dss_get_inst_cfg(); + uint32 curr_id = (uint32)(cfg->params.inst_id); + int32 switch_id; + dss_init_get(&session->recv_pack); + DSS_RETURN_IF_ERROR(dss_get_int32(&session->recv_pack, &switch_id)); + if ((uint32)switch_id == master_id) { + LOG_DEBUG_INF("switchid is equal to current master_id, which is %u.", master_id); + return CM_SUCCESS; + } + if (master_id != curr_id) { + LOG_DEBUG_ERR("current id is %u, just master id %u can do switch lock.", curr_id, master_id); + return CM_ERROR; + } + dss_wait_session_pause(&g_dss_instance); + g_dss_instance.status = ZFS_STATUS_SWITCH; + status_t ret = CM_SUCCESS; + // trans lock + if (g_dss_instance.cm_res.is_valid) { + dss_set_server_status_flag(DSS_STATUS_READONLY); + ret = cm_res_trans_lock(&g_dss_instance.cm_res.mgr, DSS_CM_LOCK, (uint32)switch_id); + if (ret != CM_SUCCESS) { + LOG_DEBUG_ERR("cm do switch lock failed from %u to %u.", curr_id, master_id); + return ret; + } + dss_set_master_id((uint32)switch_id); + dss_set_session_running(&g_dss_instance); + g_dss_instance.status = ZFS_STATUS_OPEN; + } else { + dss_set_session_running(&g_dss_instance); + g_dss_instance.status = ZFS_STATUS_OPEN; + LOG_DEBUG_ERR("Only with cm can switch lock."); + return CM_ERROR; + } + DSS_LOG_DEBUG_OP("Main server switch lock from %u to %u successfully.", curr_id, (uint32)switch_id); + return CM_SUCCESS; +} +/* + 1 curr_id == master_id, just return success; + 2 curr_id != master_id, just send message to master_id to do switch lock + then master_id to do: + (1) set status switch + (2) lsnr pause + (3) trans lock +*/ +static status_t dss_process_set_main_inst(dss_session_t *session) +{ + uint32 master_id = dss_get_master_id(); + dss_config_t *cfg = dss_get_inst_cfg(); + uint32 curr_id = (uint32)(cfg->params.inst_id); + status_t status = CM_ERROR; + DSS_RETURN_IF_ERROR(dss_set_audit_resource( + session->audit_info.resource, DSS_AUDIT_MODIFY, "set %u as master", curr_id)); + cm_spin_lock(&g_dss_instance.switch_lock, NULL); + if (master_id == curr_id) { + DSS_LOG_DEBUG_OP("Main server %u from %u set successfully.", curr_id, master_id); + cm_spin_unlock(&g_dss_instance.switch_lock); + return CM_SUCCESS; + } + while (CM_TRUE) { + dss_init_get(&session->recv_pack); + session->recv_pack.head->cmd = DSS_CMD_SWITCH_LOCK; + LOG_DEBUG_INF("Try to switch lock to %u by %u.", curr_id, master_id); + (void)dss_put_int32(&session->recv_pack, curr_id); + status = dss_process_remote(session); + if (status == CM_SUCCESS) { + break; + } else { + LOG_DEBUG_ERR("Failed to switch lock to %u by %u.", curr_id, master_id); + cm_sleep(DSS_PROCESS_REMOTE_INTERVAL); + } + } + g_dss_instance.status = ZFS_STATUS_SWITCH; + dss_set_master_id(curr_id); + dss_set_server_status_flag(DSS_STATUS_READWRITE); + status = dss_refresh_meta_info(session); + if (status != CM_SUCCESS) { + cm_spin_unlock(&g_dss_instance.switch_lock); + LOG_DEBUG_ERR("dss instance %u refresh meta failed, result(%d).", curr_id, status); + return CM_ERROR; + } + g_dss_instance.status = ZFS_STATUS_OPEN; + DSS_LOG_DEBUG_OP("Main server %u from %u set successfully.", curr_id, master_id); + cm_spin_unlock(&g_dss_instance.switch_lock); + return CM_SUCCESS; +} + // clang-format off static dss_cmd_hdl_t g_dss_cmd_handle[] = { // modify @@ -816,9 +958,10 @@ static dss_cmd_hdl_t g_dss_cmd_handle[] = { { DSS_CMD_UPDATE_WRITTEN_SIZE, dss_process_update_file_written_size, NULL, CM_TRUE }, { DSS_CMD_STOP_SERVER, dss_process_stop_server, NULL, CM_FALSE }, { DSS_CMD_SETCFG, dss_process_setcfg, NULL, CM_FALSE }, - { DSS_CMD_SET_STATUS, dss_process_set_status, NULL, CM_FALSE }, { DSS_CMD_SYMLINK, dss_process_symlink, NULL, CM_TRUE }, { DSS_CMD_UNLINK, dss_process_unlink, NULL, CM_TRUE }, + { DSS_CMD_SET_MAIN_INST, dss_process_set_main_inst, NULL, CM_FALSE }, + { DSS_CMD_SWITCH_LOCK, dss_process_switch_lock, NULL, CM_FALSE }, // query { DSS_CMD_GET_HOME, dss_process_get_home, NULL, CM_FALSE }, { DSS_CMD_EXIST_FILE, dss_process_exist, NULL, CM_FALSE }, @@ -827,6 +970,7 @@ static dss_cmd_hdl_t g_dss_cmd_handle[] = { { DSS_CMD_READLINK, dss_process_readlink, NULL, CM_FALSE }, { DSS_CMD_GET_FTID_BY_PATH, dss_process_get_ftid_by_path, NULL, CM_FALSE }, { DSS_CMD_GETCFG, dss_process_getcfg, NULL, CM_FALSE }, + { DSS_CMD_GET_INST_STATUS, dss_process_get_inst_status, NULL, CM_FALSE }, }; dss_cmd_hdl_t g_dss_remote_handle = { DSS_CMD_EXEC_REMOTE, dss_process_remote, NULL, CM_FALSE }; @@ -862,7 +1006,8 @@ static dss_cmd_hdl_t *dss_get_cmd_handle(int32 cmd, bool32 local_req) static status_t dss_exec_cmd(dss_session_t *session, bool32 local_req) { session->send_info.len = 0; - DSS_LOG_DEBUG_OP("Receive command:%d.", session->recv_pack.head->cmd); + DSS_LOG_DEBUG_OP( + "Receive command:%d, server status is %d.", session->recv_pack.head->cmd, (int32)g_dss_instance.status); dss_cmd_hdl_t *handle = NULL; if (session->recv_pack.head->cmd < DSS_CMD_END) { @@ -880,13 +1025,14 @@ static status_t dss_exec_cmd(dss_session_t *session, bool32 local_req) return status; } +#define DSS_WAIT_TIMEOUT 5 status_t dss_process_command(dss_session_t *session) { status_t status = CM_SUCCESS; bool32 ready = CM_FALSE; cm_reset_error(); - if (cs_wait(&session->pipe, CS_WAIT_FOR_READ, session->pipe.socket_timeout, &ready) != CM_SUCCESS) { + if (cs_wait(&session->pipe, CS_WAIT_FOR_READ, DSS_WAIT_TIMEOUT, &ready) != CM_SUCCESS) { session->is_closed = CM_TRUE; return CM_ERROR; } @@ -901,7 +1047,30 @@ status_t dss_process_command(dss_session_t *session) session->is_closed = CM_TRUE; return CM_ERROR; } - + date_t time_start = g_timer()->now; + date_t time_now = 0; + while (g_dss_instance.status != ZFS_STATUS_OPEN) { + if (dss_can_cmd_type_no_open(session->recv_pack.head->cmd)) { + status = dss_exec_cmd(session, CM_TRUE); + if (status != CM_SUCCESS) { + LOG_DEBUG_ERR("Failed to execute command:%d.", session->recv_pack.head->cmd); + dss_return_error(session); + return CM_ERROR; + } else { + dss_return_success(session); + return CM_SUCCESS; + } + } + DSS_GET_CM_LOCK_LONG_SLEEP; + LOG_RUN_INF("The status %d of instance %lld is not open, just wait.\n", (int32)g_dss_instance.status, + dss_get_inst_cfg()->params.inst_id); + time_now = g_timer()->now; + if (time_now - time_start > DSS_MAX_FAIL_TIME_WITH_CM * MICROSECS_PER_SECOND) { + LOG_RUN_ERR("[DSS] ABORT INFO: Fail to change status open for %d seconds, exit.", DSS_MAX_FAIL_TIME_WITH_CM); + cm_fync_logfile(); + _exit(1); + } + } status = dss_exec_cmd(session, CM_TRUE); if (status != CM_SUCCESS) { LOG_DEBUG_ERR("Failed to execute command:%d.", session->recv_pack.head->cmd); @@ -910,13 +1079,12 @@ status_t dss_process_command(dss_session_t *session) } else { dss_return_success(session); } - return CM_SUCCESS; } status_t dss_proc_standby_req(dss_session_t *session) { - if (dss_is_readonly() == CM_TRUE) { + if (dss_is_readonly() == CM_TRUE && !dss_need_exec_local()) { dss_config_t *cfg = dss_get_inst_cfg(); uint32 id = (uint32)(cfg->params.inst_id); LOG_RUN_ERR("The local node(%u) is in readonly state and cannot execute remote requests.", id); @@ -925,3 +1093,7 @@ status_t dss_proc_standby_req(dss_session_t *session) return dss_exec_cmd(session, CM_FALSE); } + +#ifdef __cplusplus +} +#endif diff --git a/src/service/dss_service.h b/src/service/dss_service.h index fc8fb3721723d23e0e932c6fd9341491641b353a..208199b7a7006b94036c8bcc7cf31a8d849e190c 100644 --- a/src/service/dss_service.h +++ b/src/service/dss_service.h @@ -26,6 +26,7 @@ #define __DSS_SERVICE_H__ #include "dss_latch.h" #include "dss_session.h" +#include "dss_instance.h" #ifdef __cplusplus extern "C" { @@ -45,10 +46,12 @@ status_t dss_signal_proc(void); status_t dss_signal_proc_with_graceful_exit(void); #endif +void dss_get_exec_nodeid(dss_session_t *session, uint32 *currid, uint32 *remoteid); +void dss_wait_session_pause(dss_instance_t *inst); +void dss_set_session_running(dss_instance_t *inst); status_t dss_process_command(dss_session_t *session); void dss_session_entry(thread_t *thread); status_t dss_proc_standby_req(dss_session_t *session); -status_t dss_get_exec_nodeid(dss_session_t *session, uint32 *currid, uint32 *remoteid); #ifdef __cplusplus } diff --git a/src/service/dssserver.c b/src/service/dssserver.c index 2cf8246e1b84a5d87595c6bcbe6e418c40a73420..17e3bc4b2c4954ef5090ac7d05d0a6c06a1952dc 100644 --- a/src/service/dssserver.c +++ b/src/service/dssserver.c @@ -114,6 +114,9 @@ static void handle_main_wait(void) break; } dss_check_peer_inst(&g_dss_instance, DSS_INVALID_64); + if (g_dss_instance.cm_res.is_valid) { + dss_get_cm_lock_and_recover(&g_dss_instance); + } if (periods == MILLISECS_PER_SECOND * SECONDS_PER_DAY / interval) { periods = 0; dss_ssl_ca_cert_expire();