57 #include <sys/types.h> 59 #include <sys/socket.h> 62 #include <sys/ioctl.h> 63 #include <sys/param.h> 64 #include <netinet/in.h> 65 #include <arpa/inet.h> 78 #include <qb/qbdefs.h> 79 #include <qb/qbutil.h> 80 #include <qb/qbloop.h> 86 #define LOGSYS_UTILS_ONLY 1 95 #define LOCALHOST_IP inet_addr("127.0.0.1") 96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 100 #define RETRANSMIT_ENTRIES_MAX 30 101 #define TOKEN_SIZE_MAX 64000 102 #define LEAVE_DUMMY_NODEID 0 114 #define SEQNO_START_MSG 0x0 115 #define SEQNO_START_TOKEN 0x0 137 #define ENDIAN_LOCAL 0xff22 374 struct sq regular_sort_queue;
376 struct sq recovery_sort_queue;
/*
 * printf-style logging callback. The format attribute lets the compiler
 * check the variadic arguments against the format string (argument 6).
 */
433 void (*totemsrp_log_printf) (
436 const char *
function,
439 const char *format, ...)
__attribute__((format(printf, 6, 7)));
449 void (*totemsrp_deliver_fn) (
452 unsigned int msg_len,
453 int endian_conversion_required);
455 void (*totemsrp_confchg_fn) (
457 const unsigned int *member_list,
size_t member_list_entries,
458 const unsigned int *left_list,
size_t left_list_entries,
459 const unsigned int *joined_list,
size_t joined_list_entries,
462 void (*totemsrp_service_ready_fn) (void);
464 void (*totemsrp_waiting_trans_ack_cb_fn) (
465 int waiting_trans_ack);
467 void (*memb_ring_id_create_or_load) (
471 void (*memb_ring_id_store) (
472 const struct memb_ring_id *memb_ring_id,
525 char commit_token_storage[40000];
530 int (*handler_functions[6]) (
534 int endian_conversion_needed);
579 static int message_handler_orf_token (
583 int endian_conversion_needed);
585 static int message_handler_mcast (
589 int endian_conversion_needed);
591 static int message_handler_memb_merge_detect (
595 int endian_conversion_needed);
597 static int message_handler_memb_join (
601 int endian_conversion_needed);
603 static int message_handler_memb_commit_token (
607 int endian_conversion_needed);
609 static int message_handler_token_hold_cancel (
613 int endian_conversion_needed);
617 static unsigned int main_msgs_missing (
void);
619 static void main_token_seqid_get (
622 unsigned int *token_is);
624 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src);
626 static void srp_addr_to_nodeid (
627 unsigned int *nodeid_out,
629 unsigned int entries);
631 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
637 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
639 int fcc_mcasts_allowed);
640 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
644 static void target_set_completed (
void *context);
646 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
652 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out);
653 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
654 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
655 static void memb_merge_detect_endian_convert (
658 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (
void *data);
660 static void timer_function_pause_timeout (
void *data);
661 static void timer_function_heartbeat_timeout (
void *data);
662 static void timer_function_token_retransmit_timeout (
void *data);
663 static void timer_function_token_hold_retransmit_timeout (
void *data);
664 static void timer_function_merge_detect_timeout (
void *data);
666 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
672 unsigned int msg_len);
677 unsigned int iface_no);
682 message_handler_orf_token,
683 message_handler_mcast,
684 message_handler_memb_merge_detect,
685 message_handler_memb_join,
686 message_handler_memb_commit_token,
687 message_handler_token_hold_cancel
691 #define log_printf(level, format, args...) \ 693 instance->totemsrp_log_printf ( \ 694 level, instance->totemsrp_subsys_id, \ 695 __FUNCTION__, __FILE__, __LINE__, \ 698 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 700 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 701 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 702 instance->totemsrp_log_printf ( \ 703 level, instance->totemsrp_subsys_id, \ 704 __FUNCTION__, __FILE__, __LINE__, \ 705 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ 711 return gather_state_from_desc[gsfrom];
751 static void main_token_seqid_get (
754 unsigned int *token_is)
766 static unsigned int main_msgs_missing (
void)
775 uint64_t timestamp_msec;
778 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
783 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
798 unsigned long long nano_secs = qb_util_nano_current_get ();
800 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
833 qb_loop_t *poll_handle,
841 unsigned int msg_len,
842 int endian_conversion_required),
846 const unsigned int *member_list,
size_t member_list_entries,
847 const unsigned int *left_list,
size_t left_list_entries,
848 const unsigned int *joined_list,
size_t joined_list_entries,
850 void (*waiting_trans_ack_cb_fn) (
856 if (instance == NULL) {
860 totemsrp_instance_initialize (instance);
898 "Token Timeout (%d ms) retransmit timeout (%d ms)",
901 "token hold (%d ms) retransmits before loss (%d retrans)",
904 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
911 "downcheck (%d ms) fail to recv const (%d msgs)",
917 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
921 "missed count const (%d messages)",
925 "send threads (%d threads)", totem_config->
threads);
927 "RRP token expired timeout (%d ms)",
930 "RRP token problem counter (%d ms)",
933 "RRP threshold (%d problem count)",
936 "RRP multicast threshold (%d problem count)",
939 "RRP automatic recovery check timeout (%d ms)",
966 timer_function_pause_timeout (instance);
970 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
981 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
985 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
987 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
1003 main_iface_change_fn,
1004 main_token_seqid_get,
1006 target_set_completed);
1023 token_event_stats_collector,
1029 token_event_stats_collector,
1031 *srp_context = instance;
1044 memb_leave_message_send (instance);
1064 unsigned int nodeid,
1066 unsigned int interfaces_size,
1068 unsigned int *iface_count)
1072 unsigned int found = 0;
1085 if (interfaces_size >= *iface_count) {
1105 if (interfaces_size >= *iface_count) {
1122 const char *cipher_type,
1123 const char *hash_type)
1172 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1177 for (i = 0; i < 1; i++) {
1186 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src)
1197 static void srp_addr_to_nodeid (
1198 unsigned int *nodeid_out,
1200 unsigned int entries)
1204 for (i = 0; i < entries; i++) {
1205 nodeid_out[i] = srp_addr_in[i].
addr[0].
nodeid;
1209 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in)
1223 static void memb_set_subtract (
1224 struct srp_addr *out_list,
int *out_list_entries,
1225 struct srp_addr *one_list,
int one_list_entries,
1226 struct srp_addr *two_list,
int two_list_entries)
1232 *out_list_entries = 0;
1234 for (i = 0; i < one_list_entries; i++) {
1235 for (j = 0; j < two_list_entries; j++) {
1236 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1242 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1243 *out_list_entries = *out_list_entries + 1;
1252 static void memb_consensus_set (
1279 static int memb_consensus_isset (
1296 static int memb_consensus_agreed (
1299 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1300 int token_memb_entries = 0;
1304 memb_set_subtract (token_memb, &token_memb_entries,
1308 for (i = 0; i < token_memb_entries; i++) {
1309 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1324 assert (token_memb_entries >= 1);
1329 static void memb_consensus_notset (
1331 struct srp_addr *no_consensus_list,
1332 int *no_consensus_list_entries,
1334 int comparison_list_entries)
1338 *no_consensus_list_entries = 0;
1341 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1342 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->
my_proc_list[i]);
1343 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1351 static int memb_set_equal (
1352 struct srp_addr *set1,
int set1_entries,
1353 struct srp_addr *set2,
int set2_entries)
1360 if (set1_entries != set2_entries) {
1363 for (i = 0; i < set2_entries; i++) {
1364 for (j = 0; j < set1_entries; j++) {
1365 if (srp_addr_equal (&set1[j], &set2[i])) {
1381 static int memb_set_subset (
1382 const struct srp_addr *subset,
int subset_entries,
1383 const struct srp_addr *fullset,
int fullset_entries)
1389 if (subset_entries > fullset_entries) {
1392 for (i = 0; i < subset_entries; i++) {
1393 for (j = 0; j < fullset_entries; j++) {
1394 if (srp_addr_equal (&subset[i], &fullset[j])) {
1408 static void memb_set_merge (
1409 const struct srp_addr *subset,
int subset_entries,
1410 struct srp_addr *fullset,
int *fullset_entries)
1416 for (i = 0; i < subset_entries; i++) {
1417 for (j = 0; j < *fullset_entries; j++) {
1418 if (srp_addr_equal (&fullset[j], &subset[i])) {
1424 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1425 *fullset_entries = *fullset_entries + 1;
1432 static void memb_set_and_with_ring_id (
1448 for (i = 0; i < set2_entries; i++) {
1449 for (j = 0; j < set1_entries; j++) {
1450 if (srp_addr_equal (&set1[j], &set2[i])) {
1451 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1458 srp_addr_copy (&and[*and_entries], &set1[j]);
1459 *and_entries = *and_entries + 1;
1466 #ifdef CODE_COVERAGE 1467 static void memb_set_print (
1474 printf (
"List '%s' contains %d entries:\n",
string, list_entries);
1476 for (i = 0; i < list_entries; i++) {
1477 printf (
"Address %d with %d rings\n", i, list[i].
no_addrs);
1478 for (j = 0; j < list[i].
no_addrs; j++) {
1479 printf (
"\tiface %d %s\n", j,
totemip_print (&list[i].addr[j]));
1480 printf (
"\tfamily %d\n", list[i].addr[j].
family);
1485 static void my_leave_memb_clear(
1492 static unsigned int my_leave_memb_match(
1494 unsigned int nodeid)
1497 unsigned int ret = 0;
1508 static void my_leave_memb_set(
1510 unsigned int nodeid)
1527 "Cannot set LEAVE nodeid=%u", nodeid);
1534 assert (instance != NULL);
1538 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1540 assert (instance != NULL);
1554 timer_function_token_retransmit_timeout,
1557 log_printf(instance->totemsrp_log_level_error,
"reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1571 timer_function_merge_detect_timeout,
1574 log_printf(instance->totemsrp_log_level_error,
"start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1577 instance->my_merge_detect_timeout_outstanding = 1;
1600 "Saving state aru %x high seq received %x",
1610 "Restoring instance->my_aru %x my high seq received %x",
1617 "Resetting old ring state");
1630 timer_function_pause_timeout,
1633 log_printf(instance->totemsrp_log_level_error,
"reset_pause_timeout - qb_loop_timer_add error : %d", res);
1645 timer_function_orf_token_timeout,
1648 log_printf(instance->totemsrp_log_level_error,
"reset_token_timeout - qb_loop_timer_add error : %d", res);
1660 timer_function_heartbeat_timeout,
1663 log_printf(instance->totemsrp_log_level_error,
"reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1676 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1681 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1689 timer_function_token_hold_retransmit_timeout,
1692 log_printf(instance->totemsrp_log_level_error,
"start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1696 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1702 static void memb_state_consensus_timeout_expired (
1705 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1706 int no_consensus_list_entries;
1709 if (memb_consensus_agreed (instance)) {
1710 memb_consensus_reset (instance);
1712 memb_consensus_set (instance, &instance->
my_id);
1714 reset_token_timeout (instance);
1716 memb_consensus_notset (
1719 &no_consensus_list_entries,
1723 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1736 static void timer_function_pause_timeout (
void *data)
1741 reset_pause_timeout (instance);
1746 old_ring_state_restore (instance);
1751 static void timer_function_orf_token_timeout (
void *data)
1758 "The token was lost in the OPERATIONAL state.");
1760 "A processor failed, forming new configuration.");
1768 "The consensus timeout expired.");
1769 memb_state_consensus_timeout_expired (instance);
1776 "The token was lost in the COMMIT state.");
1783 "The token was lost in the RECOVERY state.");
1784 memb_recovery_state_token_loss (instance);
1790 static void timer_function_heartbeat_timeout (
void *data)
1794 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1795 timer_function_orf_token_timeout(data);
1798 static void memb_timer_function_state_gather (
void *data)
1810 memb_join_message_send (instance);
1821 memb_timer_function_state_gather,
1822 &instance->memb_timer_state_gather_join_timeout);
1825 log_printf(instance->totemsrp_log_level_error,
"memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1831 static void memb_timer_function_gather_consensus_timeout (
void *data)
1834 memb_state_consensus_timeout_expired (instance);
1837 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1842 unsigned int range = 0;
1855 for (i = 1; i <= range; i++) {
1861 recovery_message_item = ptr;
1866 mcast = recovery_message_item->
mcast;
1872 regular_message_item.
mcast =
1873 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1874 regular_message_item.
msg_len =
1875 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1876 mcast = regular_message_item.
mcast;
1885 "comparing if ring id is for this processors old ring seqno %d",
1899 &regular_message_item, mcast->
seq);
1906 "-not adding msg with seq no %x", mcast->
seq);
1916 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1917 int joined_list_entries = 0;
1918 unsigned int aru_save;
1925 char left_node_msg[1024];
1926 char joined_node_msg[1024];
1927 char failed_node_msg[1024];
1931 memb_consensus_reset (instance);
1933 old_ring_state_reset (instance);
1935 deliver_messages_from_recovery_to_regular (instance);
1938 "Delivering to app %x to %x",
1941 aru_save = instance->
my_aru;
1954 memb_set_subtract (joined_list, &joined_list_entries,
1982 srp_addr_to_nodeid (trans_memb_list_totemip,
1995 instance->
my_aru = aru_save;
2005 joined_list, joined_list_entries,
2010 srp_addr_to_nodeid (new_memb_list_totemip,
2012 srp_addr_to_nodeid (joined_list_totemip, joined_list,
2013 joined_list_entries);
2017 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2079 regular_message = ptr;
2080 free (regular_message->
mcast);
2086 if (joined_list_entries) {
2088 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2089 for (i=0; i< joined_list_entries; i++) {
2090 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" %u", joined_list_totemip[i]);
2094 joined_node_msg[0] =
'\0';
2100 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2102 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" %u", left_list[i]);
2105 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2107 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
/* Bound the append by the buffer actually being written (failed_node_msg). */
2109 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(failed_node_msg)-sptr2,
" %u", left_list[i]);
2113 failed_node_msg[0] =
'\0';
2117 left_node_msg[0] =
'\0';
2118 failed_node_msg[0] =
'\0';
2121 my_leave_memb_clear(instance);
2124 "entering OPERATIONAL state.");
2126 "A new membership (%s:%lld) was formed. Members%s%s",
2132 if (strlen(failed_node_msg)) {
2134 "Failed to receive the leave message.%s",
2145 reset_pause_timeout (instance);
2158 static void memb_state_gather_enter (
2169 &instance->
my_id, 1,
2172 memb_join_message_send (instance);
2183 memb_timer_function_state_gather,
2184 &instance->memb_timer_state_gather_join_timeout);
2186 log_printf(instance->totemsrp_log_level_error,
"memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2192 qb_loop_timer_del (instance->totemsrp_poll_handle,
2193 instance->memb_timer_state_gather_consensus_timeout);
2195 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2197 instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2199 memb_timer_function_gather_consensus_timeout,
2200 &instance->memb_timer_state_gather_consensus_timeout);
2202 log_printf(instance->totemsrp_log_level_error,
"memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2208 cancel_token_retransmit_timeout (instance);
2209 cancel_token_timeout (instance);
2210 cancel_merge_detect_timeout (instance);
2212 memb_consensus_reset (instance);
2214 memb_consensus_set (instance, &instance->my_id);
2216 log_printf (instance->totemsrp_log_level_debug,
2217 "entering GATHER state from %d(%s).",
2218 gather_from, gsfrom_to_msg(gather_from));
2221 instance->stats.gather_entered++;
2227 instance->stats.continuous_gather++;
2233 static void timer_function_token_retransmit_timeout (
void *data);
2235 static void target_set_completed (
2240 memb_state_commit_token_send (instance);
2244 static void memb_state_commit_enter (
2247 old_ring_state_save (instance);
2249 memb_state_commit_token_update (instance);
2251 memb_state_commit_token_target_set (instance);
2267 "entering COMMIT state.");
2270 reset_token_retransmit_timeout (instance);
2271 reset_token_timeout (instance);
2287 static void memb_state_recovery_enter (
2292 int local_received_flg = 1;
2293 unsigned int low_ring_aru;
2294 unsigned int range = 0;
2295 unsigned int messages_originated = 0;
2298 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2304 "entering RECOVERY state.");
2315 memb_state_commit_token_send_recovery (instance, commit_token);
2330 memcpy (&my_new_memb_ring_id_list[i],
2331 &memb_list[i].ring_id,
2334 memb_set_and_with_ring_id (
2336 my_new_memb_ring_id_list,
2350 "position [%d] member %s:", i,
totemip_print (&addr[i].addr[0]));
2352 "previous ring seq %llx rep %s",
2357 "aru %x high delivered %x received flag %d",
2375 local_received_flg = 0;
2379 if (local_received_flg == 1) {
2395 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2397 low_ring_aru = memb_list[i].
aru;
2418 "copying all old ring messages from %x-%x.",
2421 for (i = 1; i <= range; i++) {
2428 low_ring_aru + i, &ptr);
2432 sort_queue_item = ptr;
2433 messages_originated++;
2434 memset (&message_item, 0,
sizeof (
struct message_item));
2436 message_item.
mcast = totemsrp_buffer_alloc (instance);
2437 assert (message_item.
mcast);
2447 memcpy (((
char *)message_item.
mcast) + sizeof (
struct mcast),
2448 sort_queue_item->
mcast,
2453 "Originated %d messages in RECOVERY.", messages_originated);
2458 "Did not need to originate any messages in recovery.");
2468 reset_token_timeout (instance);
2469 reset_token_retransmit_timeout (instance);
2482 token_hold_cancel_send (instance);
2489 struct iovec *iovec,
2490 unsigned int iov_len,
2497 unsigned int addr_idx;
2506 if (cs_queue_is_full (queue_use)) {
2511 memset (&message_item, 0,
sizeof (
struct message_item));
2516 message_item.
mcast = totemsrp_buffer_alloc (instance);
2517 if (message_item.
mcast == 0) {
2524 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2534 addr = (
char *)message_item.
mcast;
2535 addr_idx = sizeof (
struct mcast);
2536 for (i = 0; i < iov_len; i++) {
2537 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2538 addr_idx += iovec[i].iov_len;
2541 message_item.
msg_len = addr_idx;
2545 cs_queue_item_add (queue_use, &message_item);
2567 cs_queue_avail (queue_use, &avail);
2578 static int orf_token_remcast (
2586 struct sq *sort_queue;
2594 res = sq_in_range (sort_queue, seq);
2603 res = sq_item_get (sort_queue, seq, &ptr);
2608 sort_queue_item = ptr;
2612 sort_queue_item->
mcast,
2622 static void messages_free (
2624 unsigned int token_aru)
2629 int log_release = 0;
2630 unsigned int release_to;
2631 unsigned int range = 0;
2633 release_to = token_aru;
2634 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2654 for (i = 1; i <= range; i++) {
2660 regular_message = ptr;
2661 totemsrp_buffer_release (instance, regular_message->
mcast);
2672 "releasing messages up to and including %x", release_to);
2676 static void update_aru (
2681 struct sq *sort_queue;
2683 unsigned int my_aru_saved = 0;
2693 my_aru_saved = instance->
my_aru;
2694 for (i = 1; i <= range; i++) {
2698 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2706 instance->
my_aru += i - 1;
2712 static int orf_token_mcast (
2715 int fcc_mcasts_allowed)
2719 struct sq *sort_queue;
2722 unsigned int fcc_mcast_current;
2727 reset_token_retransmit_timeout (instance);
2738 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2739 if (cs_queue_is_empty (mcast_queue)) {
2742 message_item = (
struct message_item *)cs_queue_item_get (mcast_queue);
2750 memset (&sort_queue_item, 0,
sizeof (
struct sort_queue_item));
2754 mcast = sort_queue_item.
mcast;
2761 sq_item_add (sort_queue, &sort_queue_item, message_item->
mcast->
seq);
2765 message_item->
mcast,
2771 cs_queue_item_remove (mcast_queue);
2779 update_aru (instance);
2784 return (fcc_mcast_current);
2791 static int orf_token_rtr (
2794 unsigned int *fcc_allowed)
2799 struct sq *sort_queue;
2801 unsigned int range = 0;
2802 char retransmit_msg[1024];
2811 rtr_list = &orf_token->
rtr_list[0];
/*
 * Build a human-readable "Retransmit List: <seq> <seq> ..." string for
 * logging, one hexadecimal sequence number per rtr_list entry.
 *
 * NOTE(review): retransmit_msg is a fixed 1024-byte buffer and is filled
 * with unbounded strcpy/sprintf/strcat calls. The loop bound is not visible
 * in this view - confirm the number of appended entries cannot overflow it,
 * or switch to snprintf with a running offset.
 */
2813 strcpy (retransmit_msg,
"Retransmit List: ");
2818 sprintf (value,
"%x ", rtr_list[i].seq);
2819 strcat (retransmit_msg, value);
/* Appending an empty string leaves retransmit_msg unchanged. */
2821 strcat (retransmit_msg,
"");
2823 "%s", retransmit_msg);
2836 if (memcmp (&rtr_list[i].ring_id, &instance->
my_ring_id,
2843 res = orf_token_remcast (instance, rtr_list[i].seq);
2850 memmove (&rtr_list[i], &rtr_list[i + 1],
2866 range = orf_token->
seq - instance->
my_aru;
2870 (i <= range); i++) {
2875 res = sq_in_range (sort_queue, instance->
my_aru + i);
2883 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2894 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2904 if (instance->
my_aru + i == rtr_list[j].
seq) {
2934 static void timer_function_token_retransmit_timeout (
void *data)
2944 token_retransmit (instance);
2945 reset_token_retransmit_timeout (instance);
2950 static void timer_function_token_hold_retransmit_timeout (
void *data)
2961 token_retransmit (instance);
2966 static void timer_function_merge_detect_timeout(
void *data)
2975 memb_merge_detect_transmit (instance);
2988 static int token_send (
2990 struct orf_token *orf_token,
2994 unsigned int orf_token_size;
2996 orf_token_size =
sizeof (
struct orf_token) +
2997 (orf_token->rtr_list_entries *
sizeof (
struct rtr_item));
3004 if (forward_token == 0) {
3041 sizeof (
struct token_hold_cancel));
3048 struct orf_token orf_token;
3080 res = token_send (instance, &orf_token, 1);
3085 static void memb_state_commit_token_update (
3090 unsigned int high_aru;
3126 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3127 high_aru = memb_list[i].
aru;
3137 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3152 static void memb_state_commit_token_target_set (
3169 static int memb_state_commit_token_send_recovery (
3171 struct memb_commit_token *commit_token)
3173 unsigned int commit_token_size;
3177 commit_token_size =
sizeof (
struct memb_commit_token) +
3178 ((sizeof (struct srp_addr) +
3179 sizeof (struct memb_commit_token_memb_entry)) * commit_token->
addr_entries);
3195 reset_token_retransmit_timeout (instance);
3199 static int memb_state_commit_token_send (
3202 unsigned int commit_token_size;
3206 commit_token_size =
sizeof (
struct memb_commit_token) +
3207 ((sizeof (struct srp_addr) +
3224 reset_token_retransmit_timeout (instance);
3231 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3232 int token_memb_entries = 0;
3236 memb_set_subtract (token_memb, &token_memb_entries,
3244 lowest_addr = &token_memb[0].
addr[0];
3245 for (i = 1; i < token_memb_entries; i++) {
3253 static int srp_addr_compare (
const void *a,
const void *b)
3261 static void memb_state_commit_token_create (
3264 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3267 int token_memb_entries = 0;
3270 "Creating commit token because I am the rep.");
3272 memb_set_subtract (token_memb, &token_memb_entries,
3276 memset (instance->
commit_token, 0, sizeof (
struct memb_commit_token));
3291 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3300 memcpy (addr, token_memb,
3301 token_memb_entries *
sizeof (
struct srp_addr));
3302 memset (memb_list, 0,
3308 char memb_join_data[40000];
3311 unsigned int addr_idx;
3320 msg_len =
sizeof(
struct memb_join) +
3321 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3323 if (msg_len >
sizeof(memb_join_data)) {
3324 log_printf (instance->totemsrp_log_level_error,
3325 "memb_join_message too long. Ignoring message.");
3330 memb_join->
ring_seq = instance->my_ring_id.seq;
3333 srp_addr_copy (&memb_join->
system_from, &instance->my_id);
3339 addr = (
char *)memb_join;
3340 addr_idx =
sizeof (
struct memb_join);
3341 memcpy (&addr[addr_idx],
3342 instance->my_proc_list,
3343 instance->my_proc_list_entries *
3346 instance->my_proc_list_entries *
3348 memcpy (&addr[addr_idx],
3349 instance->my_failed_list,
3350 instance->my_failed_list_entries *
3353 instance->my_failed_list_entries *
3356 if (instance->totem_config->send_join_timeout) {
3357 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3360 instance->stats.memb_join_tx++;
3363 instance->totemrrp_context,
3370 char memb_join_data[40000];
3373 unsigned int addr_idx;
3374 int active_memb_entries;
3375 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3379 "sending join/leave message");
3386 &instance->
my_id, 1,
3389 memb_set_subtract (active_memb, &active_memb_entries,
3391 &instance->
my_id, 1);
3393 msg_len =
sizeof(
struct memb_join) +
3394 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3396 if (msg_len >
sizeof(memb_join_data)) {
3397 log_printf (instance->totemsrp_log_level_error,
3398 "memb_leave message too long. Ignoring message.");
3408 memb_join->
ring_seq = instance->my_ring_id.seq;
3411 srp_addr_copy (&memb_join->
system_from, &instance->my_id);
3419 addr = (
char *)memb_join;
3420 addr_idx =
sizeof (
struct memb_join);
3421 memcpy (&addr[addr_idx],
3423 active_memb_entries *
3426 active_memb_entries *
3428 memcpy (&addr[addr_idx],
3429 instance->my_failed_list,
3430 instance->my_failed_list_entries *
3433 instance->my_failed_list_entries *
3437 if (instance->totem_config->send_join_timeout) {
3438 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3440 instance->stats.memb_join_tx++;
3443 instance->totemrrp_context,
3464 sizeof (
struct memb_merge_detect));
3467 static void memb_ring_id_set (
3486 token_hold_cancel_send (instance);
3489 if (callback_handle == 0) {
3492 *handle_out = (
void *)callback_handle;
3493 list_init (&callback_handle->
list);
3495 callback_handle->
data = (
void *) data;
3497 callback_handle->
delete =
delete;
3516 list_del (&h->
list);
3523 static void token_callbacks_execute (
3529 struct list_head *callback_listhead = 0;
3545 for (list = callback_listhead->
next; list != callback_listhead;
3548 token_callback_instance =
list_entry (list,
struct token_callback_instance, list);
3550 list_next = list->
next;
3551 del = token_callback_instance->
delete;
3558 token_callback_instance->
data);
3562 if (res == -1 && del == 1) {
3563 list_add (list, callback_listhead);
3565 free (token_callback_instance);
3589 if (queue_use != NULL) {
3590 backlog = cs_queue_used (queue_use);
3597 static int fcc_calculate (
3599 struct orf_token *token)
3601 unsigned int transmits_allowed;
3602 unsigned int backlog_calc;
3610 instance->
my_cbl = backlog_get (instance);
3619 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3620 transmits_allowed = backlog_calc;
3624 return (transmits_allowed);
3630 static void fcc_rtr_limit (
3632 struct orf_token *token,
3633 unsigned int *transmits_allowed)
3637 assert (check >= 0);
3644 *transmits_allowed = 0;
3648 static void fcc_token_update (
3650 struct orf_token *token,
3651 unsigned int msgs_transmitted)
3653 token->
fcc += msgs_transmitted - instance->
my_trc;
3655 instance->
my_trc = msgs_transmitted;
3662 static int check_totemip_sanity(
3665 int endian_conversion_needed)
3670 if (endian_conversion_needed) {
3674 if (family != AF_INET && family != AF_INET6) {
3676 "Received message corrupted... ignoring.");
3684 static int check_srpaddr_sanity(
3687 int endian_conversion_needed)
3695 for (i = 0; i < addr->
no_addrs; i++) {
3697 if (check_totemip_sanity(instance, &addr->
addr[i], endian_conversion_needed) == -1) {
3706 static int check_orf_token_sanity(
3710 int endian_conversion_needed)
3713 const struct orf_token *token = (
const struct orf_token *)msg;
3714 size_t required_len;
3717 if (msg_len <
sizeof(
struct orf_token)) {
3719 "Received orf_token message is too short... ignoring.");
3724 if (check_totemip_sanity(instance, &token->
ring_id.
rep, endian_conversion_needed) == -1) {
3728 if (endian_conversion_needed) {
3734 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3735 if (msg_len < required_len) {
3737 "Received orf_token message is too short... ignoring.");
3742 for (i = 0; i < rtr_entries; i++) {
3744 endian_conversion_needed) == -1) {
3752 static int check_mcast_sanity(
3756 int endian_conversion_needed)
3758 const struct mcast *mcast_msg = (
const struct mcast *)msg;
3760 if (msg_len <
sizeof(
struct mcast)) {
3762 "Received mcast message is too short... ignoring.");
3767 if ((check_totemip_sanity(instance, &mcast_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3768 (check_srpaddr_sanity(instance, &mcast_msg->
system_from, endian_conversion_needed) == -1)) {
3775 static int check_memb_merge_detect_sanity(
3779 int endian_conversion_needed)
3785 "Received memb_merge_detect message is too short... ignoring.");
3790 if ((check_totemip_sanity(instance, &mmd_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3791 (check_srpaddr_sanity(instance, &mmd_msg->
system_from, endian_conversion_needed) == -1)) {
3798 static int check_memb_join_sanity(
3802 int endian_conversion_needed)
3807 size_t required_len;
3809 const struct srp_addr *failed_list;
3812 if (msg_len <
sizeof(
struct memb_join)) {
3814 "Received memb_join message is too short... ignoring.");
3819 if (check_srpaddr_sanity(instance, &mj_msg->
system_from, endian_conversion_needed) == -1) {
3826 if (endian_conversion_needed) {
3827 proc_list_entries =
swab32(proc_list_entries);
3828 failed_list_entries =
swab32(failed_list_entries);
3831 required_len =
sizeof(
struct memb_join) + ((proc_list_entries + failed_list_entries) *
sizeof(
struct srp_addr));
3832 if (msg_len < required_len) {
3834 "Received memb_join message is too short... ignoring.");
3840 failed_list = proc_list + proc_list_entries;
3843 if (check_srpaddr_sanity(instance, &proc_list[i], endian_conversion_needed) == -1) {
3849 if (check_srpaddr_sanity(instance, &failed_list[i], endian_conversion_needed) == -1) {
3857 static int check_memb_commit_token_sanity(
3861 int endian_conversion_needed)
3863 const struct memb_commit_token *mct_msg = (
const struct memb_commit_token *)msg;
3867 size_t required_len;
3870 if (msg_len <
sizeof(
struct memb_commit_token)) {
3872 "Received memb_commit_token message is too short... ignoring.");
3877 if (check_totemip_sanity(instance, &mct_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3882 if (endian_conversion_needed) {
3883 addr_entries =
swab32(addr_entries);
3886 required_len =
sizeof(
struct memb_commit_token) +
3887 (addr_entries * (
sizeof(
struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3888 if (msg_len < required_len) {
3890 "Received memb_commit_token message is too short... ignoring.");
3896 memb_list = (
const struct memb_commit_token_memb_entry *)(addr +
addr_entries);
3899 if (check_srpaddr_sanity(instance, &addr[i], endian_conversion_needed) == -1) {
3903 if (memb_list[i].ring_id.
rep.
family != 0) {
3904 if (check_totemip_sanity(instance, &memb_list[i].ring_id.
rep,
3905 endian_conversion_needed) == -1) {
3914 static int check_token_hold_cancel_sanity(
3918 int endian_conversion_needed)
3924 "Received token_hold_cancel message is too short... ignoring.");
3929 if (check_totemip_sanity(instance, &thc_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3944 static int message_handler_orf_token (
3948 int endian_conversion_needed)
3950 char token_storage[1500];
3951 char token_convert[1500];
3952 struct orf_token *token = NULL;
3954 unsigned int transmits_allowed;
3955 unsigned int mcasted_retransmit;
3956 unsigned int mcasted_regular;
3957 unsigned int last_aru;
3960 unsigned long long tv_current;
3961 unsigned long long tv_diff;
3963 tv_current = qb_util_nano_current_get ();
3964 tv_diff = tv_current -
tv_old;
3965 tv_old = tv_current;
3968 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3971 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3978 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE 3979 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3984 if (endian_conversion_needed) {
3985 orf_token_endian_convert ((
struct orf_token *)msg,
3986 (
struct orf_token *)token_convert);
3987 msg = (
struct orf_token *)token_convert;
3994 token = (
struct orf_token *)token_storage;
3995 memcpy (token, msg,
sizeof (
struct orf_token));
3996 memcpy (&token->
rtr_list[0], (
char *)msg + sizeof (
struct orf_token),
4004 start_merge_detect_timeout (instance);
4007 cancel_merge_detect_timeout (instance);
4008 cancel_token_hold_retransmit_timeout (instance);
4014 #ifdef TEST_RECOVERY_MSG_COUNT 4055 messages_free (instance, token->
aru);
4074 reset_heartbeat_timeout(instance);
4077 cancel_heartbeat_timeout(instance);
4092 transmits_allowed = fcc_calculate (instance, token);
4093 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4101 fcc_rtr_limit (instance, token, &transmits_allowed);
4102 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4109 fcc_token_update (instance, token, mcasted_retransmit +
4112 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
4117 if (token->
aru == token->
seq) {
4123 if (token->
aru == last_aru && token->
aru_addr != 0) {
4138 "FAILED TO RECEIVE");
4142 memb_set_merge (&instance->
my_id, 1,
4169 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4182 "install seq %x aru %x high seq received %x",
4200 "retrans flag count %x token aru %x install seq %x aru %x %x",
4204 memb_state_operational_enter (instance);
4211 token_send (instance, token, forward_token);
4214 tv_current = qb_util_nano_current_get ();
4215 tv_diff = tv_current -
tv_old;
4216 tv_old = tv_current;
4219 ((
float)tv_diff) / 1000000.0);
4222 messages_deliver_to_app (instance, 0,
4230 reset_token_timeout (instance);
4231 reset_token_retransmit_timeout (instance);
4235 start_token_hold_retransmit_timeout (instance);
4245 reset_heartbeat_timeout(instance);
4248 cancel_heartbeat_timeout(instance);
4254 static void messages_deliver_to_app (
4257 unsigned int end_point)
4262 struct mcast *mcast_in;
4263 struct mcast mcast_header;
4264 unsigned int range = 0;
4265 int endian_conversion_required;
4266 unsigned int my_high_delivered_stored = 0;
4282 for (i = 1; i <= range; i++) {
4290 my_high_delivered_stored + i);
4296 my_high_delivered_stored + i, &ptr);
4300 if (res != 0 && skip == 0) {
4311 sort_queue_item_p = ptr;
4313 mcast_in = sort_queue_item_p->
mcast;
4314 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4316 endian_conversion_required = 0;
4318 endian_conversion_required = 1;
4319 mcast_endian_convert (mcast_in, &mcast_header);
4321 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4342 "Delivering MCAST message with seq %x to pending delivery queue",
4350 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4352 endian_conversion_required);
4359 static int message_handler_mcast (
4363 int endian_conversion_needed)
4366 struct sq *sort_queue;
4367 struct mcast mcast_header;
4369 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4373 if (endian_conversion_needed) {
4374 mcast_endian_convert (msg, &mcast_header);
4376 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4387 #ifdef TEST_DROP_MCAST_PERCENTAGE 4388 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4408 if (!memb_set_subset (
4435 "Received ringid(%s:%lld) seq %x",
4445 sq_in_range (sort_queue, mcast_header.
seq) &&
4446 sq_item_inuse (sort_queue, mcast_header.
seq) == 0) {
4452 sort_queue_item.
mcast = totemsrp_buffer_alloc (instance);
4453 if (sort_queue_item.
mcast == NULL) {
4456 memcpy (sort_queue_item.
mcast, msg, msg_len);
4457 sort_queue_item.
msg_len = msg_len;
4460 mcast_header.
seq)) {
4464 sq_item_add (sort_queue, &sort_queue_item, mcast_header.
seq);
4467 update_aru (instance);
4476 static int message_handler_memb_merge_detect (
4480 int endian_conversion_needed)
4484 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4488 if (endian_conversion_needed) {
4489 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4491 memcpy (&memb_merge_detect, msg,
4492 sizeof (
struct memb_merge_detect));
4509 memb_set_merge (&memb_merge_detect.
system_from, 1,
4515 if (!memb_set_subset (
4521 memb_set_merge (&memb_merge_detect.
system_from, 1,
4539 static void memb_join_process (
4545 int gather_entered = 0;
4546 int fail_minus_memb_entries = 0;
4547 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4563 "Discarding LEAVE message during flush, nodeid=%u",
4570 "Discarding JOIN message during flush, nodeid=%d", memb_join->
header.
nodeid);
4585 if (memb_set_equal (proc_list,
4590 memb_set_equal (failed_list,
4595 memb_consensus_set (instance, &memb_join->
system_from);
4597 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4604 memb_state_commit_token_create (instance);
4606 memb_state_commit_enter (instance);
4609 if (memb_consensus_agreed (instance) &&
4610 memb_lowest_in_config (instance)) {
4612 memb_state_commit_token_create (instance);
4614 memb_state_commit_enter (instance);
4619 if (memb_set_subset (proc_list,
4624 memb_set_subset (failed_list,
4636 memb_set_merge (proc_list,
4640 if (memb_set_subset (
4641 &instance->
my_id, 1,
4648 if (memb_set_subset (
4653 if (memb_set_subset (
4658 memb_set_merge (failed_list,
4662 memb_set_subtract (fail_minus_memb,
4663 &fail_minus_memb_entries,
4669 memb_set_merge (fail_minus_memb,
4670 fail_minus_memb_entries,
4681 if (gather_entered == 0 &&
4688 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4710 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4713 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4717 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out)
4738 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4743 if (in_memb_list[i].ring_id.
rep.
family != 0) {
4756 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4780 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4796 static void memb_merge_detect_endian_convert (
4808 static int ignore_join_under_operational (
4810 const struct memb_join *memb_join)
4820 if (memb_set_subset (&instance->
my_id, 1,
4838 static int message_handler_memb_join (
4842 int endian_conversion_needed)
4844 const struct memb_join *memb_join;
4845 struct memb_join *memb_join_convert = alloca (msg_len);
4847 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4851 if (endian_conversion_needed) {
4852 memb_join = memb_join_convert;
4853 memb_join_endian_convert (msg, memb_join_convert);
4863 if (pause_flush (instance)) {
4872 if (!ignore_join_under_operational (instance, memb_join)) {
4873 memb_join_process (instance, memb_join);
4878 memb_join_process (instance, memb_join);
4889 memb_join_process (instance, memb_join);
4902 memb_join_process (instance, memb_join);
4903 memb_recovery_state_token_loss (instance);
4911 static int message_handler_memb_commit_token (
4915 int endian_conversion_needed)
4917 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4918 struct memb_commit_token *memb_commit_token;
4919 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4925 "got commit token");
4927 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4931 if (endian_conversion_needed) {
4932 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4934 memcpy (memb_commit_token_convert, msg, msg_len);
4936 memb_commit_token = memb_commit_token_convert;
4939 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4940 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4950 memb_set_subtract (sub, &sub_entries,
4954 if (memb_set_equal (addr,
4960 memcpy (instance->
commit_token, memb_commit_token, msg_len);
4961 memb_state_commit_enter (instance);
4975 memb_state_recovery_enter (instance, memb_commit_token);
4990 "Sending initial ORF token");
4993 orf_token_send_initial (instance);
4994 reset_token_timeout (instance);
4995 reset_token_retransmit_timeout (instance);
5002 static int message_handler_token_hold_cancel (
5006 int endian_conversion_needed)
5010 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
5019 timer_function_token_retransmit_timeout (instance);
5028 unsigned int msg_len)
5033 if (msg_len <
sizeof (
struct message_header)) {
5035 "Received message is too short... ignoring %u.",
5036 (
unsigned int)msg_len);
5041 switch (message_header->
type) {
5062 printf (
"wrong message type\n");
5079 unsigned int iface_no)
5095 "Created or loaded sequence id %llx.%s for this ring.",
5122 void (*totem_service_ready) (
void))
void(* totemsrp_service_ready_fn)(void)
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
void(*) enum memb_stat memb_state)
int totemrrp_iface_check(void *rrp_context)
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct srp_addr system_from
struct memb_ring_id ring_id
uint32_t waiting_trans_ack
struct srp_addr system_from
struct memb_ring_id ring_id
int totemsrp_log_level_debug
struct memb_ring_id my_ring_id
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
int my_leave_memb_entries
struct message_header header
unsigned int old_ring_state_high_seq_received
unsigned int proc_list_entries
struct totem_interface * interfaces
unsigned int interface_count
int totemsrp_my_family_get(void *srp_context)
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
The totem_ip_address struct.
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
const char * totemip_print(const struct totem_ip_address *addr)
int totemsrp_log_level_error
#define LEAVE_DUMMY_NODEID
struct memb_ring_id ring_id
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int failed_list_entries
unsigned char end_of_memb_join[0]
unsigned long long int tv_old
#define SEQNO_START_TOKEN
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_hold_timeout
struct memb_ring_id ring_id
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
int totemip_compare(const void *a, const void *b)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int ring_no)
void * token_sent_event_handle
struct srp_addr system_from
totem_configuration_type
The totem_configuration_type enum.
int totemsrp_log_level_notice
unsigned int proc_list_entries
unsigned int totemsrp_my_nodeid_get(void *srp_context)
char rrp_mode[TOTEM_RRP_MODE_BYTES]
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
int totemsrp_log_level_warning
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
void totemrrp_membership_changed(void *rrp_context, enum totem_configuration_type configuration_type, const struct srp_addr *member_list, size_t member_list_entries, const struct srp_addr *left_list, size_t left_list_entries, const struct srp_addr *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct message_header header
uint64_t memb_merge_detect_rx
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
struct cs_queue new_message_queue_trans
struct message_header header
unsigned char end_of_commit_token[0]
char commit_token_storage[40000]
unsigned int rrp_problem_count_timeout
struct list_head token_callback_sent_listhead
struct cs_queue new_message_queue
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
uint64_t gather_token_lost
int totemsrp_log_level_trace
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
int totemrrp_ifaces_get(void *rrp_context, char ***status, unsigned int *iface_count)
struct memb_ring_id my_old_ring_id
void * totemrrp_buffer_alloc(void *rrp_context)
unsigned int downcheck_timeout
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
uint64_t memb_commit_token_tx
int my_deliver_memb_entries
unsigned int max_network_delay
unsigned int heartbeat_failures_allowed
#define TOTEM_TOKEN_STATS_MAX
#define swab64(x)
The swab64 macro.
struct message_item __attribute__
unsigned long long token_ring_id_seq
struct totem_ip_address mcast_address
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
unsigned int send_join_timeout
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
unsigned int rrp_problem_count_threshold
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
uint64_t operational_entered
void(*) in log_level_security)
unsigned long long ring_seq
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
uint64_t operational_token_lost
unsigned int received_flg
uint64_t consensus_timeouts
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
struct message_handlers totemsrp_message_handlers
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
uint64_t recovery_token_lost
unsigned char end_of_memb_join[0]
unsigned int token_retransmits_before_loss_const
struct message_header header
int totemrrp_finalize(void *rrp_context)
struct list_head token_callback_received_listhead
int totemrrp_member_remove(void *rrp_context, const struct totem_ip_address *member, int iface_no)
struct rtr_item rtr_list[0]
int totemsrp_ring_reenable(void *srp_context)
struct memb_ring_id ring_id
unsigned int seqno_unchanged_const
uint64_t commit_token_lost
unsigned int miss_count_const
int totemrrp_crypto_set(void *rrp_context, const char *cipher_type, const char *hash_type)
uint64_t token_hold_cancel_rx
unsigned int join_timeout
uint32_t originated_orf_token
int totemrrp_send_flush(void *rrp_context)
struct message_header header
struct totem_ip_address mcast_addr
#define MESSAGE_QUEUE_MAX
int totemrrp_member_add(void *rrp_context, const struct totem_ip_address *member, int iface_no)
unsigned int received_flg
struct totem_ip_address rep
unsigned int last_released
int orf_token_retransmit_size
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
unsigned int rrp_autorecovery_check_timeout
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int fail_to_recv_const
void * token_recv_event_handle
struct totem_ip_address boundto
unsigned int my_high_seq_received
int totemrrp_initialize(qb_loop_t *poll_handle, void **rrp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no), void(*token_seqid_get)(const void *msg, unsigned int *seqid, unsigned int *token_is), unsigned int(*msgs_missing)(void), void(*target_set_completed)(void *context))
Create an instance.
qb_loop_t * totemsrp_poll_handle
qb_loop_timer_handle timer_pause_timeout
qb_loop_timer_handle timer_merge_detect_timeout
int my_merge_detect_timeout_outstanding
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
int totemsrp_log_level_security
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
int(* callback_fn)(enum totem_callback_token_type type, const void *)
#define swab32(x)
The swab32 macro.
qb_loop_timer_handle timer_orf_token_timeout
uint32_t continuous_gather
void totemsrp_threaded_mode_enable(void *context)
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totemmrp_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
int totemrrp_recv_flush(void *rrp_context)
uint32_t orf_token_discard
int my_failed_list_entries
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
uint64_t token_hold_cancel_tx
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_timeout
unsigned int high_delivered
unsigned int consensus_timeout
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len)
#define PROCESSOR_COUNT_MAX
unsigned short endian_detector
void totemrrp_buffer_release(void *rrp_context, void *ptr)
Totem Network interface - also does encryption/decryption.
char orf_token_retransmit[TOKEN_SIZE_MAX]
struct message_header header
struct sq regular_sort_queue
#define swab16(x)
The swab16 macro.
void totemsrp_finalize(void *srp_context)
#define QUEUE_RTR_ITEMS_SIZE_MAX
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
struct cs_queue retrans_message_queue
const char * gather_state_from_desc[]
qb_loop_timer_handle memb_timer_state_gather_join_timeout
int my_trans_memb_entries
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
uint64_t memb_merge_detect_tx
unsigned int high_delivered
struct rtr_item rtr_list[0]
int consensus_list_entries
unsigned int rrp_problem_count_mcast_threshold
int totemrrp_processor_count_set(void *rrp_context, unsigned int processor_count)
int totemrrp_mcast_noflush_send(void *rrp_context, const void *msg, unsigned int msg_len)
unsigned char end_of_commit_token[0]
uint32_t threaded_mode_enabled
enum totem_callback_token_type callback_type
int totemrrp_mcast_recv_empty(void *rrp_context)
#define list_entry(ptr, type, member)
unsigned long long ring_seq
struct totem_logging_configuration totem_logging_configuration
int totemrrp_mcast_flush_send(void *rrp_context, const void *msg, unsigned int msg_len)
struct memb_ring_id ring_id
#define log_printf(level, format, args...)
void totemsrp_trans_ack(void *context)
unsigned int max_messages
uint64_t recovery_entered
qb_loop_timer_handle memb_timer_state_commit_timeout
struct memb_commit_token * commit_token
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
struct srp_addr system_from
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
unsigned int merge_timeout
unsigned int use_heartbeat
struct message_header header
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int ring_no)
unsigned int token_retransmit_timeout
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
#define RETRANSMIT_ENTRIES_MAX
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
unsigned int my_token_seq
struct memb_ring_id ring_id
int totemrrp_ring_reenable(void *rrp_context, unsigned int iface_no)
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
struct totem_ip_address addr[INTERFACE_MAX]
unsigned int rrp_token_expired_timeout
struct memb_ring_id ring_id
unsigned int my_install_seq
int totemrrp_token_send(void *rrp_context, const void *msg, unsigned int msg_len)
unsigned int failed_list_entries
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
struct sq recovery_sort_queue
int totemrrp_token_target_set(void *rrp_context, struct totem_ip_address *addr, unsigned int iface_no)
totem_callback_token_type
The totem_callback_token_type enum.
unsigned int my_high_ring_delivered