#include "git-compat-util.h"
#include "gettext.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "trace2.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

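/*
 * Probe whether an ipc-server is listening at `path`.
 *
 * We cannot simply trust the presence of an inode on disk, so on
 * success we actually ping the server (and immediately close the
 * test connection).
 */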
enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

#ifdef __CYGWIN__
	/*
	 * Cygwin emulates Unix sockets by writing specially-crafted files
	 * whose `system` bit is set.
	 *
	 * If we are too fast, Cygwin might still be in the process of marking
	 * the underlying file as a system file. Until then, we will not see a
	 * Unix socket here, but a plain file instead. Just in case this is
	 * happening, wait a little and try again.
	 */
	{
		static const int delay[] = { 1, 10, 20, 40, -1 };
		int i;

		for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
			sleep_millisec(delay[i]);
			if (lstat(path, &st) == -1)
				return IPC_STATE__INVALID_PATH;
		}
	}
#endif

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has a S_IFSOCK type inode
	 * at `path`, doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server. If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests. Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

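/*
 * Try to connect to an ipc-server at `path`, retrying for up to
 * MY_CONNECTION_TIMEOUT_MS (subject to the `wait_if_*` options).
 * On success, returns IPC_STATE__LISTENING and hands back a
 * connection that the caller must eventually release with
 * ipc_client_close_connection().
 */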
enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

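/*
 * Close the connection and free the wrapper. Safe to call with NULL
 * (for example, when ipc_client_try_connect() failed).
 */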
void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

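/*
 * Send a single command over an established connection and collect
 * the entire (possibly chunked) response in `answer`. Returns 0 on
 * success and -1 on IO errors.
 */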
int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, size_t message_len,
	struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, message_len,
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

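/*
 * One-shot convenience wrapper: connect, send a single command,
 * read the response, and disconnect.
 *
 * A minimal usage sketch (the path and message below are illustrative
 * placeholders, not values from this file):
 *
 *	struct ipc_client_connect_options options =
 *		IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *	struct strbuf answer = STRBUF_INIT;
 *
 *	options.wait_if_busy = 1;
 *
 *	if (!ipc_client_send_command("/path/to/socket", &options,
 *				     "status", 6, &answer))
 *		printf("server said: %s\n", answer.buf);
 *	strbuf_release(&answer);
 */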
int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, size_t message_len,
			    struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection,
						    message, message_len,
						    answer);

	ipc_client_close_connection(connection);

	return ret;
}

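/*
 * Set or clear O_NONBLOCK on `fd` via fcntl(), preserving the other
 * file status flags. Returns -1 on error.
 */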
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread. The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb`, and then wait for another connection from the
 * accept-thread. This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO. The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}

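/*
 * Note that with this convention one slot in the ring buffer is
 * always left unused, so the queue can hold at most `queue_size - 1`
 * connections at a time.
 */
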
/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call). The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client through us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd. Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback. The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, buf.len, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	} else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message. Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread. It handles a series of
 * connections from clients. It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shutdown.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket. This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread". This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread. This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		} else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather the client not
 * time out unnecessarily if we can avoid it. (The assumption is that
 * this will be used for FSMonitor and a few seconds' wait on a
 * connection is better than having the client time out and do the
 * full computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`. This doesn't need to be huge,
 * just large enough for our "accept-thread" to wake up and queue
 * incoming connections onto the FIFO without the kernel dropping
 * any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

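/*
 * Create the listening unix domain socket (via unix_ss_create()) and
 * make it non-blocking so that the accept-thread can poll() it
 * alongside the shutdown socketpair.
 */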
static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

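/*
 * A minimal sketch of the server lifecycle from the caller's point of
 * view (`my_app_cb` and `my_app_data` stand in for a real application
 * callback and its instance data and are not defined in this file;
 * see `ipc_server_application_cb` in simple-ipc.h for the exact
 * signature):
 *
 *	struct ipc_server_data *server_data = NULL;
 *	struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *	if (!ipc_server_run_async(&server_data, path, &opts,
 *				  my_app_cb, my_app_data)) {
 *		...
 *		ipc_server_stop_async(server_data);
 *		ipc_server_await(server_data);
 *		ipc_server_free(server_data);
 *	}
 */
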
/*
 * Start IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking. This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}

/*
 * Gently tell the IPC server threads to shutdown.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit. (This does not abort in-progress conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

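/*
 * Free the server state. The server must already have been stopped
 * (see ipc_server_stop_async() and ipc_server_await()); otherwise
 * this BUG()s. This closes the listener socket and the shutdown
 * socketpair and releases all memory.
 */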
void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}