40 #include <sys/types.h>
42 #include <sys/socket.h>
45 #include <sys/ioctl.h>
46 #include <sys/param.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
61 #include <qb/qbdefs.h>
62 #include <qb/qbloop.h>
77 #define MSG_NOSIGNAL 0
81 #define CFG_INTERFACE_STATUS_MAX_LEN 512
103 unsigned int link_no);
131 const char *
function,
179 static void totemknet_start_merge_detect_timeout(
182 static void totemknet_stop_merge_detect_timeout(
185 static void log_flush_messages (
193 #define knet_log_printf(level, format, args...) \
195 instance->totemknet_log_printf ( \
196 level, instance->totemknet_subsys_id, \
197 __FUNCTION__, __FILE__, __LINE__, \
198 (const char *)format, ##args); \
201 #define libknet_log_printf(level, format, args...) \
203 instance->totemknet_log_printf ( \
204 level, instance->knet_subsys_id, \
205 __FUNCTION__, "libknet.h", __LINE__, \
206 (const char *)format, ##args); \
209 #define KNET_LOGSYS_PERROR(err_num, level, fmt, args...) \
211 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
212 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
213 instance->totemknet_log_printf ( \
214 level, instance->totemknet_subsys_id, \
215 __FUNCTION__, __FILE__, __LINE__, \
216 fmt ": %s (%d)", ##args, _error_ptr, err_num); \
220 static int dst_host_filter_callback_fn(
void *private_data,
221 const unsigned char *outdata,
224 knet_node_id_t this_host_id,
225 knet_node_id_t src_host_id,
227 knet_node_id_t *dst_host_ids,
228 size_t *dst_host_ids_entries)
236 *dst_host_ids_entries = 1;
240 *dst_host_ids_entries = 0;
246 static void socket_error_callback_fn(
void *private_data,
int datafd, int8_t channel, uint8_t tx_rx,
int error,
int errorno)
251 if ((error == -1 && errorno != EAGAIN) || (error == 0)) {
252 knet_handle_remove_datafd(instance->
knet_handle, datafd);
256 static void host_change_callback_fn(
void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)
264 static void pmtu_change_callback_fn(
void *private_data,
unsigned int data_mtu)
275 const char *cipher_type,
276 const char *hash_type)
282 static inline void ucast_sendmsg (
286 unsigned int msg_len)
290 struct msghdr msg_ucast;
295 iovec.iov_base = (
void *)msg;
296 iovec.iov_len = msg_len;
301 memset(&msg_ucast, 0,
sizeof(msg_ucast));
302 msg_ucast.msg_iov = (
void *)&iovec;
303 msg_ucast.msg_iovlen = 1;
304 #ifdef HAVE_MSGHDR_CONTROL
305 msg_ucast.msg_control = 0;
307 #ifdef HAVE_MSGHDR_CONTROLLEN
308 msg_ucast.msg_controllen = 0;
310 #ifdef HAVE_MSGHDR_FLAGS
311 msg_ucast.msg_flags = 0;
313 #ifdef HAVE_MSGHDR_ACCRIGHTS
314 msg_ucast.msg_accrights = NULL;
316 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
317 msg_ucast.msg_accrightslen = 0;
328 "sendmsg(ucast) failed (non-critical)");
332 static inline void mcast_sendmsg (
335 unsigned int msg_len,
340 struct msghdr msg_mcast;
343 iovec.iov_base = (
void *)msg;
344 iovec.iov_len = msg_len;
351 memset(&msg_mcast, 0,
sizeof(msg_mcast));
352 msg_mcast.msg_iov = (
void *)&iovec;
353 msg_mcast.msg_iovlen = 1;
354 #ifdef HAVE_MSGHDR_CONTROL
355 msg_mcast.msg_control = 0;
357 #ifdef HAVE_MSGHDR_CONTROLLEN
358 msg_mcast.msg_controllen = 0;
360 #ifdef HAVE_MSGHDR_FLAGS
361 msg_mcast.msg_flags = 0;
363 #ifdef HAVE_MSGHDR_ACCRIGHTS
364 msg_mcast.msg_accrights = NULL;
366 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
367 msg_mcast.msg_accrightslen = 0;
387 static int node_compare(
const void *aptr,
const void *bptr)
391 a = *(uint16_t *)aptr;
392 b = *(uint16_t *)bptr;
399 unsigned int *iface_count)
402 struct knet_link_status link_status;
403 knet_node_id_t host_list[KNET_MAX_HOST];
404 uint8_t link_list[KNET_MAX_LINK];
418 res = knet_host_get_host_list(instance->
knet_handle,
419 host_list, &num_hosts);
423 qsort(host_list, num_hosts,
sizeof(uint16_t), node_compare);
433 for (j=0; j<num_hosts; j++) {
434 res = knet_link_get_link_list(instance->
knet_handle,
435 host_list[j], link_list, &num_links);
441 for (i=0; i < num_links; i++) {
456 sizeof(link_status));
458 ptr[j] =
'0' + (link_status.enabled |
459 link_status.connected<<1 |
460 link_status.dynconnected<<2);
481 static knet_node_id_t nodes[KNET_MAX_HOST];
482 uint8_t links[KNET_MAX_LINK];
491 res = knet_host_get_host_list(instance->
knet_handle, nodes, &num_nodes);
499 for (i=0; i<num_nodes; i++) {
501 res = knet_link_get_link_list(instance->
knet_handle, nodes[i], links, &num_links);
506 for (j=0; j<num_links; j++) {
507 res = knet_link_set_enable(instance->
knet_handle, nodes[i], links[j], 0);
511 res = knet_link_clear_config(instance->
knet_handle, nodes[i], links[j]);
516 res = knet_host_remove(instance->
knet_handle, nodes[i]);
523 res = knet_handle_setfwd(instance->
knet_handle, 0);
532 totemknet_stop_merge_detect_timeout(instance);
534 log_flush_messages(instance);
539 static int log_deliver_fn (
545 char buffer[
sizeof(
struct knet_log_msg)*4];
546 char *bufptr = buffer;
550 len = read(fd, buffer,
sizeof(buffer));
552 struct knet_log_msg *msg = (
struct knet_log_msg *)bufptr;
553 switch (msg->msglevel) {
556 knet_log_get_subsystem_name(msg->subsystem),
561 knet_log_get_subsystem_name(msg->subsystem),
566 knet_log_get_subsystem_name(msg->subsystem),
571 knet_log_get_subsystem_name(msg->subsystem),
575 bufptr +=
sizeof(
struct knet_log_msg);
576 done +=
sizeof(
struct knet_log_msg);
581 static int data_deliver_fn (
587 struct msghdr msg_hdr;
588 struct iovec iov_recv;
591 int truncated_packet;
594 iov_recv.iov_len = KNET_MAX_PACKET_SIZE;
597 msg_hdr.msg_namelen =
sizeof (
struct sockaddr_storage);
598 msg_hdr.msg_iov = &iov_recv;
599 msg_hdr.msg_iovlen = 1;
600 #ifdef HAVE_MSGHDR_CONTROL
601 msg_hdr.msg_control = 0;
603 #ifdef HAVE_MSGHDR_CONTROLLEN
604 msg_hdr.msg_controllen = 0;
606 #ifdef HAVE_MSGHDR_FLAGS
607 msg_hdr.msg_flags = 0;
609 #ifdef HAVE_MSGHDR_ACCRIGHTS
610 msg_hdr.msg_accrights = NULL;
612 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
613 msg_hdr.msg_accrightslen = 0;
616 msg_len = recvmsg (fd, &msg_hdr,
MSG_NOSIGNAL | MSG_DONTWAIT);
621 truncated_packet = 0;
623 #ifdef HAVE_MSGHDR_FLAGS
624 if (msg_hdr.msg_flags & MSG_TRUNC) {
625 truncated_packet = 1;
632 if (bytes_received == KNET_MAX_PACKET_SIZE) {
633 truncated_packet = 1;
637 if (truncated_packet) {
639 "Received too big message. This may be because something bad is happening"
640 "on the network (attack?), or you tried join more nodes than corosync is"
641 "compiled with (%u) or bug in the code (bad estimation of "
658 static void timer_function_netif_check_timeout (
675 static void totemknet_refresh_config(
677 const char *key_name,
686 knet_node_id_t host_ids[KNET_MAX_HOST];
697 if (
icmap_get_uint8(
"config.totemconfig_reload_in_progress", &reloading) ==
CS_OK && reloading) {
712 err = knet_host_get_host_list(instance->
knet_handle, host_ids, &num_nodes);
717 for (i=0; i<num_nodes; i++) {
723 err = knet_link_set_ping_timers(instance->
knet_handle, host_ids[i], link_no,
730 err = knet_link_set_pong_count(instance->
knet_handle, host_ids[i], link_no,
735 err = knet_link_set_priority(instance->
knet_handle, host_ids[i], link_no,
756 totemknet_refresh_config,
762 totemknet_refresh_config,
764 &icmap_track_reload);
782 unsigned int msg_len,
785 void (*iface_change_fn) (
788 unsigned int link_no),
790 void (*mtu_changed) (
794 void (*target_set_completed) (
803 if (instance == NULL) {
807 totemknet_instance_initialize (instance);
857 fcntl(instance->
logpipes[0], F_SETFL, O_NONBLOCK);
858 fcntl(instance->
logpipes[1], F_SETFL, O_NONBLOCK);
860 #if !defined(KNET_API_VER) || (KNET_API_VER == 1)
863 #if KNET_API_VER == 2
875 res = knet_handle_enable_filter(instance->
knet_handle, instance, dst_host_filter_callback_fn);
879 res = knet_handle_enable_sock_notify(instance->
knet_handle, instance, socket_error_callback_fn);
883 res = knet_host_enable_status_change_notify(instance->
knet_handle, instance, host_change_callback_fn);
887 res = knet_handle_enable_pmtud_notify(instance->
knet_handle, instance, pmtu_change_callback_fn);
897 knet_log_printf(LOG_DEBUG,
"knet_handle_add_datafd failed: %s", strerror(errno));
903 struct knet_handle_crypto_cfg crypto_cfg;
911 res = knet_handle_crypto(instance->
knet_handle, &crypto_cfg);
920 knet_log_printf(LOG_INFO,
"kronosnet crypto initialized: %s/%s", crypto_cfg.crypto_cipher_type, crypto_cfg.crypto_hash_type);
928 instance->
link_mode = KNET_LINK_POLICY_PASSIVE;
930 instance->
link_mode = KNET_LINK_POLICY_ACTIVE;
933 instance->
link_mode = KNET_LINK_POLICY_RR;
946 POLLIN, instance, log_deliver_fn);
951 POLLIN, instance, data_deliver_fn);
959 100*QB_TIME_NS_IN_MSEC,
961 timer_function_netif_check_timeout,
962 &instance->timer_netif_check_timeout);
964 totemknet_start_merge_detect_timeout(instance);
967 totemknet_add_config_notifications(instance);
973 *knet_context = instance;
978 log_flush_messages(instance);
986 return malloc(KNET_MAX_PACKET_SIZE + 512);
1014 unsigned int msg_len)
1019 ucast_sendmsg (instance, &instance->
token_target, msg, msg_len);
1026 unsigned int msg_len)
1031 mcast_sendmsg (instance, msg, msg_len, 0);
1039 unsigned int msg_len)
1044 mcast_sendmsg (instance, msg, msg_len, 1);
1087 struct msghdr msg_hdr;
1088 struct iovec iov_recv;
1091 int msg_processed = 0;
1094 iov_recv.iov_len = KNET_MAX_PACKET_SIZE;
1097 msg_hdr.msg_namelen =
sizeof (
struct sockaddr_storage);
1098 msg_hdr.msg_iov = &iov_recv;
1099 msg_hdr.msg_iovlen = 1;
1100 #ifdef HAVE_MSGHDR_CONTROL
1101 msg_hdr.msg_control = 0;
1103 #ifdef HAVE_MSGHDR_CONTROLLEN
1104 msg_hdr.msg_controllen = 0;
1106 #ifdef HAVE_MSGHDR_FLAGS
1107 msg_hdr.msg_flags = 0;
1109 #ifdef HAVE_MSGHDR_ACCRIGHTS
1110 msg_msg_hdr.msg_accrights = NULL;
1112 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
1113 msg_msg_hdr.msg_accrightslen = 0;
1118 ufd.events = POLLIN;
1119 nfds = poll (&ufd, 1, 0);
1120 if (nfds == 1 && ufd.revents & POLLIN) {
1128 }
while (nfds == 1);
1130 return (msg_processed);
1135 unsigned short ip_port,
1136 unsigned int iface_no)
1158 int port = instance->
ip_port[link_no];
1159 struct sockaddr_storage remote_ss;
1160 struct sockaddr_storage local_ss;
1164 knet_node_id_t host_ids[KNET_MAX_HOST];
1165 size_t num_host_ids;
1183 err = knet_host_get_host_list(instance->
knet_handle, host_ids, &num_host_ids);
1188 for (i=0; i<num_host_ids; i++) {
1189 if (host_ids[i] == member->
nodeid) {
1196 if (err != 0 && errno != EEXIST) {
1212 memset(&local_ss, 0,
sizeof(local_ss));
1221 KNET_TRANSPORT_LOOPBACK,
1222 &local_ss, &remote_ss, KNET_LINK_FLAG_TRAFFICHIPRIO);
1227 &local_ss, &remote_ss, KNET_LINK_FLAG_TRAFFICHIPRIO);
1274 uint8_t link_list[KNET_MAX_LINK];
1301 res = knet_link_get_link_list(instance->
knet_handle,
1307 if (num_links == 0) {
1324 struct knet_handle_compress_cfg compress_cfg;
1332 res = knet_handle_compress(instance->
knet_handle, &compress_cfg);
1346 (void) knet_handle_clear_stats(instance->
knet_handle, KNET_CLEARSTATS_HANDLE_AND_LINK);
1351 knet_node_id_t node, uint8_t link_no,
1352 struct knet_link_status *status)
1388 struct knet_handle_stats *stats)
1398 static void timer_function_merge_detect_timeout (
1409 totemknet_start_merge_detect_timeout(instance);
1412 static void totemknet_start_merge_detect_timeout(
1421 timer_function_merge_detect_timeout,
1422 &instance->timer_merge_detect_timeout);
1426 static void totemknet_stop_merge_detect_timeout(
1445 pfd.events = POLLIN;
1448 if ((poll(&pfd, 1, 0) > 0) &&
1449 (pfd.revents & POLLIN) &&
1450 (log_deliver_fn(instance->
logpipes[0], POLLIN, instance) == 0)) {