41 #include <sys/types.h>
42 #include <sys/socket.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
54 #include <arpa/inet.h>
57 #include <qb/qblist.h>
61 #include <qb/qbipc_common.h>
70 #define MAP_ANONYMOUS MAP_ANON
77 #define GROUP_HASH_SIZE 32
90 struct qb_list_head
list;
143 static struct qb_list_head joinlist_messages_head;
154 struct qb_list_head
list;
161 struct qb_list_head
list;
172 static unsigned int my_member_list_entries;
176 static unsigned int my_old_member_list_entries = 0;
188 struct qb_list_head
list;
202 static int cpg_lib_init_fn (
void *conn);
204 static int cpg_lib_exit_fn (
void *conn);
206 static void message_handler_req_exec_cpg_procjoin (
210 static void message_handler_req_exec_cpg_procleave (
214 static void message_handler_req_exec_cpg_joinlist (
218 static void message_handler_req_exec_cpg_mcast (
222 static void message_handler_req_exec_cpg_partial_mcast (
226 static void message_handler_req_exec_cpg_downlist_old (
230 static void message_handler_req_exec_cpg_downlist (
234 static void exec_cpg_procjoin_endian_convert (
void *msg);
236 static void exec_cpg_joinlist_endian_convert (
void *msg);
238 static void exec_cpg_mcast_endian_convert (
void *msg);
240 static void exec_cpg_partial_mcast_endian_convert (
void *msg);
242 static void exec_cpg_downlist_endian_convert_old (
void *msg);
244 static void exec_cpg_downlist_endian_convert (
void *msg);
246 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message);
248 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message);
250 static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message);
252 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message);
254 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message);
256 static void message_handler_req_lib_cpg_membership (
void *conn,
257 const void *message);
259 static void message_handler_req_lib_cpg_local_get (
void *conn,
260 const void *message);
262 static void message_handler_req_lib_cpg_iteration_initialize (
264 const void *message);
266 static void message_handler_req_lib_cpg_iteration_next (
268 const void *message);
270 static void message_handler_req_lib_cpg_iteration_finalize (
272 const void *message);
274 static void message_handler_req_lib_cpg_zc_alloc (
276 const void *message);
278 static void message_handler_req_lib_cpg_zc_free (
280 const void *message);
282 static void message_handler_req_lib_cpg_zc_execute (
284 const void *message);
286 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason);
288 static int cpg_exec_send_downlist(
void);
290 static int cpg_exec_send_joinlist(
void);
292 static void downlist_inform_clients (
void);
294 static void joinlist_inform_clients (
void);
296 static void joinlist_messages_delete (
void);
298 static void cpg_sync_init (
299 const unsigned int *trans_list,
300 size_t trans_list_entries,
301 const unsigned int *member_list,
302 size_t member_list_entries,
305 static int cpg_sync_process (
void);
307 static void cpg_sync_activate (
void);
309 static void cpg_sync_abort (
void);
311 static void do_proc_join(
317 static void do_proc_leave(
323 static int notify_lib_totem_membership (
325 int member_list_entries,
326 const unsigned int *member_list);
328 static inline int zcb_all_free (
331 static char *cpg_print_group_name (
344 .lib_handler_fn = message_handler_req_lib_cpg_leave,
348 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
352 .lib_handler_fn = message_handler_req_lib_cpg_membership,
356 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
360 .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
364 .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
368 .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
372 .lib_handler_fn = message_handler_req_lib_cpg_finalize,
376 .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
380 .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
384 .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
388 .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
398 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
401 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
402 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
405 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
406 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
409 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
410 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
413 .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
414 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
417 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
418 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
421 .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
422 .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
427 .
name =
"corosync cluster closed process group service v1.01",
430 .private_data_size =
sizeof (
struct cpg_pd),
433 .lib_init_fn = cpg_lib_init_fn,
434 .lib_exit_fn = cpg_lib_exit_fn,
435 .lib_engine = cpg_lib_engine,
437 .exec_init_fn = cpg_exec_init_fn,
438 .exec_dump_fn = NULL,
439 .exec_engine = cpg_exec_engine,
441 .sync_init = cpg_sync_init,
442 .sync_process = cpg_sync_process,
443 .sync_activate = cpg_sync_activate,
444 .sync_abort = cpg_sync_abort
498 struct qb_list_head
list;
513 for (i = 0; i < group->length; i++) {
516 if (c >=
' ' && c < 0x7f && c !=
'\\') {
520 res[dest_pos++] =
'\\';
521 res[dest_pos++] =
'\\';
523 snprintf(res + dest_pos,
sizeof(res) - dest_pos,
"\\x%02X", c);
533 static void cpg_sync_init (
534 const unsigned int *trans_list,
535 size_t trans_list_entries,
536 const unsigned int *member_list,
537 size_t member_list_entries,
546 memcpy (my_member_list, member_list, member_list_entries *
547 sizeof (
unsigned int));
548 my_member_list_entries = member_list_entries;
557 for (i = 0; i < my_old_member_list_entries; i++) {
559 for (j = 0; j < trans_list_entries; j++) {
560 if (my_old_member_list[i] == trans_list[j]) {
566 g_req_exec_cpg_downlist.nodeids[entries++] =
567 my_old_member_list[i];
570 g_req_exec_cpg_downlist.left_nodes = entries;
573 static int cpg_sync_process (
void)
578 res = cpg_exec_send_downlist();
585 res = cpg_exec_send_joinlist();
590 static void cpg_sync_activate (
void)
592 memcpy (my_old_member_list, my_member_list,
593 my_member_list_entries *
sizeof (
unsigned int));
594 my_old_member_list_entries = my_member_list_entries;
596 downlist_inform_clients ();
598 joinlist_inform_clients ();
600 joinlist_messages_delete ();
602 notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
605 static void cpg_sync_abort (
void)
608 joinlist_messages_delete ();
611 static int notify_lib_totem_membership (
613 int member_list_entries,
614 const unsigned int *member_list)
616 struct qb_list_head *iter;
628 res->member_list_entries = member_list_entries;
629 res->header.size = size;
631 res->header.error =
CS_OK;
637 qb_list_for_each(iter, &cpg_pd_list_head) {
648 static int notify_lib_joinlist(
651 int joined_list_entries,
653 int left_list_entries,
659 struct qb_list_head *iter;
666 qb_list_for_each(iter, &process_info_list_head) {
668 if (mar_name_compare (&pi->
group, group_name) == 0) {
672 for (i = 0; i < left_list_entries; i++) {
673 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
690 res->joined_list_entries = joined_list_entries;
691 res->left_list_entries = left_list_entries;
692 res->member_list_entries = count;
694 res->header.size = size;
696 res->header.error =
CS_OK;
699 qb_list_for_each(iter, &process_info_list_head) {
702 if (mar_name_compare (&pi->
group, group_name) == 0) {
706 for (i = 0;i < left_list_entries; i++) {
707 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
713 retgi->nodeid = pi->
nodeid;
714 retgi->pid = pi->
pid;
720 if (left_list_entries) {
722 retgi += left_list_entries;
725 if (joined_list_entries) {
727 retgi += joined_list_entries;
733 qb_list_for_each(iter, &cpg_pd_list_head) {
736 assert (joined_list_entries <= 1);
737 if (joined_list_entries) {
738 if (joined_list[0].
pid == cpd->
pid &&
749 if (left_list_entries) {
750 if (left_list[0].
pid == cpd->
pid &&
767 qb_list_for_each(iter, &cpg_pd_list_head) {
773 notify_lib_totem_membership (cpd->
conn, my_old_member_list_entries, my_old_member_list);
783 "%s: members(old:%d left:%d)",
789 static void downlist_inform_clients (
void)
791 struct qb_list_head *iter, *tmp_iter;
799 int left_list_entries;
800 struct qb_list_head list;
802 qb_map_iter_t *miter;
805 downlist_log(
"my downlist", &g_req_exec_cpg_downlist);
807 group_map = qb_skiplist_create();
814 qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
818 for (i = 0; i < g_req_exec_cpg_downlist.left_nodes; i++) {
820 if (pi->
nodeid == g_req_exec_cpg_downlist.nodeids[i]) {
827 marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->
group);
828 cpg_group.value[cpg_group.length] = 0;
830 pcd = (
struct confchg_data *)qb_map_get(group_map, cpg_group.value);
832 pcd = (
struct confchg_data *)calloc(1,
sizeof(
struct confchg_data));
833 memcpy(&pcd->cpg_group, &cpg_group,
sizeof(
struct cpg_name));
834 qb_map_put(group_map, pcd->cpg_group.value, pcd);
836 size = pcd->left_list_entries;
837 pcd->left_list[size].nodeid = left_pi->
nodeid;
838 pcd->left_list[size].pid = left_pi->
pid;
840 pcd->left_list_entries++;
841 qb_list_del (&left_pi->
list);
847 miter = qb_map_iter_create(group_map);
848 while (qb_map_iter_next(miter, (
void **)&pcd)) {
849 marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
851 log_printf (LOG_DEBUG,
"left_list_entries:%d", pcd->left_list_entries);
852 for (i=0; i<pcd->left_list_entries; i++) {
853 log_printf (LOG_DEBUG,
"left_list[%d] group:%s, ip:%s, pid:%d",
854 i, cpg_print_group_name(&group),
856 pcd->left_list[i].pid);
860 notify_lib_joinlist(&group, NULL,
862 pcd->left_list_entries,
868 qb_map_iter_free(miter);
869 qb_map_destroy(group_map);
875 static void joinlist_remove_zombie_pi_entries (
void)
877 struct qb_list_head *pi_iter, *tmp_iter;
878 struct qb_list_head *jl_iter;
883 qb_list_for_each_safe(pi_iter, tmp_iter, &process_info_list_head) {
897 qb_list_for_each(jl_iter, &joinlist_messages_head) {
905 pi->
pid == stored_msg->
pid &&
918 static void joinlist_inform_clients (
void)
921 struct qb_list_head *iter;
925 qb_list_for_each(iter, &joinlist_messages_head) {
926 stored_msg = qb_list_entry(iter,
struct joinlist_msg, list);
928 log_printf (LOG_DEBUG,
"joinlist_messages[%u] group:%s, ip:%s, pid:%d",
929 i++, cpg_print_group_name(&stored_msg->
group_name),
942 joinlist_remove_zombie_pi_entries ();
945 static void joinlist_messages_delete (
void)
948 struct qb_list_head *iter, *tmp_iter;
950 qb_list_for_each_safe(iter, tmp_iter, &joinlist_messages_head) {
951 stored_msg = qb_list_entry(iter,
struct joinlist_msg, list);
952 qb_list_del (&stored_msg->
list);
955 qb_list_init (&joinlist_messages_head);
960 qb_list_init (&joinlist_messages_head);
967 struct qb_list_head *iter, *tmp_iter;
972 qb_list_del (&pi->
list);
980 static void cpg_pd_finalize (
struct cpg_pd *cpd)
982 struct qb_list_head *iter, *tmp_iter;
989 cpg_iteration_instance_finalize (cpii);
992 qb_list_del (&cpd->
list);
995 static int cpg_lib_exit_fn (
void *conn)
1006 cpg_pd_finalize (cpd);
1015 struct iovec req_exec_cpg_iovec;
1034 static void exec_cpg_procjoin_endian_convert (
void *msg)
1043 static void exec_cpg_joinlist_endian_convert (
void *msg_v)
1046 struct qb_ipc_response_header *res = (
struct qb_ipc_response_header *)msg;
1049 swab_mar_int32_t (&res->size);
1051 while ((
const char*)jle < msg + res->size) {
1058 static void exec_cpg_downlist_endian_convert_old (
void *msg)
1062 static void exec_cpg_downlist_endian_convert (
void *msg)
1076 static void exec_cpg_mcast_endian_convert (
void *msg)
1087 static void exec_cpg_partial_mcast_endian_convert (
void *msg)
1101 struct qb_list_head *iter;
1103 qb_list_for_each(iter, &process_info_list_head) {
1107 mar_name_compare (&pi->
group, group_name) == 0) {
1115 static void do_proc_join(
1124 struct qb_list_head *list;
1125 struct qb_list_head *list_to_add = NULL;
1127 if (process_info_find (name, pid,
nodeid) != NULL) {
1137 memcpy(&pi->
group, name,
sizeof(*name));
1138 qb_list_init(&pi->
list);
1143 list_to_add = &process_info_list_head;
1144 qb_list_for_each(list, &process_info_list_head) {
1145 pi_entry = qb_list_entry(list,
struct process_info, list);
1153 qb_list_add (&pi->
list, list_to_add);
1155 notify_info.pid = pi->
pid;
1156 notify_info.nodeid =
nodeid;
1157 notify_info.reason = reason;
1159 notify_lib_joinlist(&pi->
group, NULL,
1165 static void do_proc_leave(
1172 struct qb_list_head *iter, *tmp_iter;
1175 notify_info.pid = pid;
1176 notify_info.nodeid =
nodeid;
1177 notify_info.reason = reason;
1179 notify_lib_joinlist(name, NULL,
1184 qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
1188 mar_name_compare (&pi->
group, name)==0) {
1189 qb_list_del (&pi->
list);
1195 static void message_handler_req_exec_cpg_downlist_old (
1196 const void *message,
1203 static void message_handler_req_exec_cpg_downlist(
1204 const void *message,
1214 static void message_handler_req_exec_cpg_procjoin (
1215 const void *message,
1230 static void message_handler_req_exec_cpg_procleave (
1231 const void *message,
1248 static void message_handler_req_exec_cpg_joinlist (
1249 const void *message_v,
1252 const char *message = message_v;
1253 const struct qb_ipc_response_header *res = (
const struct qb_ipc_response_header *)message;
1260 while ((
const char*)jle < message + res->size) {
1264 stored_msg->
pid = jle->
pid;
1266 qb_list_init (&stored_msg->
list);
1267 qb_list_add (&stored_msg->
list, &joinlist_messages_head);
1272 static void message_handler_req_exec_cpg_mcast (
1273 const void *message,
1279 struct qb_list_head *iter, *pi_iter, *tmp_iter;
1281 struct iovec iovec[2];
1296 iovec[1].iov_len = msglen;
1298 qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1299 cpd = qb_list_entry(iter,
struct cpg_pd, list);
1305 qb_list_for_each(pi_iter, &process_info_list_head) {
1326 static void message_handler_req_exec_cpg_partial_mcast (
1327 const void *message,
1333 struct qb_list_head *iter, *pi_iter, *tmp_iter;
1335 struct iovec iovec[2];
1354 iovec[1].iov_len = msglen;
1356 qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1357 cpd = qb_list_entry(iter,
struct cpg_pd, list);
1364 qb_list_for_each(pi_iter, &process_info_list_head) {
1386 static int cpg_exec_send_downlist(
void)
1393 g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1395 iov.iov_base = (
void *)&g_req_exec_cpg_downlist;
1396 iov.iov_len = g_req_exec_cpg_downlist.header.size;
1401 static int cpg_exec_send_joinlist(
void)
1404 struct qb_list_head *iter;
1405 struct qb_ipc_response_header *res;
1408 struct iovec req_exec_cpg_iovec;
1410 qb_list_for_each(iter, &process_info_list_head) {
1422 buf = alloca(
sizeof(
struct qb_ipc_response_header) +
sizeof(
struct join_list_entry) * count);
1428 jle = (
struct join_list_entry *)(buf +
sizeof(
struct qb_ipc_response_header));
1429 res = (
struct qb_ipc_response_header *)buf;
1431 qb_list_for_each(iter, &process_info_list_head) {
1442 res->size =
sizeof(
struct qb_ipc_response_header)+sizeof(struct
join_list_entry) * count;
1444 req_exec_cpg_iovec.iov_base = buf;
1445 req_exec_cpg_iovec.iov_len = res->size;
1450 static int cpg_lib_init_fn (
void *conn)
1453 memset (cpd, 0,
sizeof(
struct cpg_pd));
1455 qb_list_add (&cpd->
list, &cpg_pd_list_head);
1466 static void message_handler_req_lib_cpg_join (
void *
conn,
const void *message)
1472 struct qb_list_head *iter;
1475 qb_list_for_each(iter, &cpg_pd_list_head) {
1491 qb_list_for_each(iter, &process_info_list_head) {
1539 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message)
1576 static void message_handler_req_lib_cpg_finalize (
1578 const void *message)
1590 qb_list_del (&cpd->
list);
1591 qb_list_init (&cpd->
list);
1611 fd = open (path, O_RDWR, 0600);
1619 res = ftruncate (fd, bytes);
1621 goto error_close_unlink;
1624 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1627 if (
addr == MAP_FAILED) {
1628 goto error_close_unlink;
1631 madvise(
addr, bytes, MADV_NOSYNC);
1636 munmap (
addr, bytes);
1648 static inline int zcb_alloc (
1650 const char *path_to_file,
1689 static inline int zcb_by_addr_free (
struct cpg_pd *cpd,
void *
addr)
1691 struct qb_list_head *list, *tmp_iter;
1693 unsigned int res = 0;
1707 static inline int zcb_all_free (
1710 struct qb_list_head *list, *tmp_iter;
1726 static uint64_t void2serveraddr (
void *server_ptr)
1734 static void *serveraddr2void (uint64_t
server_addr)
1742 static void message_handler_req_lib_cpg_zc_alloc (
1744 const void *message)
1747 struct qb_ipc_response_header res_header;
1755 res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1762 res_header.size =
sizeof (
struct qb_ipc_response_header);
1769 static void message_handler_req_lib_cpg_zc_free (
1771 const void *message)
1774 struct qb_ipc_response_header res_header;
1780 addr = serveraddr2void (hdr->server_address);
1782 zcb_by_addr_free (cpd,
addr);
1784 res_header.size =
sizeof (
struct qb_ipc_response_header);
1792 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message)
1798 struct iovec req_exec_cpg_iovec[2];
1833 if (error ==
CS_OK) {
1848 req_exec_cpg_iovec[1].iov_len = msglen;
1851 assert(result == 0);
1854 conn, group_name.value, cpd->
cpd_state, error);
1863 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message)
1869 struct iovec req_exec_cpg_iovec[2];
1892 if (error ==
CS_OK) {
1905 req_exec_cpg_iovec[1].iov_len = msglen;
1908 assert(result == 0);
1911 conn, group_name.value, cpd->
cpd_state, error);
1915 static void message_handler_req_lib_cpg_zc_execute (
1917 const void *message)
1920 struct qb_ipc_request_header *
header;
1923 struct iovec req_exec_cpg_iovec[2];
1931 header = (
struct qb_ipc_request_header *)(((
char *)serveraddr2void(hdr->server_address) + sizeof (
struct coroipcs_zc_header)));
1951 if (error ==
CS_OK) {
1981 static void message_handler_req_lib_cpg_membership (
void *conn,
1982 const void *message)
1987 struct qb_list_head *iter;
1988 int member_count = 0;
1995 qb_list_for_each(iter, &process_info_list_head) {
2009 static void message_handler_req_lib_cpg_local_get (
void *conn,
2010 const void *message)
2023 static void message_handler_req_lib_cpg_iteration_initialize (
2025 const void *message)
2031 struct qb_list_head *iter, *iter2;
2046 &cpg_iteration_handle);
2053 res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (
void *)&
cpg_iteration_instance);
2066 qb_list_for_each(iter, &process_info_list_head) {
2079 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2108 goto error_put_destroy;
2112 qb_list_init (&new_pi->
list);
2127 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2132 qb_list_add (&new_pi->
list, iter2);
2148 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2150 if (error !=
CS_OK) {
2151 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2164 static void message_handler_req_lib_cpg_iteration_next (
2166 const void *message)
2177 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2217 static void message_handler_req_lib_cpg_iteration_finalize (
2219 const void *message)
2229 res = hdb_handle_get (&cpg_iteration_handle_t_db,