55 #include <sys/types.h>
57 #include <sys/socket.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
84 #define LOGSYS_UTILS_ONLY 1
92 #define LOCALHOST_IP inet_addr("127.0.0.1")
93 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384
94 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384
95 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500
97 #define RETRANSMIT_ENTRIES_MAX 30
98 #define TOKEN_SIZE_MAX 64000
99 #define LEAVE_DUMMY_NODEID 0
118 #define SEQNO_START_MSG 0x0
119 #define SEQNO_START_TOKEN 0x0
141 #define ENDIAN_LOCAL 0xff22
167 struct qb_list_head
list;
438 const char *
function,
441 const char *format, ...)
__attribute__((format(printf, 6, 7)));;
454 unsigned int msg_len,
455 int endian_conversion_required);
459 const unsigned int *member_list,
size_t member_list_entries,
460 const unsigned int *left_list,
size_t left_list_entries,
461 const unsigned int *joined_list,
size_t joined_list_entries,
536 int endian_conversion_needed);
581 static int message_handler_orf_token (
585 int endian_conversion_needed);
587 static int message_handler_mcast (
591 int endian_conversion_needed);
593 static int message_handler_memb_merge_detect (
597 int endian_conversion_needed);
599 static int message_handler_memb_join (
603 int endian_conversion_needed);
605 static int message_handler_memb_commit_token (
609 int endian_conversion_needed);
611 static int message_handler_token_hold_cancel (
615 int endian_conversion_needed);
/* Forward declaration; the definition appears further down in this file. */
static void srp_addr_copy(struct srp_addr *dest, const struct srp_addr *src);
621 static void srp_addr_to_nodeid (
623 unsigned int *nodeid_out,
625 unsigned int entries);
/* Forward declarations; both functions are defined later in this file. */
static int srp_addr_equal(const struct srp_addr *a, const struct srp_addr *b);

static void messages_deliver_to_app(struct totemsrp_instance *instance,
	int skip, unsigned int end_point);
635 int fcc_mcasts_allowed);
/*
 * Forward declarations for helpers that are defined later in this file.
 */
static void messages_free(struct totemsrp_instance *instance,
	unsigned int token_aru);

static void target_set_completed(void *context);

static void memb_state_commit_token_target_set(
	struct totemsrp_instance *instance);

/* Endian converters: read the message at "in", write the result to "out". */
static void orf_token_endian_convert(const struct orf_token *in,
	struct orf_token *out);

static void memb_join_endian_convert(const struct memb_join *in,
	struct memb_join *out);

static void mcast_endian_convert(const struct mcast *in, struct mcast *out);
651 static void memb_merge_detect_endian_convert (
/* Forward declaration; note the argument order is (out, in). */
static void srp_addr_copy_endian_convert(struct srp_addr *out,
	const struct srp_addr *in);

/*
 * Timer callbacks, defined later in this file. Each one receives a single
 * opaque pointer argument.
 */
static void timer_function_orf_token_timeout(void *data);
static void timer_function_orf_token_warning(void *data);
static void timer_function_pause_timeout(void *data);
static void timer_function_heartbeat_timeout(void *data);
static void timer_function_token_retransmit_timeout(void *data);
static void timer_function_token_hold_retransmit_timeout(void *data);
static void timer_function_merge_detect_timeout(void *data);

/* Forward declaration; the definition appears further down in this file. */
static void totemsrp_buffer_release(struct totemsrp_instance *instance,
	void *ptr);
669 unsigned int msg_len,
675 unsigned int iface_no);
680 message_handler_orf_token,
681 message_handler_mcast,
682 message_handler_memb_merge_detect,
683 message_handler_memb_join,
684 message_handler_memb_commit_token,
685 message_handler_token_hold_cancel
689 #define log_printf(level, format, args...) \
691 instance->totemsrp_log_printf ( \
692 level, instance->totemsrp_subsys_id, \
693 __FUNCTION__, __FILE__, __LINE__, \
696 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
698 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
699 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
700 instance->totemsrp_log_printf ( \
701 level, instance->totemsrp_subsys_id, \
702 __FUNCTION__, __FILE__, __LINE__, \
703 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
750 uint64_t timestamp_msec;
753 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
758 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
773 unsigned long long nano_secs = qb_util_nano_current_get ();
775 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
804 static void totempg_mtu_changed(
void *context,
int net_mtu)
811 "Net MTU changed to %d, new value is %d",
819 qb_loop_t *poll_handle,
827 unsigned int msg_len,
828 int endian_conversion_required),
832 const unsigned int *member_list,
size_t member_list_entries,
833 const unsigned int *left_list,
size_t left_list_entries,
834 const unsigned int *joined_list,
size_t joined_list_entries,
836 void (*waiting_trans_ack_cb_fn) (
837 int waiting_trans_ack))
843 if (instance == NULL) {
847 totemsrp_instance_initialize (instance);
885 "Token Timeout (%d ms) retransmit timeout (%d ms)",
890 "Token warning every %d ms (%d%% of Token Timeout)",
892 if (token_warning_ms < totem_config->token_retransmit_timeout)
894 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
899 "Token warnings disabled");
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
912 "downcheck (%d ms) fail to recv const (%d msgs)",
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
922 "missed count const (%d messages)",
950 timer_function_pause_timeout (instance);
954 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
965 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
969 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
989 target_set_completed);
1011 token_event_stats_collector,
1017 token_event_stats_collector,
1019 *srp_context = instance;
1031 memb_leave_message_send (instance);
1052 unsigned int *interface_id,
1054 unsigned int interfaces_size,
1056 unsigned int *iface_count)
1072 interface_id[num_ifs] = i;
1074 if (++num_ifs > interfaces_size) {
1083 *iface_count = num_ifs;
1089 const char *cipher_type,
1090 const char *hash_type)
1127 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1135 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src)
1140 static void srp_addr_to_nodeid (
1142 unsigned int *nodeid_out,
1144 unsigned int entries)
1148 for (i = 0; i < entries; i++) {
1149 nodeid_out[i] = srp_addr_in[i].
nodeid;
1153 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in)
1163 static void memb_set_subtract (
1164 struct srp_addr *out_list,
int *out_list_entries,
1165 struct srp_addr *one_list,
int one_list_entries,
1166 struct srp_addr *two_list,
int two_list_entries)
1172 *out_list_entries = 0;
1174 for (i = 0; i < one_list_entries; i++) {
1175 for (j = 0; j < two_list_entries; j++) {
1176 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1182 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1183 *out_list_entries = *out_list_entries + 1;
1192 static void memb_consensus_set (
1216 static int memb_consensus_isset (
1233 static int memb_consensus_agreed (
1237 int token_memb_entries = 0;
1241 memb_set_subtract (token_memb, &token_memb_entries,
1245 for (i = 0; i < token_memb_entries; i++) {
1246 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1261 assert (token_memb_entries >= 1);
1266 static void memb_consensus_notset (
1268 struct srp_addr *no_consensus_list,
1269 int *no_consensus_list_entries,
1271 int comparison_list_entries)
1275 *no_consensus_list_entries = 0;
1278 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1279 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->
my_proc_list[i]);
1280 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1288 static int memb_set_equal (
1289 struct srp_addr *set1,
int set1_entries,
1290 struct srp_addr *set2,
int set2_entries)
1297 if (set1_entries != set2_entries) {
1300 for (i = 0; i < set2_entries; i++) {
1301 for (j = 0; j < set1_entries; j++) {
1302 if (srp_addr_equal (&set1[j], &set2[i])) {
1318 static int memb_set_subset (
1319 const struct srp_addr *subset,
int subset_entries,
1320 const struct srp_addr *fullset,
int fullset_entries)
1326 if (subset_entries > fullset_entries) {
1329 for (i = 0; i < subset_entries; i++) {
1330 for (j = 0; j < fullset_entries; j++) {
1331 if (srp_addr_equal (&subset[i], &fullset[j])) {
1345 static void memb_set_merge (
1346 const struct srp_addr *subset,
int subset_entries,
1347 struct srp_addr *fullset,
int *fullset_entries)
1353 for (i = 0; i < subset_entries; i++) {
1354 for (j = 0; j < *fullset_entries; j++) {
1355 if (srp_addr_equal (&fullset[j], &subset[i])) {
1361 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1362 *fullset_entries = *fullset_entries + 1;
1369 static void memb_set_and_with_ring_id (
1385 for (i = 0; i < set2_entries; i++) {
1386 for (j = 0; j < set1_entries; j++) {
1387 if (srp_addr_equal (&set1[j], &set2[i])) {
1388 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1395 srp_addr_copy (&and[*and_entries], &set1[j]);
1396 *and_entries = *and_entries + 1;
1403 static void memb_set_log(
1414 memset(list_str, 0,
sizeof(list_str));
1416 for (i = 0; i < list_entries; i++) {
1418 snprintf(int_buf,
sizeof(int_buf),
"%u", list[i].
nodeid);
1420 snprintf(int_buf,
sizeof(int_buf),
",%u", list[i].
nodeid);
1423 if (strlen(list_str) + strlen(int_buf) >=
sizeof(list_str)) {
1426 strcat(list_str, int_buf);
1429 log_printf(level,
"List '%s' contains %d entries: %s",
string, list_entries, list_str);
1432 static void my_leave_memb_clear(
1439 static unsigned int my_leave_memb_match(
1444 unsigned int ret = 0;
1455 static void my_leave_memb_set(
1474 "Cannot set LEAVE nodeid=%d",
nodeid);
1481 assert (instance != NULL);
1485 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1487 assert (instance != NULL);
1501 timer_function_token_retransmit_timeout,
1502 &instance->timer_orf_token_retransmit_timeout);
1518 timer_function_merge_detect_timeout,
1519 &instance->timer_merge_detect_timeout);
1547 "Saving state aru %x high seq received %x",
1557 "Restoring instance->my_aru %x my high seq received %x",
1564 "Resetting old ring state");
1577 timer_function_pause_timeout,
1578 &instance->timer_pause_timeout);
1592 timer_function_orf_token_warning,
1593 &instance->timer_orf_token_warning);
1607 timer_function_orf_token_timeout,
1608 &instance->timer_orf_token_timeout);
1614 reset_token_warning(instance);
1625 timer_function_heartbeat_timeout,
1626 &instance->timer_heartbeat_timeout);
1641 cancel_token_warning(instance);
1648 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1653 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1661 timer_function_token_hold_retransmit_timeout,
1662 &instance->timer_orf_token_hold_retransmit_timeout);
1668 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1674 static void memb_state_consensus_timeout_expired (
1678 int no_consensus_list_entries;
1681 if (memb_consensus_agreed (instance)) {
1682 memb_consensus_reset (instance);
1684 memb_consensus_set (instance, &instance->
my_id);
1686 reset_token_timeout (instance);
1688 memb_consensus_notset (
1691 &no_consensus_list_entries,
1695 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1708 static void timer_function_pause_timeout (
void *data)
1713 reset_pause_timeout (instance);
1718 old_ring_state_restore (instance);
1723 static void timer_function_orf_token_warning (
void *data)
1730 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1733 "Token has not been received in %d ms ", (
unsigned int) tv_diff);
1734 reset_token_warning(instance);
1736 cancel_token_warning(instance);
1740 static void timer_function_orf_token_timeout (
void *data)
1747 "The token was lost in the OPERATIONAL state.");
1749 "A processor failed, forming new configuration.");
1757 "The consensus timeout expired.");
1758 memb_state_consensus_timeout_expired (instance);
1765 "The token was lost in the COMMIT state.");
1772 "The token was lost in the RECOVERY state.");
1773 memb_recovery_state_token_loss (instance);
1779 static void timer_function_heartbeat_timeout (
void *data)
1783 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1784 timer_function_orf_token_timeout(data);
1787 static void memb_timer_function_state_gather (
void *data)
1799 memb_join_message_send (instance);
1810 memb_timer_function_state_gather,
1811 &instance->memb_timer_state_gather_join_timeout);
1820 static void memb_timer_function_gather_consensus_timeout (
void *data)
1823 memb_state_consensus_timeout_expired (instance);
1826 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1831 unsigned int range = 0;
1844 for (i = 1; i <= range; i++) {
1850 recovery_message_item = ptr;
1861 regular_message_item.mcast =
1862 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1863 regular_message_item.msg_len =
1864 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1865 mcast = regular_message_item.mcast;
1874 "comparing if ring id is for this processors old ring seqno %d",
1888 ®ular_message_item,
mcast->
seq);
1895 "-not adding msg with seq no %x",
mcast->
seq);
1906 int joined_list_entries = 0;
1907 unsigned int aru_save;
1914 char left_node_msg[1024];
1915 char joined_node_msg[1024];
1916 char failed_node_msg[1024];
1920 memb_consensus_reset (instance);
1922 old_ring_state_reset (instance);
1924 deliver_messages_from_recovery_to_regular (instance);
1927 "Delivering to app %x to %x",
1930 aru_save = instance->
my_aru;
1943 memb_set_subtract (joined_list, &joined_list_entries,
1961 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1974 instance->
my_aru = aru_save;
1979 srp_addr_to_nodeid (instance, new_memb_list_totemip,
1981 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
1982 joined_list_entries);
1986 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2048 regular_message = ptr;
2049 free (regular_message->
mcast);
2055 if (joined_list_entries) {
2057 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2058 for (i=0; i< joined_list_entries; i++) {
2059 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" %u", joined_list_totemip[i]);
2063 joined_node_msg[0] =
'\0';
2069 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2071 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" %u", left_list[i]);
2074 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2076 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
2078 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(left_node_msg)-sptr2,
" %u", left_list[i]);
2082 failed_node_msg[0] =
'\0';
2086 left_node_msg[0] =
'\0';
2087 failed_node_msg[0] =
'\0';
2090 my_leave_memb_clear(instance);
2093 "entering OPERATIONAL state.");
2095 "A new membership (%u:%lld) was formed. Members%s%s",
2101 if (strlen(failed_node_msg)) {
2103 "Failed to receive the leave message.%s",
2114 reset_pause_timeout (instance);
2127 static void memb_state_gather_enter (
2138 &instance->
my_id, 1,
2141 memb_join_message_send (instance);
2152 memb_timer_function_state_gather,
2153 &instance->memb_timer_state_gather_join_timeout);
2168 memb_timer_function_gather_consensus_timeout,
2169 &instance->memb_timer_state_gather_consensus_timeout);
2177 cancel_token_retransmit_timeout (instance);
2178 cancel_token_timeout (instance);
2179 cancel_merge_detect_timeout (instance);
2181 memb_consensus_reset (instance);
2183 memb_consensus_set (instance, &instance->
my_id);
2186 "entering GATHER state from %d(%s).",
2187 gather_from, gsfrom_to_msg(gather_from));
2202 static void timer_function_token_retransmit_timeout (
void *data);
2204 static void target_set_completed (
2209 memb_state_commit_token_send (instance);
2213 static void memb_state_commit_enter (
2216 old_ring_state_save (instance);
2218 memb_state_commit_token_update (instance);
2220 memb_state_commit_token_target_set (instance);
2237 "entering COMMIT state.");
2240 reset_token_retransmit_timeout (instance);
2241 reset_token_timeout (instance);
2257 static void memb_state_recovery_enter (
2262 int local_received_flg = 1;
2263 unsigned int low_ring_aru;
2264 unsigned int range = 0;
2265 unsigned int messages_originated = 0;
2274 "entering RECOVERY state.");
2285 memb_state_commit_token_send_recovery (instance, commit_token);
2300 memcpy (&my_new_memb_ring_id_list[i],
2304 memb_set_and_with_ring_id (
2306 my_new_memb_ring_id_list,
2320 "position [%d] member %u:", i,
addr[i].nodeid);
2322 "previous ring seq %llx rep %u",
2323 memb_list[i].ring_id.seq,
2324 memb_list[i].ring_id.rep);
2327 "aru %x high delivered %x received flag %d",
2329 memb_list[i].high_delivered,
2330 memb_list[i].received_flg);
2341 memb_list[i].received_flg == 0) {
2345 local_received_flg = 0;
2349 if (local_received_flg == 1) {
2362 &memb_list[i].ring_id,
2365 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2367 low_ring_aru = memb_list[i].
aru;
2388 "copying all old ring messages from %x-%x.",
2391 for (i = 1; i <= range; i++) {
2398 low_ring_aru + i, &ptr);
2403 messages_originated++;
2425 "Originated %d messages in RECOVERY.", messages_originated);
2430 "Did not need to originate any messages in recovery.");
2440 reset_token_timeout (instance);
2441 reset_token_retransmit_timeout (instance);
2454 token_hold_cancel_send (instance);
2461 struct iovec *iovec,
2462 unsigned int iov_len,
2469 unsigned int addr_idx;
2478 if (cs_queue_is_full (queue_use)) {
2509 addr_idx = sizeof (
struct mcast);
2510 for (i = 0; i < iov_len; i++) {
2511 memcpy (&
addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2512 addr_idx += iovec[i].iov_len;
2541 cs_queue_avail (queue_use, &avail);
2552 static int orf_token_remcast (
2560 struct sq *sort_queue;
2568 res = sq_in_range (sort_queue,
seq);
2577 res = sq_item_get (sort_queue,
seq, &ptr);
2596 static void messages_free (
2598 unsigned int token_aru)
2603 int log_release = 0;
2604 unsigned int release_to;
2605 unsigned int range = 0;
2607 release_to = token_aru;
2608 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2628 for (i = 1; i <= range; i++) {
2634 regular_message = ptr;
2635 totemsrp_buffer_release (instance, regular_message->
mcast);
2646 "releasing messages up to and including %x", release_to);
2650 static void update_aru (
2655 struct sq *sort_queue;
2657 unsigned int my_aru_saved = 0;
2667 my_aru_saved = instance->
my_aru;
2668 for (i = 1; i <= range; i++) {
2672 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2680 instance->
my_aru += i - 1;
2686 static int orf_token_mcast (
2689 int fcc_mcasts_allowed)
2693 struct sq *sort_queue;
2696 unsigned int fcc_mcast_current;
2701 reset_token_retransmit_timeout (instance);
2712 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2713 if (cs_queue_is_empty (mcast_queue)) {
2745 cs_queue_item_remove (mcast_queue);
2753 update_aru (instance);
2758 return (fcc_mcast_current);
2765 static int orf_token_rtr (
2768 unsigned int *fcc_allowed)
2773 struct sq *sort_queue;
2775 unsigned int range = 0;
2776 char retransmit_msg[1024];
2787 strcpy (retransmit_msg,
"Retransmit List: ");
2793 strcat (retransmit_msg,
value);
2795 strcat (retransmit_msg,
"");
2797 "%s", retransmit_msg);
2817 res = orf_token_remcast (instance,
rtr_list[i].
seq);
2844 (i <= range); i++) {
2849 res = sq_in_range (sort_queue, instance->
my_aru + i);
2857 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2868 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2908 static void timer_function_token_retransmit_timeout (
void *data)
2918 token_retransmit (instance);
2919 reset_token_retransmit_timeout (instance);
2924 static void timer_function_token_hold_retransmit_timeout (
void *data)
2935 token_retransmit (instance);
2940 static void timer_function_merge_detect_timeout(
void *data)
2949 memb_merge_detect_transmit (instance);
2962 static int token_send (
2968 unsigned int orf_token_size;
2970 orf_token_size =
sizeof (
struct orf_token) +
2978 if (forward_token == 0) {
3056 res = token_send (instance, &
orf_token, 1);
3061 static void memb_state_commit_token_update (
3066 unsigned int high_aru;
3099 &memb_list[i].ring_id,
3102 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3103 high_aru = memb_list[i].
aru;
3110 &memb_list[i].ring_id,
3113 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3128 static void memb_state_commit_token_target_set (
3142 static int memb_state_commit_token_send_recovery (
3146 unsigned int commit_token_size;
3168 reset_token_retransmit_timeout (instance);
3172 static int memb_state_commit_token_send (
3175 unsigned int commit_token_size;
3197 reset_token_retransmit_timeout (instance);
3205 int token_memb_entries = 0;
3207 unsigned int lowest_nodeid;
3209 memb_set_subtract (token_memb, &token_memb_entries,
3216 assert(token_memb_entries > 0);
3218 lowest_nodeid = token_memb[0].nodeid;
3219 for (i = 1; i < token_memb_entries; i++) {
3220 if (lowest_nodeid > token_memb[i].
nodeid) {
3221 lowest_nodeid = token_memb[i].nodeid;
3227 static int srp_addr_compare (
const void *a,
const void *b)
3241 static void memb_state_commit_token_create (
3247 int token_memb_entries = 0;
3250 "Creating commit token because I am the rep.");
3252 memb_set_subtract (token_memb, &token_memb_entries,
3271 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3280 memcpy (
addr, token_memb,
3281 token_memb_entries *
sizeof (
struct srp_addr));
3282 memset (memb_list, 0,
3288 char memb_join_data[40000];
3291 unsigned int addr_idx;
3302 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3304 if (msg_len >
sizeof(memb_join_data)) {
3306 "memb_join_message too long. Ignoring message.");
3322 memcpy (&
addr[addr_idx],
3329 memcpy (&
addr[addr_idx],
3351 char memb_join_data[40000];
3354 unsigned int addr_idx;
3355 int active_memb_entries;
3360 "sending join/leave message");
3367 &instance->
my_id, 1,
3370 memb_set_subtract (active_memb, &active_memb_entries,
3372 &instance->
my_id, 1);
3375 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3377 if (msg_len >
sizeof(memb_join_data)) {
3379 "memb_leave message too long. Ignoring message.");
3402 memcpy (&
addr[addr_idx],
3404 active_memb_entries *
3407 active_memb_entries *
3409 memcpy (&
addr[addr_idx],
3449 static void memb_ring_id_set (
3468 token_hold_cancel_send (instance);
3471 if (callback_handle == 0) {
3474 *handle_out = (
void *)callback_handle;
3475 qb_list_init (&callback_handle->
list);
3477 callback_handle->
data = (
void *)
data;
3479 callback_handle->
delete =
delete;
3498 qb_list_del (&h->
list);
3505 static void token_callbacks_execute (
3509 struct qb_list_head *list, *tmp_iter;
3510 struct qb_list_head *callback_listhead = 0;
3526 qb_list_for_each_safe(
list, tmp_iter, callback_listhead) {
3539 if (res == -1 && del == 1) {
3540 qb_list_add (
list, callback_listhead);
3566 if (queue_use != NULL) {
3567 backlog = cs_queue_used (queue_use);
3574 static int fcc_calculate (
3578 unsigned int transmits_allowed;
3579 unsigned int backlog_calc;
3587 instance->
my_cbl = backlog_get (instance);
3596 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3597 transmits_allowed = backlog_calc;
3601 return (transmits_allowed);
3607 static void fcc_rtr_limit (
3610 unsigned int *transmits_allowed)
3614 assert (check >= 0);
3621 *transmits_allowed = 0;
3625 static void fcc_token_update (
3628 unsigned int msgs_transmitted)
3630 token->
fcc += msgs_transmitted - instance->
my_trc;
3632 instance->
my_trc = msgs_transmitted;
3639 static int check_orf_token_sanity(
3643 int endian_conversion_needed)
3647 size_t required_len;
3649 if (msg_len <
sizeof(
struct orf_token)) {
3651 "Received orf_token message is too short... ignoring.");
3656 if (endian_conversion_needed) {
3662 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3663 if (msg_len < required_len) {
3665 "Received orf_token message is too short... ignoring.");
3673 static int check_mcast_sanity(
3677 int endian_conversion_needed)
3680 if (msg_len <
sizeof(
struct mcast)) {
3682 "Received mcast message is too short... ignoring.");
3690 static int check_memb_merge_detect_sanity(
3694 int endian_conversion_needed)
3699 "Received memb_merge_detect message is too short... ignoring.");
3707 static int check_memb_join_sanity(
3711 int endian_conversion_needed)
3716 size_t required_len;
3718 if (msg_len <
sizeof(
struct memb_join)) {
3720 "Received memb_join message is too short... ignoring.");
3728 if (endian_conversion_needed) {
3734 if (msg_len < required_len) {
3736 "Received memb_join message is too short... ignoring.");
3744 static int check_memb_commit_token_sanity(
3748 int endian_conversion_needed)
3752 size_t required_len;
3756 "Received memb_commit_token message is too short... ignoring.");
3762 if (endian_conversion_needed) {
3768 if (msg_len < required_len) {
3770 "Received memb_commit_token message is too short... ignoring.");
3778 static int check_token_hold_cancel_sanity(
3782 int endian_conversion_needed)
3787 "Received token_hold_cancel message is too short... ignoring.");
3803 static int message_handler_orf_token (
3807 int endian_conversion_needed)
3809 char token_storage[1500];
3810 char token_convert[1500];
3813 unsigned int transmits_allowed;
3814 unsigned int mcasted_retransmit;
3815 unsigned int mcasted_regular;
3816 unsigned int last_aru;
3819 unsigned long long tv_current;
3820 unsigned long long tv_diff;
3822 tv_current = qb_util_nano_current_get ();
3823 tv_diff = tv_current -
tv_old;
3827 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3830 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3837 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3838 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3843 if (endian_conversion_needed) {
3844 orf_token_endian_convert ((
struct orf_token *)msg,
3846 msg = (
struct orf_token *)token_convert;
3853 token = (
struct orf_token *)token_storage;
3854 memcpy (token, msg,
sizeof (
struct orf_token));
3863 start_merge_detect_timeout (instance);
3866 cancel_merge_detect_timeout (instance);
3867 cancel_token_hold_retransmit_timeout (instance);
3873 #ifdef TEST_RECOVERY_MSG_COUNT
3915 messages_free (instance, token->
aru);
3934 reset_heartbeat_timeout(instance);
3937 cancel_heartbeat_timeout(instance);
3952 transmits_allowed = fcc_calculate (instance, token);
3953 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3961 fcc_rtr_limit (instance, token, &transmits_allowed);
3962 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3969 fcc_token_update (instance, token, mcasted_retransmit +
3972 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
3977 if (token->
aru == token->
seq) {
3983 if (token->
aru == last_aru && token->
aru_addr != 0) {
3998 "FAILED TO RECEIVE");
4002 memb_set_merge (&instance->
my_id, 1,
4029 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4042 "install seq %x aru %x high seq received %x",
4060 "retrans flag count %x token aru %x install seq %x aru %x %x",
4064 memb_state_operational_enter (instance);
4071 token_send (instance, token, forward_token);
4074 tv_current = qb_util_nano_current_get ();
4075 tv_diff = tv_current -
tv_old;
4079 ((
float)tv_diff) / 1000000.0);
4082 messages_deliver_to_app (instance, 0,
4090 reset_token_timeout (instance);
4091 reset_token_retransmit_timeout (instance);
4095 start_token_hold_retransmit_timeout (instance);
4105 reset_heartbeat_timeout(instance);
4108 cancel_heartbeat_timeout(instance);
4114 static void messages_deliver_to_app (
4117 unsigned int end_point)
4122 struct mcast *mcast_in;
4123 struct mcast mcast_header;
4124 unsigned int range = 0;
4125 int endian_conversion_required;
4126 unsigned int my_high_delivered_stored = 0;
4142 for (i = 1; i <= range; i++) {
4150 my_high_delivered_stored + i);
4156 my_high_delivered_stored + i, &ptr);
4160 if (res != 0 && skip == 0) {
4171 sort_queue_item_p = ptr;
4173 mcast_in = sort_queue_item_p->
mcast;
4174 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4176 endian_conversion_required = 0;
4178 endian_conversion_required = 1;
4179 mcast_endian_convert (mcast_in, &mcast_header);
4181 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4188 memb_set_subset (&mcast_header.system_from,
4202 "Delivering MCAST message with seq %x to pending delivery queue",
4209 mcast_header.header.nodeid,
4210 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4212 endian_conversion_required);
4219 static int message_handler_mcast (
4223 int endian_conversion_needed)
4226 struct sq *sort_queue;
4227 struct mcast mcast_header;
4229 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4233 if (endian_conversion_needed) {
4234 mcast_endian_convert (msg, &mcast_header);
4236 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4247 #ifdef TEST_DROP_MCAST_PERCENTAGE
4248 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4256 if (memcmp (&instance->
my_ring_id, &mcast_header.ring_id,
4262 &mcast_header.system_from, 1,
4268 if (!memb_set_subset (
4269 &mcast_header.system_from,
4274 memb_set_merge (&mcast_header.system_from, 1,
4295 "Received ringid(%u:%lld) seq %x",
4296 mcast_header.ring_id.rep,
4297 mcast_header.ring_id.seq,
4305 sq_in_range (sort_queue, mcast_header.seq) &&
4306 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4320 mcast_header.seq)) {
4327 update_aru (instance);
4336 static int message_handler_memb_merge_detect (
4340 int endian_conversion_needed)
4344 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4348 if (endian_conversion_needed) {
4375 if (!memb_set_subset (
4399 static void memb_join_process (
4405 int gather_entered = 0;
4406 int fail_minus_memb_entries = 0;
4426 "Discarding LEAVE message during flush, nodeid=%u",
4448 if (memb_set_equal (proc_list,
4453 memb_set_equal (failed_list,
4462 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4469 memb_state_commit_token_create (instance);
4471 memb_state_commit_enter (instance);
4474 if (memb_consensus_agreed (instance) &&
4475 memb_lowest_in_config (instance)) {
4477 memb_state_commit_token_create (instance);
4479 memb_state_commit_enter (instance);
4484 if (memb_set_subset (proc_list,
4489 memb_set_subset (failed_list,
4501 memb_set_merge (proc_list,
4505 if (memb_set_subset (
4506 &instance->
my_id, 1,
4513 if (memb_set_subset (
4518 if (memb_set_subset (
4523 memb_set_merge (failed_list,
4527 memb_set_subtract (fail_minus_memb,
4528 &fail_minus_memb_entries,
4534 memb_set_merge (fail_minus_memb,
4535 fail_minus_memb_entries,
4546 if (gather_entered == 0 &&
4553 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4576 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4579 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4605 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4622 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4647 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4664 static void memb_merge_detect_endian_convert (
4677 static int ignore_join_under_operational (
4689 if (memb_set_subset (&instance->
my_id, 1,
4707 static int message_handler_memb_join (
4711 int endian_conversion_needed)
4714 struct memb_join *memb_join_convert = alloca (msg_len);
4716 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4720 if (endian_conversion_needed) {
4722 memb_join_endian_convert (msg, memb_join_convert);
4732 if (pause_flush (instance)) {
4741 if (!ignore_join_under_operational (instance,
memb_join)) {
4742 memb_join_process (instance,
memb_join);
4747 memb_join_process (instance,
memb_join);
4758 memb_join_process (instance,
memb_join);
4771 memb_join_process (instance,
memb_join);
4772 memb_recovery_state_token_loss (instance);
4780 static int message_handler_memb_commit_token (
4784 int endian_conversion_needed)
4794 "got commit token");
4796 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4800 if (endian_conversion_needed) {
4801 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4803 memcpy (memb_commit_token_convert, msg, msg_len);
4808 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4809 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4819 memb_set_subtract (sub, &sub_entries,
4823 if (memb_set_equal (
addr,
4830 memb_state_commit_enter (instance);
4859 "Sending initial ORF token");
4862 orf_token_send_initial (instance);
4863 reset_token_timeout (instance);
4864 reset_token_retransmit_timeout (instance);
4871 static int message_handler_token_hold_cancel (
4875 int endian_conversion_needed)
4879 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4888 timer_function_token_retransmit_timeout (instance);
4894 static int check_message_header_validity(
4897 unsigned int msg_len,
4902 const char *guessed_str;
4903 const char *msg_byte = msg;
4907 "Message received from %s is too short... Ignoring %u.",
4921 if (message_header->
magic == 0xFFFF) {
4925 guessed_str =
"Corosync 2.2";
4926 }
else if (message_header->
magic == 0xFEFE) {
4930 guessed_str =
"Corosync 2.3+";
4931 }
else if (msg_byte[0] == 0x01) {
4935 guessed_str =
"unencrypted Kronosnet";
4936 }
else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4941 guessed_str =
"unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4952 guessed_str =
"encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4956 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4965 "Message received from %s has unsupported version %u... Ignoring",
4979 unsigned int msg_len,
4985 if (check_message_header_validity(context, msg, msg_len,
system_from) == -1) {
4989 switch (message_header->
type) {
5010 "Message received from %s has wrong type... ignoring %d.\n",
5012 (
int)message_header->
type);
5030 unsigned short ip_port,
5031 unsigned int iface_no)
5051 unsigned int iface_no)
5067 "Created or loaded sequence id %llx.%u for this ring.",
5101 void (*totem_service_ready) (
void))
5171 timer_function_orf_token_timeout(context);