41 #include <sys/types.h> 43 #include <sys/socket.h> 46 #include <sys/ioctl.h> 47 #include <sys/param.h> 48 #include <netinet/in.h> 49 #include <arpa/inet.h> 65 #include <qb/qbdefs.h> 66 #include <qb/qbloop.h> 67 #define LOGSYS_UTILS_ONLY 1 80 #define MSG_NOSIGNAL 0 83 #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX) 84 #define NETIF_STATE_REPORT_UP 1 85 #define NETIF_STATE_REPORT_DOWN 2 87 #define BIND_STATE_UNBOUND 0 88 #define BIND_STATE_REGULAR 1 89 #define BIND_STATE_LOOPBACK 2 91 #define MESSAGE_TYPE_MEMB_JOIN 3 119 void (*totemudp_deliver_fn) (
122 unsigned int msg_len);
124 void (*totemudp_iface_change_fn) (
128 void (*totemudp_target_set_completed) (
void *context);
145 void (*totemudp_log_printf) (
148 const char *
function,
160 struct iovec totemudp_iov_recv;
162 struct iovec totemudp_iov_recv_flush;
178 struct timeval stats_tv_start;
203 static int totemudp_build_sockets (
231 #define log_printf(level, format, args...) \ 233 instance->totemudp_log_printf ( \ 234 level, instance->totemudp_subsys_id, \ 235 __FUNCTION__, __FILE__, __LINE__, \ 236 (const char *)format, ##args); \ 239 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 241 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 242 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 243 instance->totemudp_log_printf ( \ 244 level, instance->totemudp_subsys_id, \ 245 __FUNCTION__, __FILE__, __LINE__, \ 246 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ 251 const char *cipher_type,
252 const char *hash_type)
259 static inline void ucast_sendmsg (
263 unsigned int msg_len)
265 struct msghdr msg_ucast;
269 struct sockaddr_storage sockaddr;
284 (
const unsigned char *)msg,
287 &buf_out_len) != 0) {
292 iovec.iov_base = (
void *)buf_out;
293 iovec.iov_len = buf_out_len;
298 memset(&msg_ucast, 0,
sizeof(msg_ucast));
301 msg_ucast.msg_name = &sockaddr;
302 msg_ucast.msg_namelen = addrlen;
303 msg_ucast.msg_iov = (
void *)&iovec;
304 msg_ucast.msg_iovlen = 1;
305 #ifdef HAVE_MSGHDR_CONTROL 306 msg_ucast.msg_control = 0;
308 #ifdef HAVE_MSGHDR_CONTROLLEN 309 msg_ucast.msg_controllen = 0;
311 #ifdef HAVE_MSGHDR_FLAGS 312 msg_ucast.msg_flags = 0;
314 #ifdef HAVE_MSGHDR_ACCRIGHTS 315 msg_ucast.msg_accrights = NULL;
317 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 318 msg_ucast.msg_accrightslen = 0;
330 "sendmsg(ucast) failed (non-critical)");
334 static inline void mcast_sendmsg (
337 unsigned int msg_len)
339 struct msghdr msg_mcast;
344 struct sockaddr_storage sockaddr;
358 (
const unsigned char *)msg,
361 &buf_out_len) != 0) {
366 iovec.iov_base = (
void *)&buf_out;
367 iovec.iov_len = buf_out_len;
374 memset(&msg_mcast, 0,
sizeof(msg_mcast));
375 msg_mcast.msg_name = &sockaddr;
376 msg_mcast.msg_namelen = addrlen;
377 msg_mcast.msg_iov = (
void *)&iovec;
378 msg_mcast.msg_iovlen = 1;
379 #ifdef HAVE_MSGHDR_CONTROL 380 msg_mcast.msg_control = 0;
382 #ifdef HAVE_MSGHDR_CONTROLLEN 383 msg_mcast.msg_controllen = 0;
385 #ifdef HAVE_MSGHDR_FLAGS 386 msg_mcast.msg_flags = 0;
388 #ifdef HAVE_MSGHDR_ACCRIGHTS 389 msg_mcast.msg_accrights = NULL;
391 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 392 msg_mcast.msg_accrightslen = 0;
403 "sendmsg(mcast) failed (non-critical)");
413 msg_mcast.msg_name = NULL;
414 msg_mcast.msg_namelen = 0;
420 "sendmsg(local mcast loop) failed (non-critical)");
458 static int net_deliver_fn (
464 struct msghdr msg_recv;
466 struct sockaddr_storage system_from;
481 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
482 msg_recv.msg_iov = iovec;
483 msg_recv.msg_iovlen = 1;
484 #ifdef HAVE_MSGHDR_CONTROL 485 msg_recv.msg_control = 0;
487 #ifdef HAVE_MSGHDR_CONTROLLEN 488 msg_recv.msg_controllen = 0;
490 #ifdef HAVE_MSGHDR_FLAGS 491 msg_recv.msg_flags = 0;
493 #ifdef HAVE_MSGHDR_ACCRIGHTS 494 msg_recv.msg_accrights = NULL;
496 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 497 msg_recv.msg_accrightslen = 0;
500 bytes_received = recvmsg (fd, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
501 if (bytes_received == -1) {
514 "Invalid packet data");
518 iovec->iov_len = bytes_received;
524 message_type = (
char *)iovec->iov_base;
543 static int netif_determine (
553 interface_up, interface_num,
565 static void timer_function_netif_check_timeout (
576 netif_determine (instance,
579 &interface_up, &interface_num);
585 interface_up == 0) ||
589 interface_up == 1)) {
595 timer_function_netif_check_timeout,
596 &instance->timer_netif_check_timeout);
624 if (interface_up == 0) {
629 bind_address = &localhost;
638 timer_function_netif_check_timeout,
639 &instance->timer_netif_check_timeout);
650 (void)totemudp_build_sockets (instance,
660 POLLIN, instance, net_deliver_fn);
666 POLLIN, instance, net_deliver_fn);
672 POLLIN, instance, net_deliver_fn);
682 "The network interface [%s] is now up.",
695 timer_function_netif_check_timeout,
696 &instance->timer_netif_check_timeout);
702 "The network interface is down.");
712 static void totemudp_traffic_control_set(
struct totemudp_instance *instance,
int sock)
717 if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio,
sizeof(
int))) {
723 static int totemudp_build_sockets_ip (
731 struct sockaddr_storage sockaddr;
732 struct ipv6_mreq mreq6;
734 struct sockaddr_storage mcast_ss, boundto_ss;
735 struct sockaddr_in6 *mcast_sin6 = (
struct sockaddr_in6 *)&mcast_ss;
736 struct sockaddr_in *mcast_sin = (
struct sockaddr_in *)&mcast_ss;
737 struct sockaddr_in *boundto_sin = (
struct sockaddr_in *)&boundto_ss;
738 unsigned int sendbuf_size;
739 unsigned int recvbuf_size;
740 unsigned int optlen =
sizeof (sendbuf_size);
758 res = fcntl (sockets->
mcast_recv, F_SETFL, O_NONBLOCK);
761 "Could not set non-blocking operation on multicast socket");
769 if ( setsockopt(sockets->
mcast_recv, SOL_SOCKET, SO_REUSEADDR, (
char *)&flag, sizeof (flag)) < 0) {
771 "setsockopt(SO_REUSEADDR) failed");
784 for (i = 0; i < 2; i++) {
789 "Could not set non-blocking operation on multicast socket");
807 res = fcntl (sockets->
mcast_send, F_SETFL, O_NONBLOCK);
810 "Could not set non-blocking operation on multicast socket");
818 if ( setsockopt(sockets->
mcast_send, SOL_SOCKET, SO_REUSEADDR, (
char *)&flag, sizeof (flag)) < 0) {
820 "setsockopt(SO_REUSEADDR) failed");
825 &sockaddr, &addrlen);
826 res = bind (sockets->
mcast_send, (
struct sockaddr *)&sockaddr, addrlen);
829 "Unable to bind the socket to send multicast packets");
836 sockets->
token = socket (bindnet_address->
family, SOCK_DGRAM, 0);
837 if (sockets->
token == -1) {
844 res = fcntl (sockets->
token, F_SETFL, O_NONBLOCK);
847 "Could not set non-blocking operation on token socket");
855 if ( setsockopt(sockets->
token, SOL_SOCKET, SO_REUSEADDR, (
char *)&flag, sizeof (flag)) < 0) {
857 "setsockopt(SO_REUSEADDR) failed");
866 res = bind (sockets->
token, (
struct sockaddr *)&sockaddr, addrlen);
869 "Unable to bind UDP unicast socket");
878 res = setsockopt (sockets->
mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
881 "Unable to set SO_RCVBUF size on UDP mcast socket");
884 res = setsockopt (sockets->
mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
887 "Unable to set SO_SNDBUF size on UDP mcast socket");
890 res = setsockopt (sockets->
local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
893 "Unable to set SO_RCVBUF size on UDP local mcast loop socket");
896 res = setsockopt (sockets->
local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
899 "Unable to set SO_SNDBUF size on UDP local mcast loop socket");
903 res = getsockopt (sockets->
mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
906 "Receive multicast socket recv buffer size (%d bytes).", recvbuf_size);
909 res = getsockopt (sockets->
mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
912 "Transmit multicast socket send buffer size (%d bytes).", sendbuf_size);
915 res = getsockopt (sockets->
local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
918 "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
921 res = getsockopt (sockets->
local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
924 "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
935 unsigned int broadcast = 1;
937 if ((setsockopt(sockets->
mcast_recv, SOL_SOCKET,
938 SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
940 "setting broadcast option failed");
943 if ((setsockopt(sockets->
mcast_send, SOL_SOCKET,
944 SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
946 "setting broadcast option failed");
950 switch (bindnet_address->
family) {
952 memset(&mreq, 0,
sizeof(mreq));
953 mreq.imr_multiaddr.s_addr = mcast_sin->sin_addr.s_addr;
954 mreq.imr_interface.s_addr = boundto_sin->sin_addr.s_addr;
955 res = setsockopt (sockets->
mcast_recv, IPPROTO_IP, IP_ADD_MEMBERSHIP,
956 &mreq, sizeof (mreq));
959 "join ipv4 multicast group failed");
964 memset(&mreq6, 0,
sizeof(mreq6));
965 memcpy(&mreq6.ipv6mr_multiaddr, &mcast_sin6->sin6_addr,
sizeof(
struct in6_addr));
966 mreq6.ipv6mr_interface = interface_num;
968 res = setsockopt (sockets->
mcast_recv, IPPROTO_IPV6, IPV6_JOIN_GROUP,
969 &mreq6, sizeof (mreq6));
972 "join ipv6 multicast group failed");
984 switch ( bindnet_address->
family ) {
987 res = setsockopt (sockets->
mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
988 &sflag, sizeof (sflag));
991 res = setsockopt (sockets->
mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
992 &flag, sizeof (flag));
996 "Unable to turn off multicast loopback");
1004 if (bindnet_address->
family == AF_INET6) {
1005 res = setsockopt (sockets->
mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
1006 &flag, sizeof (flag));
1009 "set mcast v6 TTL failed");
1014 res = setsockopt(sockets->
mcast_send, IPPROTO_IP, IP_MULTICAST_TTL,
1015 &sflag,
sizeof(sflag));
1018 "set mcast v4 TTL failed");
1026 switch ( bindnet_address->
family ) {
1028 if (setsockopt (sockets->
mcast_send, IPPROTO_IP, IP_MULTICAST_IF,
1029 &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
1031 "cannot select interface for multicast packets (send)");
1034 if (setsockopt (sockets->
mcast_recv, IPPROTO_IP, IP_MULTICAST_IF,
1035 &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
1037 "cannot select interface for multicast packets (recv)");
1042 if (setsockopt (sockets->
mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_IF,
1043 &interface_num, sizeof (interface_num)) < 0) {
1045 "cannot select interface for multicast packets (send v6)");
1048 if (setsockopt (sockets->
mcast_recv, IPPROTO_IPV6, IPV6_MULTICAST_IF,
1049 &interface_num, sizeof (interface_num)) < 0) {
1051 "cannot select interface for multicast packets (recv v6)");
1065 res = bind (sockets->
mcast_recv, (
struct sockaddr *)&sockaddr, addrlen);
1068 "Unable to bind the socket to receive multicast packets");
1074 static int totemudp_build_sockets (
1088 res = netif_determine (instance,
1100 res = totemudp_build_sockets_ip (instance, mcast_address,
1101 bindnet_address, sockets, bound_to, interface_num);
1104 totemudp_traffic_control_set(instance, sockets->
token);
1117 qb_loop_t *poll_handle,
1124 void (*deliver_fn) (
1127 unsigned int msg_len),
1129 void (*iface_change_fn) (
1133 void (*target_set_completed) (
1139 if (instance == NULL) {
1143 totemudp_instance_initialize (instance);
1202 100*QB_TIME_NS_IN_MSEC,
1204 timer_function_netif_check_timeout,
1205 &instance->timer_netif_check_timeout);
1207 *udp_context = instance;
1223 int processor_count)
1231 if (processor_count == 1) {
1236 timer_function_netif_check_timeout,
1237 &instance->timer_netif_check_timeout);
1254 for (i = 0; i < 2; i++) {
1266 ufd.events = POLLIN;
1267 nfds = poll (&ufd, 1, 0);
1268 if (nfds == 1 && ufd.revents & POLLIN) {
1269 net_deliver_fn (sock, ufd.revents, instance);
1271 }
while (nfds == 1);
1287 unsigned int msg_len)
1292 ucast_sendmsg (instance, &instance->
token_target, msg, msg_len);
1299 unsigned int msg_len)
1304 mcast_sendmsg (instance, msg, msg_len);
1312 unsigned int msg_len)
1317 mcast_sendmsg (instance, msg, msg_len);
1327 timer_function_netif_check_timeout (instance);
1344 const char *ret_char;
1383 struct sockaddr_storage system_from;
1384 struct msghdr msg_recv;
1387 int msg_processed = 0;
1395 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
1397 msg_recv.msg_iovlen = 1;
1398 #ifdef HAVE_MSGHDR_CONTROL 1399 msg_recv.msg_control = 0;
1401 #ifdef HAVE_MSGHDR_CONTROLLEN 1402 msg_recv.msg_controllen = 0;
1404 #ifdef HAVE_MSGHDR_FLAGS 1405 msg_recv.msg_flags = 0;
1407 #ifdef HAVE_MSGHDR_ACCRIGHTS 1408 msg_recv.msg_accrights = NULL;
1410 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 1411 msg_recv.msg_accrightslen = 0;
1414 for (i = 0; i < 2; i++) {
1426 ufd.events = POLLIN;
1427 nfds = poll (&ufd, 1, 0);
1428 if (nfds == 1 && ufd.revents & POLLIN) {
1429 res = recvmsg (sock, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
1436 }
while (nfds == 1);
1439 return (msg_processed);
unsigned int clear_node_high_bit
int totemudp_crypto_set(void *udp_context, const char *cipher_type, const char *hash_type)
int totemudp_processor_count_set(void *udp_context, int processor_count)
int totemip_localhost(int family, struct totem_ip_address *localhost)
struct iovec totemudp_iov_recv_flush
struct totem_interface * interfaces
unsigned int interface_count
struct totemudp_instance * instance
struct crypto_instance * crypto_inst
size_t crypto_sec_header_size(const char *crypto_cipher_type, const char *crypto_hash_type)
The totem_ip_address struct.
const char * totemip_print(const struct totem_ip_address *addr)
void(* totemudp_iface_change_fn)(void *context, const struct totem_ip_address *iface_address)
size_t crypto_get_current_sec_header_size(const struct crypto_instance *instance)
void(* totemudp_deliver_fn)(void *context, const void *msg, unsigned int msg_len)
void totemudp_net_mtu_adjust(void *udp_context, struct totem_config *totem_config)
void * totemudp_buffer_alloc(void)
int totemudp_mcast_flush_send(void *udp_context, const void *msg, unsigned int msg_len)
int totemudp_token_target_set(void *udp_context, const struct totem_ip_address *token_target)
unsigned char private_key[TOTEM_PRIVATE_KEY_LEN]
char iov_buffer[FRAME_SIZE_MAX]
struct totemudp_socket totemudp_sockets
unsigned char addr[TOTEMIP_ADDRLEN]
struct crypto_instance * crypto_init(const unsigned char *private_key, unsigned int private_key_len, const char *crypto_cipher_type, const char *crypto_hash_type, void(*log_printf_func)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf, 6, 7))), int log_level_security, int log_level_notice, int log_level_error, int log_subsys_id)
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct totem_ip_address mcast_address
unsigned int downcheck_timeout
int totemudp_mcast_noflush_send(void *udp_context, const void *msg, unsigned int msg_len)
unsigned int private_key_len
int totemudp_log_level_security
#define BIND_STATE_REGULAR
struct totem_config * totem_config
#define NETIF_STATE_REPORT_DOWN
#define totemip_nosigpipe(s)
int totemudp_log_level_error
int totemudp_send_flush(void *udp_context)
int totemudp_recv_flush(void *udp_context)
struct iovec totemudp_iov_recv
int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit)
int crypto_encrypt_and_sign(struct crypto_instance *instance, const unsigned char *buf_in, const size_t buf_in_len, unsigned char *buf_out, size_t *buf_out_len)
int totemudp_token_send(void *udp_context, const void *msg, unsigned int msg_len)
struct totem_interface * totem_interface
int totemudp_initialize(qb_loop_t *poll_handle, void **udp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, int interface_no, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address), void(*target_set_completed)(void *context))
Create an instance.
qb_loop_t * totemudp_poll_handle
unsigned int my_memb_entries
struct totem_ip_address mcast_addr
void(* totemudp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemudp_finalize(void *udp_context)
struct totem_ip_address boundto
#define BIND_STATE_LOOPBACK
size_t totemip_udpip_header_size(int family)
const char * totemudp_iface_print(void *udp_context)
#define NETIF_STATE_REPORT_UP
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
#define MCAST_SOCKET_BUFFER_SIZE
void(*) void udp_context)
char iov_buffer_flush[FRAME_SIZE_MAX]
int totemudp_log_level_debug
int crypto_authenticate_and_decrypt(struct crypto_instance *instance, unsigned char *buf, int *buf_len)
int totemudp_log_level_notice
#define MESSAGE_TYPE_MEMB_JOIN
unsigned int broadcast_use
int totemudp_iface_get(void *udp_context, struct totem_ip_address *addr)
#define LOGSYS_LEVEL_CRIT
void(* totemudp_target_set_completed)(void *context)
int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen)
struct totem_logging_configuration totem_logging_configuration
#define LOGSYS_PERROR(err_num, level, fmt, args...)
struct srp_addr system_from
#define log_printf(level, format, args...)
char * crypto_cipher_type
struct totem_ip_address my_id
int totemudp_recv_mcast_empty(void *udp_context)
int totemudp_iface_check(void *udp_context)
struct totem_ip_address bindnet
uint32_t continuous_sendmsg_failures
struct totem_ip_address token_target
qb_loop_timer_handle timer_netif_check_timeout
void totemudp_buffer_release(void *ptr)
int totemudp_log_level_warning