| /* |
| * test-simple-ipc.c: verify that the Inter-Process Communication works. |
| */ |
| |
| #include "test-tool.h" |
| #include "gettext.h" |
| #include "strbuf.h" |
| #include "simple-ipc.h" |
| #include "parse-options.h" |
| #include "thread-utils.h" |
| #include "strvec.h" |
| #include "run-command.h" |
| #include "trace2.h" |
| |
| #ifndef SUPPORTS_SIMPLE_IPC |
/*
 * Stub entry point compiled when SUPPORTS_SIMPLE_IPC is not defined:
 * every invocation ends in a fatal error.
 */
int cmd__simple_ipc(int argc, const char **argv)
{
	die("simple IPC not available on this platform");
}
| #else |
| |
| /* |
| * The test daemon defines an "application callback" that supports a |
| * series of commands (see `test_app_cb()`). |
| * |
| * Unknown commands are caught here and we send an error message back |
| * to the client process. |
| */ |
| static int app__unhandled_command(const char *command, |
| ipc_server_reply_cb *reply_cb, |
| struct ipc_server_reply_data *reply_data) |
| { |
| struct strbuf buf = STRBUF_INIT; |
| int ret; |
| |
| strbuf_addf(&buf, "unhandled command: %s", command); |
| ret = reply_cb(reply_data, buf.buf, buf.len); |
| strbuf_release(&buf); |
| |
| return ret; |
| } |
| |
| /* |
| * Reply with a single very large buffer. This is to ensure that |
| * long response are properly handled -- whether the chunking occurs |
| * in the kernel or in the (probably pkt-line) layer. |
| */ |
| #define BIG_ROWS (10000) |
| static int app__big_command(ipc_server_reply_cb *reply_cb, |
| struct ipc_server_reply_data *reply_data) |
| { |
| struct strbuf buf = STRBUF_INIT; |
| int row; |
| int ret; |
| |
| for (row = 0; row < BIG_ROWS; row++) |
| strbuf_addf(&buf, "big: %.75d\n", row); |
| |
| ret = reply_cb(reply_data, buf.buf, buf.len); |
| strbuf_release(&buf); |
| |
| return ret; |
| } |
| |
| /* |
| * Reply with a series of lines. This is to ensure that we can incrementally |
| * compute the response and chunk it to the client. |
| */ |
| #define CHUNK_ROWS (10000) |
| static int app__chunk_command(ipc_server_reply_cb *reply_cb, |
| struct ipc_server_reply_data *reply_data) |
| { |
| struct strbuf buf = STRBUF_INIT; |
| int row; |
| int ret; |
| |
| for (row = 0; row < CHUNK_ROWS; row++) { |
| strbuf_setlen(&buf, 0); |
| strbuf_addf(&buf, "big: %.75d\n", row); |
| ret = reply_cb(reply_data, buf.buf, buf.len); |
| } |
| |
| strbuf_release(&buf); |
| |
| return ret; |
| } |
| |
| /* |
| * Slowly reply with a series of lines. This is to model an expensive to |
| * compute chunked response (which might happen if this callback is running |
| * in a thread and is fighting for a lock with other threads). |
| */ |
| #define SLOW_ROWS (1000) |
| #define SLOW_DELAY_MS (10) |
| static int app__slow_command(ipc_server_reply_cb *reply_cb, |
| struct ipc_server_reply_data *reply_data) |
| { |
| struct strbuf buf = STRBUF_INIT; |
| int row; |
| int ret; |
| |
| for (row = 0; row < SLOW_ROWS; row++) { |
| strbuf_setlen(&buf, 0); |
| strbuf_addf(&buf, "big: %.75d\n", row); |
| ret = reply_cb(reply_data, buf.buf, buf.len); |
| sleep_millisec(SLOW_DELAY_MS); |
| } |
| |
| strbuf_release(&buf); |
| |
| return ret; |
| } |
| |
| /* |
| * The client sent a command followed by a (possibly very) large buffer. |
| */ |
| static int app__sendbytes_command(const char *received, size_t received_len, |
| ipc_server_reply_cb *reply_cb, |
| struct ipc_server_reply_data *reply_data) |
| { |
| struct strbuf buf_resp = STRBUF_INIT; |
| const char *p = "?"; |
| int len_ballast = 0; |
| int k; |
| int errs = 0; |
| int ret; |
| |
| /* |
| * The test is setup to send: |
| * "sendbytes" SP <n * char> |
| */ |
| if (received_len < strlen("sendbytes ")) |
| BUG("received_len is short in app__sendbytes_command"); |
| |
| if (skip_prefix(received, "sendbytes ", &p)) |
| len_ballast = strlen(p); |
| |
| /* |
| * Verify that the ballast is n copies of a single letter. |
| * And that the multi-threaded IO layer didn't cross the streams. |
| */ |
| for (k = 1; k < len_ballast; k++) |
| if (p[k] != p[0]) |
| errs++; |
| |
| if (errs) |
| strbuf_addf(&buf_resp, "errs:%d\n", errs); |
| else |
| strbuf_addf(&buf_resp, "rcvd:%c%08d\n", p[0], len_ballast); |
| |
| ret = reply_cb(reply_data, buf_resp.buf, buf_resp.len); |
| |
| strbuf_release(&buf_resp); |
| |
| return ret; |
| } |
| |
/*
 * Only the address of this variable matters: it is handed to
 * `ipc_server_run()` as the application instance data and compared
 * against the pointer that `test_app_cb()` receives, to verify that
 * the application data is passed through properly.
 */
static int my_app_data = 42;
| |
| static ipc_server_application_cb test_app_cb; |
| |
| /* |
| * This is the "application callback" that sits on top of the |
| * "ipc-server". It completely defines the set of commands supported |
| * by this application. |
| */ |
| static int test_app_cb(void *application_data, |
| const char *command, size_t command_len, |
| ipc_server_reply_cb *reply_cb, |
| struct ipc_server_reply_data *reply_data) |
| { |
| /* |
| * Verify that we received the application-data that we passed |
| * when we started the ipc-server. (We have several layers of |
| * callbacks calling callbacks and it's easy to get things mixed |
| * up (especially when some are "void*").) |
| */ |
| if (application_data != (void*)&my_app_data) |
| BUG("application_cb: application_data pointer wrong"); |
| |
| if (command_len == 4 && !strncmp(command, "quit", 4)) { |
| /* |
| * The client sent a "quit" command. This is an async |
| * request for the server to shutdown. |
| * |
| * We DO NOT send the client a response message |
| * (because we have nothing to say and the other |
| * server threads have not yet stopped). |
| * |
| * Tell the ipc-server layer to start shutting down. |
| * This includes: stop listening for new connections |
| * on the socket/pipe and telling all worker threads |
| * to finish/drain their outgoing responses to other |
| * clients. |
| * |
| * This DOES NOT force an immediate sync shutdown. |
| */ |
| return SIMPLE_IPC_QUIT; |
| } |
| |
| if (command_len == 4 && !strncmp(command, "ping", 4)) { |
| const char *answer = "pong"; |
| return reply_cb(reply_data, answer, strlen(answer)); |
| } |
| |
| if (command_len == 3 && !strncmp(command, "big", 3)) |
| return app__big_command(reply_cb, reply_data); |
| |
| if (command_len == 5 && !strncmp(command, "chunk", 5)) |
| return app__chunk_command(reply_cb, reply_data); |
| |
| if (command_len == 4 && !strncmp(command, "slow", 4)) |
| return app__slow_command(reply_cb, reply_data); |
| |
| if (command_len >= 10 && starts_with(command, "sendbytes ")) |
| return app__sendbytes_command(command, command_len, |
| reply_cb, reply_data); |
| |
| return app__unhandled_command(command, reply_cb, reply_data); |
| } |
| |
/*
 * Command line arguments shared by all subcommands of this helper.
 */
struct cl_args {
	const char *subcommand;	/* first positional argument */
	const char *path;	/* value of --name */
	const char *token;	/* value of --token */

	int nr_threads;		/* value of --threads */
	int max_wait_sec;	/* value of --max-wait */
	int bytecount;		/* value of --bytecount */
	int batchsize;		/* value of --batchsize */

	char bytevalue;		/* first character of --byte */
};

/*
 * Defaults used when an option is not given.  `subcommand` and `token`
 * are not listed and therefore start out as NULL.
 */
static struct cl_args cl_args = {
	.path = "ipc-test",

	.nr_threads = 5,
	.max_wait_sec = 60,
	.bytecount = 1024,
	.batchsize = 10,

	.bytevalue = 'x',
};
| |
| /* |
| * This process will run as a simple-ipc server and listen for IPC commands |
| * from client processes. |
| */ |
| static int daemon__run_server(void) |
| { |
| int ret; |
| |
| struct ipc_server_opts opts = { |
| .nr_threads = cl_args.nr_threads, |
| }; |
| |
| /* |
| * Synchronously run the ipc-server. We don't need any application |
| * instance data, so pass an arbitrary pointer (that we'll later |
| * verify made the round trip). |
| */ |
| ret = ipc_server_run(cl_args.path, &opts, test_app_cb, (void*)&my_app_data); |
| if (ret == -2) |
| error("socket/pipe already in use: '%s'", cl_args.path); |
| else if (ret == -1) |
| error_errno("could not start server on: '%s'", cl_args.path); |
| |
| return ret; |
| } |
| |
| static start_bg_wait_cb bg_wait_cb; |
| |
| static int bg_wait_cb(const struct child_process *cp, void *cb_data) |
| { |
| int s = ipc_get_active_state(cl_args.path); |
| |
| switch (s) { |
| case IPC_STATE__LISTENING: |
| /* child is "ready" */ |
| return 0; |
| |
| case IPC_STATE__NOT_LISTENING: |
| case IPC_STATE__PATH_NOT_FOUND: |
| /* give child more time */ |
| return 1; |
| |
| default: |
| case IPC_STATE__INVALID_PATH: |
| case IPC_STATE__OTHER_ERROR: |
| /* all the time in world won't help */ |
| return -1; |
| } |
| } |
| |
| static int daemon__start_server(void) |
| { |
| struct child_process cp = CHILD_PROCESS_INIT; |
| enum start_bg_result sbgr; |
| |
| strvec_push(&cp.args, "test-tool"); |
| strvec_push(&cp.args, "simple-ipc"); |
| strvec_push(&cp.args, "run-daemon"); |
| strvec_pushf(&cp.args, "--name=%s", cl_args.path); |
| strvec_pushf(&cp.args, "--threads=%d", cl_args.nr_threads); |
| |
| cp.no_stdin = 1; |
| cp.no_stdout = 1; |
| cp.no_stderr = 1; |
| |
| sbgr = start_bg_command(&cp, bg_wait_cb, NULL, cl_args.max_wait_sec); |
| |
| switch (sbgr) { |
| case SBGR_READY: |
| return 0; |
| |
| default: |
| case SBGR_ERROR: |
| case SBGR_CB_ERROR: |
| return error("daemon failed to start"); |
| |
| case SBGR_TIMEOUT: |
| return error("daemon not online yet"); |
| |
| case SBGR_DIED: |
| return error("daemon terminated"); |
| } |
| } |
| |
| /* |
| * This process will run a quick probe to see if a simple-ipc server |
| * is active on this path. |
| * |
| * Returns 0 if the server is alive. |
| */ |
| static int client__probe_server(void) |
| { |
| enum ipc_active_state s; |
| |
| s = ipc_get_active_state(cl_args.path); |
| switch (s) { |
| case IPC_STATE__LISTENING: |
| return 0; |
| |
| case IPC_STATE__NOT_LISTENING: |
| return error("no server listening at '%s'", cl_args.path); |
| |
| case IPC_STATE__PATH_NOT_FOUND: |
| return error("path not found '%s'", cl_args.path); |
| |
| case IPC_STATE__INVALID_PATH: |
| return error("invalid pipe/socket name '%s'", cl_args.path); |
| |
| case IPC_STATE__OTHER_ERROR: |
| default: |
| return error("other error for '%s'", cl_args.path); |
| } |
| } |
| |
| /* |
| * Send an IPC command token to an already-running server daemon and |
| * print the response. |
| * |
| * This is a simple 1 word command/token that `test_app_cb()` (in the |
| * daemon process) will understand. |
| */ |
| static int client__send_ipc(void) |
| { |
| const char *command = "(no-command)"; |
| struct strbuf buf = STRBUF_INIT; |
| struct ipc_client_connect_options options |
| = IPC_CLIENT_CONNECT_OPTIONS_INIT; |
| |
| if (cl_args.token && *cl_args.token) |
| command = cl_args.token; |
| |
| options.wait_if_busy = 1; |
| options.wait_if_not_found = 0; |
| |
| if (!ipc_client_send_command(cl_args.path, &options, |
| command, strlen(command), |
| &buf)) { |
| if (buf.len) { |
| printf("%s\n", buf.buf); |
| fflush(stdout); |
| } |
| strbuf_release(&buf); |
| |
| return 0; |
| } |
| |
| return error("failed to send '%s' to '%s'", command, cl_args.path); |
| } |
| |
| /* |
| * Send an IPC command to an already-running server and ask it to |
| * shutdown. "send quit" is an async request and queues a shutdown |
| * event in the server, so we spin and wait here for it to actually |
| * shutdown to make the unit tests a little easier to write. |
| */ |
| static int client__stop_server(void) |
| { |
| int ret; |
| time_t time_limit, now; |
| enum ipc_active_state s; |
| |
| time(&time_limit); |
| time_limit += cl_args.max_wait_sec; |
| |
| cl_args.token = "quit"; |
| |
| ret = client__send_ipc(); |
| if (ret) |
| return ret; |
| |
| for (;;) { |
| sleep_millisec(100); |
| |
| s = ipc_get_active_state(cl_args.path); |
| |
| if (s != IPC_STATE__LISTENING) { |
| /* |
| * The socket/pipe is gone and/or has stopped |
| * responding. Lets assume that the daemon |
| * process has exited too. |
| */ |
| return 0; |
| } |
| |
| time(&now); |
| if (now > time_limit) |
| return error("daemon has not shutdown yet"); |
| } |
| } |
| |
| /* |
| * Send an IPC command followed by ballast to confirm that a large |
| * message can be sent and that the kernel or pkt-line layers will |
| * properly chunk it and that the daemon receives the entire message. |
| */ |
| static int do_sendbytes(int bytecount, char byte, const char *path, |
| const struct ipc_client_connect_options *options) |
| { |
| struct strbuf buf_send = STRBUF_INIT; |
| struct strbuf buf_resp = STRBUF_INIT; |
| |
| strbuf_addstr(&buf_send, "sendbytes "); |
| strbuf_addchars(&buf_send, byte, bytecount); |
| |
| if (!ipc_client_send_command(path, options, |
| buf_send.buf, buf_send.len, |
| &buf_resp)) { |
| strbuf_rtrim(&buf_resp); |
| printf("sent:%c%08d %s\n", byte, bytecount, buf_resp.buf); |
| fflush(stdout); |
| strbuf_release(&buf_send); |
| strbuf_release(&buf_resp); |
| |
| return 0; |
| } |
| |
| return error("client failed to sendbytes(%d, '%c') to '%s'", |
| bytecount, byte, path); |
| } |
| |
| /* |
| * Send an IPC command with ballast to an already-running server daemon. |
| */ |
| static int client__sendbytes(void) |
| { |
| struct ipc_client_connect_options options |
| = IPC_CLIENT_CONNECT_OPTIONS_INIT; |
| |
| options.wait_if_busy = 1; |
| options.wait_if_not_found = 0; |
| options.uds_disallow_chdir = 0; |
| |
| return do_sendbytes(cl_args.bytecount, cl_args.bytevalue, cl_args.path, |
| &options); |
| } |
| |
/*
 * Per-thread state for the "multiple" subcommand.  Instances are
 * chained through `next` by `client__multiple()`.
 */
struct multiple_thread_data {
	pthread_t pthread_id;			/* thread handle */
	struct multiple_thread_data *next;	/* next list element */
	const char *path;			/* IPC path to send to */
	int bytecount;				/* ballast size of first request */
	int batchsize;				/* number of requests to send */
	int sum_errors;				/* failed requests */
	int sum_good;				/* successful requests */
	char letter;				/* ballast character */
};
| |
| static void *multiple_thread_proc(void *_multiple_thread_data) |
| { |
| struct multiple_thread_data *d = _multiple_thread_data; |
| int k; |
| struct ipc_client_connect_options options |
| = IPC_CLIENT_CONNECT_OPTIONS_INIT; |
| |
| options.wait_if_busy = 1; |
| options.wait_if_not_found = 0; |
| /* |
| * A multi-threaded client should not be randomly calling chdir(). |
| * The test will pass without this restriction because the test is |
| * not otherwise accessing the filesystem, but it makes us honest. |
| */ |
| options.uds_disallow_chdir = 1; |
| |
| trace2_thread_start("multiple"); |
| |
| for (k = 0; k < d->batchsize; k++) { |
| if (do_sendbytes(d->bytecount + k, d->letter, d->path, &options)) |
| d->sum_errors++; |
| else |
| d->sum_good++; |
| } |
| |
| trace2_thread_exit(); |
| return NULL; |
| } |
| |
| /* |
| * Start a client-side thread pool. Each thread sends a series of |
| * IPC requests. Each request is on a new connection to the server. |
| */ |
| static int client__multiple(void) |
| { |
| struct multiple_thread_data *list = NULL; |
| int k; |
| int sum_join_errors = 0; |
| int sum_thread_errors = 0; |
| int sum_good = 0; |
| |
| for (k = 0; k < cl_args.nr_threads; k++) { |
| struct multiple_thread_data *d = xcalloc(1, sizeof(*d)); |
| d->next = list; |
| d->path = cl_args.path; |
| d->bytecount = cl_args.bytecount + cl_args.batchsize*(k/26); |
| d->batchsize = cl_args.batchsize; |
| d->sum_errors = 0; |
| d->sum_good = 0; |
| d->letter = 'A' + (k % 26); |
| |
| if (pthread_create(&d->pthread_id, NULL, multiple_thread_proc, d)) { |
| warning("failed to create thread[%d] skipping remainder", k); |
| free(d); |
| break; |
| } |
| |
| list = d; |
| } |
| |
| while (list) { |
| struct multiple_thread_data *d = list; |
| |
| if (pthread_join(d->pthread_id, NULL)) |
| sum_join_errors++; |
| |
| sum_thread_errors += d->sum_errors; |
| sum_good += d->sum_good; |
| |
| list = d->next; |
| free(d); |
| } |
| |
| printf("client (good %d) (join %d), (errors %d)\n", |
| sum_good, sum_join_errors, sum_thread_errors); |
| |
| return (sum_join_errors + sum_thread_errors) ? 1 : 0; |
| } |
| |
| int cmd__simple_ipc(int argc, const char **argv) |
| { |
| const char * const simple_ipc_usage[] = { |
| N_("test-helper simple-ipc is-active [<name>] [<options>]"), |
| N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"), |
| N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"), |
| N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"), |
| N_("test-helper simple-ipc send [<name>] [<token>]"), |
| N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"), |
| N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"), |
| NULL |
| }; |
| |
| const char *bytevalue = NULL; |
| |
| struct option options[] = { |
| #ifndef GIT_WINDOWS_NATIVE |
| OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("name or pathname of unix domain socket")), |
| #else |
| OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("named-pipe name")), |
| #endif |
| OPT_INTEGER(0, "threads", &cl_args.nr_threads, N_("number of threads in server thread pool")), |
| OPT_INTEGER(0, "max-wait", &cl_args.max_wait_sec, N_("seconds to wait for daemon to start or stop")), |
| |
| OPT_INTEGER(0, "bytecount", &cl_args.bytecount, N_("number of bytes")), |
| OPT_INTEGER(0, "batchsize", &cl_args.batchsize, N_("number of requests per thread")), |
| |
| OPT_STRING(0, "byte", &bytevalue, N_("byte"), N_("ballast character")), |
| OPT_STRING(0, "token", &cl_args.token, N_("token"), N_("command token to send to the server")), |
| |
| OPT_END() |
| }; |
| |
| if (argc < 2) |
| usage_with_options(simple_ipc_usage, options); |
| |
| if (argc == 2 && !strcmp(argv[1], "-h")) |
| usage_with_options(simple_ipc_usage, options); |
| |
| if (argc == 2 && !strcmp(argv[1], "SUPPORTS_SIMPLE_IPC")) |
| return 0; |
| |
| cl_args.subcommand = argv[1]; |
| |
| argc--; |
| argv++; |
| |
| argc = parse_options(argc, argv, NULL, options, simple_ipc_usage, 0); |
| |
| if (cl_args.nr_threads < 1) |
| cl_args.nr_threads = 1; |
| if (cl_args.max_wait_sec < 0) |
| cl_args.max_wait_sec = 0; |
| if (cl_args.bytecount < 1) |
| cl_args.bytecount = 1; |
| if (cl_args.batchsize < 1) |
| cl_args.batchsize = 1; |
| |
| if (bytevalue && *bytevalue) |
| cl_args.bytevalue = bytevalue[0]; |
| |
| /* |
| * Use '!!' on all dispatch functions to map from `error()` style |
| * (returns -1) style to `test_must_fail` style (expects 1). This |
| * makes shell error messages less confusing. |
| */ |
| |
| if (!strcmp(cl_args.subcommand, "is-active")) |
| return !!client__probe_server(); |
| |
| if (!strcmp(cl_args.subcommand, "run-daemon")) |
| return !!daemon__run_server(); |
| |
| if (!strcmp(cl_args.subcommand, "start-daemon")) |
| return !!daemon__start_server(); |
| |
| /* |
| * Client commands follow. Ensure a server is running before |
| * sending any data. This might be overkill, but then again |
| * this is a test harness. |
| */ |
| |
| if (!strcmp(cl_args.subcommand, "stop-daemon")) { |
| if (client__probe_server()) |
| return 1; |
| return !!client__stop_server(); |
| } |
| |
| if (!strcmp(cl_args.subcommand, "send")) { |
| if (client__probe_server()) |
| return 1; |
| return !!client__send_ipc(); |
| } |
| |
| if (!strcmp(cl_args.subcommand, "sendbytes")) { |
| if (client__probe_server()) |
| return 1; |
| return !!client__sendbytes(); |
| } |
| |
| if (!strcmp(cl_args.subcommand, "multiple")) { |
| if (client__probe_server()) |
| return 1; |
| return !!client__multiple(); |
| } |
| |
| die("Unhandled subcommand: '%s'", cl_args.subcommand); |
| } |
| #endif |