#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
#define LOGSYS_UTILS_ONLY 1
#if !(defined(__i386__) || defined(__x86_64__))
#define TOTEMPG_NEED_ALIGN 1
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))
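/*
 * TOTEMPG_PACKET_SIZE is the payload available to packed application
 * messages in a single totemsrp frame: the configured net_mtu minus the
 * totempg_mcast header that precedes the packed message lengths and data.
 */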
static int mcast_packed_msg_count = 0;

static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static uint32_t totempg_threaded_mode = 0;

static void *totemsrp_context;
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));
	struct qb_list_head list;
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;

static int totempg_waiting_transack = 0;
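/*
 * fragmentation_data is the staging buffer for messages packed into the
 * current frame; fragment_size tracks how many bytes of it are in use and
 * fragment_continuation remembers the fragment sequence number when a
 * message had to be split across frames, so the receiver can stitch the
 * pieces back together.  totempg_waiting_transack selects which assembly
 * list is active while a transitional configuration waits for its ack.
 */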
	unsigned int msg_len,
	int endian_conversion_required);

	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,

	struct qb_list_head list;
static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
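/*
 * The mutexes above are only taken when totempg_threaded_mode has been
 * set to 1; in single-threaded mode every lock/unlock pair below is a
 * conditional branch that is simply skipped.
 */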
#define log_printf(level, format, args...) \
	totempg_log_printf(level, \
		totempg_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args)
static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);
static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);

	totempg_waiting_transack = waiting_trans_ack;
}
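/*
 * The lookup below chooses which in-use assembly list is active: while a
 * transitional configuration is waiting for its ack
 * (totempg_waiting_transack != 0), reassembly state is kept on the
 * transitional list so old-ring and new-ring fragments are not mixed.
 */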
	struct qb_list_head *list;
	struct qb_list_head *active_assembly_list_inuse;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
	}

	qb_list_for_each(list, active_assembly_list_inuse) {

	if (qb_list_empty (&assembly_list_free) == 0) {
		assembly = qb_list_first_entry (&assembly_list_free,
			struct assembly, list);
		qb_list_add (&assembly->list, active_assembly_list_inuse);

		qb_list_add (&assembly->list, active_assembly_list_inuse);
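/*
 * assembly_deref_from_normal_and_trans() below drops the entry a departed
 * node holds on both the normal and the transitional in-use lists, so
 * stale reassembly state for that node can be released on a
 * configuration change.
 */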
static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct qb_list_head *list, *tmp_iter;
	struct qb_list_head *active_assembly_list_inuse;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
		}

		qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
static inline void app_confchg_fn (
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,

	struct qb_list_head *list;
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}
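	/*
	 * Walk the registered group instances so the configuration change
	 * can be passed up to each of them.
	 */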
	qb_list_for_each(list, &totempg_groups_list) {
static inline void group_endian_convert (
	unsigned short *group_len;

#ifdef TOTEMPG_NEED_ALIGN
	if ((size_t)msg % 4 != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
static inline int group_matches (
	const struct iovec *iovec,
	unsigned int iov_len,
	const struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
	unsigned short *group_len;

#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align the data for architectures that cannot do unaligned access.
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
			    (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {

		group_name += group_len[i];
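/*
 * app_deliver_fn (below) strips the group-name header that
 * group_matches() measured (adjust_iovec bytes) and hands the remaining
 * payload up to the deliver callback of each group instance whose
 * subscription matches one of the group names carried in the message.
 */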
static inline void app_deliver_fn (
	unsigned int msg_len,
	int endian_conversion_required)
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct qb_list_head *list;
	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

#ifdef TOTEMPG_NEED_ALIGN
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	qb_list_for_each(list, &totempg_groups_list) {
		if (group_matches (iovec, 1, instance->groups,
		    instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
			/*
			 * Re-align the stripped message if removing the group
			 * header left it on an unaligned boundary.
			 */
			if (((size_t)iovec->iov_base + adjust_iovec) % 4 != 0) {
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
static void totempg_confchg_fn (
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,

	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
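/*
 * totempg_deliver_fn (below) is the receive path from totemsrp: it peels
 * off the totempg_mcast header and the packed per-message length array,
 * byte-swaps the lengths when the sender had a different endianness, and
 * then either delivers each packed message directly or appends fragment
 * data to the per-node assembly buffer until the final fragment arrives.
 */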
static void totempg_deliver_fn (
	unsigned int msg_len,
	int endian_conversion_required)
	unsigned short *msg_lens;
	struct iovec iov_delv;

	if (endian_conversion_required) {

	memcpy (header, msg, datasize);

	if (endian_conversion_required) {
		for (i = 0; i < mcast->msg_count; i++) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}
	}

	if (mcast->fragmented == 0 || mcast->fragmented == 1) {
		iov_delv.iov_len = msg_lens[1];

		app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
			endian_conversion_required);

		iov_delv.iov_len = msg_lens[i + 1];

	log_printf (LOG_DEBUG,
		"fragmented continuation %u is not equal to assembly last_frag_num %u",

	if (mcast->fragmented == 0) {

	if (mcast->msg_count > 1) {
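/*
 * The flush path below packs any pending application messages out to
 * totemsrp: if nothing has been packed since the last flush it returns
 * early, otherwise it builds a single frame from three iovecs (the
 * totempg_mcast header, the packed per-message length array, and the
 * packed data in fragmentation_data) and resets the packing state.
 */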
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}

	mcast.fragmented = 0;

	/*
	 * Was the first message in this frame a continuation of a
	 * previously fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;

	mcast_packed_msg_count = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	qb_loop_t *poll_handle,

	if (fragmentation_data == 0) {

		totempg_waiting_trans_ack_cb);

		callback_token_received_fn,

	qb_list_init (&totempg_groups_list);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
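/*
 * mcast_msg() below is the packing and fragmentation engine.  Messages
 * passed in as an iovec array are appended to fragmentation_data until a
 * frame's worth of data (max_packet_size, which leaves room for the
 * per-message length entries) has accumulated; anything that does not
 * fit is carried over as a fragment, with mcast.fragmented and
 * mcast.continuation numbering the pieces so the receiver can reassemble
 * them in order.
 */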
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,

	struct iovec iovecs[3];
	struct iovec iovec[64];
	int max_packet_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	/*
	 * Drop zero-length iovecs before packing.
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}

	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If the whole iovec fits in the current frame with room to
		 * spare, copy it into the staging buffer and move on.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size)
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			if ((i < (iov_len - 1)) ||
				((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {

				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);

				fragment_continuation = 0;

			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;

			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;

		if ((copy_base + copy_len) == iovec[i].iov_len) {

			copy_base += copy_len;

	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
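/*
 * msg_count_send_ok() and byte_count_send_ok() below implement the flow
 * control check: a send is only allowed when enough free queue capacity
 * remains (less the totempg_reserved slot kept back) to hold the new
 * message count or byte count.
 */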
static int msg_count_send_ok (

	return ((avail - totempg_reserved) > msg_count);

static int byte_count_send_ok (

static int send_reserve (

static void send_release (

#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
static uint32_t q_level_precent_used(void)
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
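/*
 * The group registration path below allocates a per-user group instance,
 * records its deliver and confchg callbacks, and links it onto
 * totempg_groups_list so app_deliver_fn / app_confchg_fn can fan messages
 * and membership changes out to it.
 */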
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (instance == NULL) {

	instance->q_level = QB_LOOP_MED;
	qb_list_init (&instance->list);
	qb_list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	void *totempg_groups_instance,

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,

	if (new_groups == 0) {

	instance->groups = new_groups;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	void *totempg_groups_instance,

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;

	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
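/*
 * check_q_level() below recomputes the percentage of the transmit queue
 * in use and, when that moves the instance to a different level, reports
 * the new level through the registered totem_queue_level_changed
 * callback.
 */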
static void check_q_level(
	void *totempg_groups_instance)
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_precent_used();

	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
	void *totempg_groups_instance)

	check_q_level(instance);
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
	unsigned int size = 0;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);

	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
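/*
 * The release path above undoes a prior send_reserve(): reservations are
 * taken in the joined-reserve path before a message is queued and
 * released here once the message no longer needs to be accounted against
 * the transmit queue.
 */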
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}
	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}
	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
	unsigned int size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {

	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}
	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	unsigned short ip_port,
	unsigned int iface_no)

	unsigned int *interface_id,
	unsigned int interfaces_size,
	unsigned int *iface_count)

	return &totempg_stats;

	const char *cipher_type,
	const char *hash_type)
#define ONE_IFACE_LEN 63

	unsigned int iface_count;

	iface_string[0] = '\0';

		return ("no interface found for nodeid");

	for (i = 0; i < iface_count; i++) {
		if (!interfaces[i].family) {

		strcat (iface_string, one_iface);

	return (iface_string);
	void (*totem_service_ready) (void))

	totem_queue_level_changed = fn;

	totempg_threaded_mode = 1;