Merge branch 'nd/threaded-index-pack'

Enables threading in index-pack to resolve base data in parallel.

By Nguyễn Thái Ngọc Duy (3) and Ramsay Jones (1)
* nd/threaded-index-pack:
  index-pack: disable threading if NO_PREAD is defined
  index-pack: support multithreaded delta resolving
  index-pack: restructure pack processing into three main functions
  compat/win32/pthread.h: Add an pthread_key_delete() implementation
diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 83555e5..dc2cfe6 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -9,6 +9,7 @@
 #include "progress.h"
 #include "fsck.h"
 #include "exec_cmd.h"
+#include "thread-utils.h"
 
 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -38,6 +39,19 @@
 	int ofs_first, ofs_last;
 };
 
+#if !defined(NO_PTHREADS) && defined(NO_PREAD)
+/* NO_PREAD uses compat/pread.c, which is not thread-safe. Disable threading. */
+#define NO_PTHREADS
+#endif
+
+struct thread_local {
+#ifndef NO_PTHREADS
+	pthread_t thread;
+#endif
+	struct base_data *base_cache;
+	size_t base_cache_used;
+};
+
 /*
  * Even if sizeof(union delta_base) == 24 on 64-bit archs, we really want
  * to memcmp() only the first 20 bytes.
@@ -54,11 +68,11 @@
 
 static struct object_entry *objects;
 static struct delta_entry *deltas;
-static struct base_data *base_cache;
-static size_t base_cache_used;
+static struct thread_local nothread_data;
 static int nr_objects;
 static int nr_deltas;
 static int nr_resolved_deltas;
+static int nr_threads;
 
 static int from_stdin;
 static int strict;
@@ -75,6 +89,77 @@
 static uint32_t input_crc32;
 static int input_fd, output_fd, pack_fd;
 
+#ifndef NO_PTHREADS
+
+static struct thread_local *thread_data;
+static int nr_dispatched;
+static int threads_active;
+
+static pthread_mutex_t read_mutex;
+#define read_lock()		lock_mutex(&read_mutex)
+#define read_unlock()		unlock_mutex(&read_mutex)
+
+static pthread_mutex_t counter_mutex;
+#define counter_lock()		lock_mutex(&counter_mutex)
+#define counter_unlock()	unlock_mutex(&counter_mutex)
+
+static pthread_mutex_t work_mutex;
+#define work_lock()		lock_mutex(&work_mutex)
+#define work_unlock()		unlock_mutex(&work_mutex)
+
+static pthread_key_t key;
+
+static inline void lock_mutex(pthread_mutex_t *mutex)
+{
+	if (threads_active)
+		pthread_mutex_lock(mutex);
+}
+
+static inline void unlock_mutex(pthread_mutex_t *mutex)
+{
+	if (threads_active)
+		pthread_mutex_unlock(mutex);
+}
+
+/*
+ * Mutexes and condition variables can't be statically initialized on Windows.
+ */
+static void init_thread(void)
+{
+	init_recursive_mutex(&read_mutex);
+	pthread_mutex_init(&counter_mutex, NULL);
+	pthread_mutex_init(&work_mutex, NULL);
+	pthread_key_create(&key, NULL);
+	thread_data = xcalloc(nr_threads, sizeof(*thread_data));
+	threads_active = 1;
+}
+
+static void cleanup_thread(void)
+{
+	if (!threads_active)
+		return;
+	threads_active = 0;
+	pthread_mutex_destroy(&read_mutex);
+	pthread_mutex_destroy(&counter_mutex);
+	pthread_mutex_destroy(&work_mutex);
+	pthread_key_delete(key);
+	free(thread_data);
+}
+
+#else
+
+#define read_lock()
+#define read_unlock()
+
+#define counter_lock()
+#define counter_unlock()
+
+#define work_lock()
+#define work_unlock()
+
+#endif
+
+
 static int mark_link(struct object *obj, int type, void *data)
 {
 	if (!obj)
@@ -226,6 +311,25 @@
 	die(_("pack has bad object at offset %lu: %s"), offset, buf);
 }
 
+static inline struct thread_local *get_thread_data(void)
+{
+#ifndef NO_PTHREADS
+	if (threads_active)
+		return pthread_getspecific(key);
+	assert(!threads_active &&
+	       "This should only be reached when all threads are gone");
+#endif
+	return &nothread_data;
+}
+
+#ifndef NO_PTHREADS
+static void set_thread_data(struct thread_local *data)
+{
+	if (threads_active)
+		pthread_setspecific(key, data);
+}
+#endif
+
 static struct base_data *alloc_base_data(void)
 {
 	struct base_data *base = xmalloc(sizeof(struct base_data));
@@ -240,15 +344,16 @@
 	if (c->data) {
 		free(c->data);
 		c->data = NULL;
-		base_cache_used -= c->size;
+		get_thread_data()->base_cache_used -= c->size;
 	}
 }
 
 static void prune_base_data(struct base_data *retain)
 {
 	struct base_data *b;
-	for (b = base_cache;
-	     base_cache_used > delta_base_cache_limit && b;
+	struct thread_local *data = get_thread_data();
+	for (b = data->base_cache;
+	     data->base_cache_used > delta_base_cache_limit && b;
 	     b = b->child) {
 		if (b->data && b != retain)
 			free_base_data(b);
@@ -260,12 +365,12 @@
 	if (base)
 		base->child = c;
 	else
-		base_cache = c;
+		get_thread_data()->base_cache = c;
 
 	c->base = base;
 	c->child = NULL;
 	if (c->data)
-		base_cache_used += c->size;
+		get_thread_data()->base_cache_used += c->size;
 	prune_base_data(c);
 }
 
@@ -275,7 +380,7 @@
 	if (base)
 		base->child = NULL;
 	else
-		base_cache = NULL;
+		get_thread_data()->base_cache = NULL;
 	free_base_data(c);
 }
 
@@ -467,19 +572,24 @@
 			enum object_type type, unsigned char *sha1)
 {
 	hash_sha1_file(data, size, typename(type), sha1);
+	read_lock();
 	if (has_sha1_file(sha1)) {
 		void *has_data;
 		enum object_type has_type;
 		unsigned long has_size;
 		has_data = read_sha1_file(sha1, &has_type, &has_size);
+		read_unlock();
 		if (!has_data)
 			die(_("cannot read existing object %s"), sha1_to_hex(sha1));
 		if (size != has_size || type != has_type ||
 		    memcmp(data, has_data, size) != 0)
 			die(_("SHA1 COLLISION FOUND WITH %s !"), sha1_to_hex(sha1));
 		free(has_data);
-	}
+	} else
+		read_unlock();
+
 	if (strict) {
+		read_lock();
 		if (type == OBJ_BLOB) {
 			struct blob *blob = lookup_blob(sha1);
 			if (blob)
@@ -513,6 +623,7 @@
 			}
 			obj->flags |= FLAG_CHECKED;
 		}
+		read_unlock();
 	}
 }
 
@@ -558,7 +669,7 @@
 		if (!delta_nr) {
 			c->data = get_data_from_pack(obj);
 			c->size = obj->size;
-			base_cache_used += c->size;
+			get_thread_data()->base_cache_used += c->size;
 			prune_base_data(c);
 		}
 		for (; delta_nr > 0; delta_nr--) {
@@ -574,7 +685,7 @@
 			free(raw);
 			if (!c->data)
 				bad_object(obj->idx.offset, _("failed to apply delta"));
-			base_cache_used += c->size;
+			get_thread_data()->base_cache_used += c->size;
 			prune_base_data(c);
 		}
 		free(delta);
@@ -602,7 +713,9 @@
 		bad_object(delta_obj->idx.offset, _("failed to apply delta"));
 	sha1_object(result->data, result->size, delta_obj->real_type,
 		    delta_obj->idx.sha1);
+	counter_lock();
 	nr_resolved_deltas++;
+	counter_unlock();
 }
 
 static struct base_data *find_unresolved_deltas_1(struct base_data *base,
@@ -688,19 +801,50 @@
 				   objects[delta_b->obj_no].type);
 }
 
-/* Parse all objects and return the pack content SHA1 hash */
+static void resolve_base(struct object_entry *obj)
+{
+	struct base_data *base_obj = alloc_base_data();
+	base_obj->obj = obj;
+	base_obj->data = NULL;
+	find_unresolved_deltas(base_obj);
+}
+
+#ifndef NO_PTHREADS
+static void *threaded_second_pass(void *data)
+{
+	set_thread_data(data);
+	for (;;) {
+		int i;
+		work_lock();
+		display_progress(progress, nr_resolved_deltas);
+		while (nr_dispatched < nr_objects &&
+		       is_delta_type(objects[nr_dispatched].type))
+			nr_dispatched++;
+		if (nr_dispatched >= nr_objects) {
+			work_unlock();
+			break;
+		}
+		i = nr_dispatched++;
+		work_unlock();
+
+		resolve_base(&objects[i]);
+	}
+	return NULL;
+}
+#endif
+
+/*
+ * First pass:
+ * - find locations of all objects;
+ * - calculate SHA1 of all non-delta objects;
+ * - remember base (SHA1 or offset) for all deltas.
+ */
 static void parse_pack_objects(unsigned char *sha1)
 {
 	int i;
 	struct delta_entry *delta = deltas;
 	struct stat st;
 
-	/*
-	 * First pass:
-	 * - find locations of all objects;
-	 * - calculate SHA1 of all non-delta objects;
-	 * - remember base (SHA1 or offset) for all deltas.
-	 */
 	if (verbose)
 		progress = start_progress(
 				from_stdin ? _("Receiving objects") : _("Indexing objects"),
@@ -734,6 +878,19 @@
 	if (S_ISREG(st.st_mode) &&
 			lseek(input_fd, 0, SEEK_CUR) - input_len != st.st_size)
 		die(_("pack has junk at the end"));
+}
+
+/*
+ * Second pass:
+ * - for each non-delta object, check whether it is used as a base
+ *   for deltas;
+ * - if it is, uncompress the object and apply all deltas,
+ *   recursively checking whether each resulting object is itself a
+ *   base for further deltas.
+ */
+static void resolve_deltas(void)
+{
+	int i;
 
 	if (!nr_deltas)
 		return;
@@ -742,29 +899,83 @@
 	qsort(deltas, nr_deltas, sizeof(struct delta_entry),
 	      compare_delta_entry);
 
-	/*
-	 * Second pass:
-	 * - for all non-delta objects, look if it is used as a base for
-	 *   deltas;
-	 * - if used as a base, uncompress the object and apply all deltas,
-	 *   recursively checking if the resulting object is used as a base
-	 *   for some more deltas.
-	 */
 	if (verbose)
 		progress = start_progress(_("Resolving deltas"), nr_deltas);
+
+#ifndef NO_PTHREADS
+	nr_dispatched = 0;
+	if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+		init_thread();
+		for (i = 0; i < nr_threads; i++) {
+			int ret = pthread_create(&thread_data[i].thread, NULL,
+						 threaded_second_pass, thread_data + i);
+			if (ret)
+				die("unable to create thread: %s", strerror(ret));
+		}
+		for (i = 0; i < nr_threads; i++)
+			pthread_join(thread_data[i].thread, NULL);
+		cleanup_thread();
+		return;
+	}
+#endif
+
 	for (i = 0; i < nr_objects; i++) {
 		struct object_entry *obj = &objects[i];
-		struct base_data *base_obj = alloc_base_data();
 
 		if (is_delta_type(obj->type))
 			continue;
-		base_obj->obj = obj;
-		base_obj->data = NULL;
-		find_unresolved_deltas(base_obj);
+		resolve_base(obj);
 		display_progress(progress, nr_resolved_deltas);
 	}
 }
 
+/*
+ * Third pass:
+ * - append objects to convert thin pack to full pack if required
+ * - write the final 20-byte SHA-1
+ */
+static void fix_unresolved_deltas(struct sha1file *f, int nr_unresolved);
+static void conclude_pack(int fix_thin_pack, const char *curr_pack, unsigned char *pack_sha1)
+{
+	if (nr_deltas == nr_resolved_deltas) {
+		stop_progress(&progress);
+		/* Flush remaining pack final 20-byte SHA1. */
+		flush();
+		return;
+	}
+
+	if (fix_thin_pack) {
+		struct sha1file *f;
+		unsigned char read_sha1[20], tail_sha1[20];
+		char msg[48];
+		int nr_unresolved = nr_deltas - nr_resolved_deltas;
+		int nr_objects_initial = nr_objects;
+		if (nr_unresolved <= 0)
+			die(_("confusion beyond insanity"));
+		objects = xrealloc(objects,
+				   (nr_objects + nr_unresolved + 1)
+				   * sizeof(*objects));
+		f = sha1fd(output_fd, curr_pack);
+		fix_unresolved_deltas(f, nr_unresolved);
+		sprintf(msg, "completed with %d local objects",
+			nr_objects - nr_objects_initial);
+		stop_progress_msg(&progress, msg);
+		sha1close(f, tail_sha1, 0);
+		hashcpy(read_sha1, pack_sha1);
+		fixup_pack_header_footer(output_fd, pack_sha1,
+					 curr_pack, nr_objects,
+					 read_sha1, consumed_bytes-20);
+		if (hashcmp(read_sha1, tail_sha1) != 0)
+			die("Unexpected tail checksum for %s "
+			    "(disk corruption?)", curr_pack);
+	}
+	if (nr_deltas != nr_resolved_deltas)
+		die(Q_("pack has %d unresolved delta",
+		       "pack has %d unresolved deltas",
+		       nr_deltas - nr_resolved_deltas),
+		    nr_deltas - nr_resolved_deltas);
+}
+
 static int write_compressed(struct sha1file *f, void *in, unsigned int size)
 {
 	git_zstream stream;
@@ -968,6 +1179,18 @@
 			die("bad pack.indexversion=%"PRIu32, opts->version);
 		return 0;
 	}
+	if (!strcmp(k, "pack.threads")) {
+		nr_threads = git_config_int(k, v);
+		if (nr_threads < 0)
+			die("invalid number of threads specified (%d)",
+			    nr_threads);
+#ifdef NO_PTHREADS
+		if (nr_threads != 1)
+			warning("no threads support, ignoring %s", k);
+		nr_threads = 1;
+#endif
+		return 0;
+	}
 	return git_default_config(k, v, cb);
 }
 
@@ -1129,6 +1352,17 @@
 				keep_msg = "";
 			} else if (!prefixcmp(arg, "--keep=")) {
 				keep_msg = arg + 7;
+			} else if (!prefixcmp(arg, "--threads=")) {
+				char *end;
+				nr_threads = strtoul(arg+10, &end, 0);
+				if (!arg[10] || *end || nr_threads < 0)
+					usage(index_pack_usage);
+#ifdef NO_PTHREADS
+				if (nr_threads != 1)
+					warning("no threads support, "
+						"ignoring %s", arg);
+				nr_threads = 1;
+#endif
 			} else if (!prefixcmp(arg, "--pack_header=")) {
 				struct pack_header *hdr;
 				char *c;
@@ -1200,47 +1434,22 @@
 	if (strict)
 		opts.flags |= WRITE_IDX_STRICT;
 
+#ifndef NO_PTHREADS
+	if (!nr_threads) {
+		nr_threads = online_cpus();
+		/* An experiment showed that more threads does not mean faster */
+		if (nr_threads > 3)
+			nr_threads = 3;
+	}
+#endif
+
 	curr_pack = open_pack_file(pack_name);
 	parse_pack_header();
 	objects = xcalloc(nr_objects + 1, sizeof(struct object_entry));
 	deltas = xcalloc(nr_objects, sizeof(struct delta_entry));
 	parse_pack_objects(pack_sha1);
-	if (nr_deltas == nr_resolved_deltas) {
-		stop_progress(&progress);
-		/* Flush remaining pack final 20-byte SHA1. */
-		flush();
-	} else {
-		if (fix_thin_pack) {
-			struct sha1file *f;
-			unsigned char read_sha1[20], tail_sha1[20];
-			char msg[48];
-			int nr_unresolved = nr_deltas - nr_resolved_deltas;
-			int nr_objects_initial = nr_objects;
-			if (nr_unresolved <= 0)
-				die(_("confusion beyond insanity"));
-			objects = xrealloc(objects,
-					   (nr_objects + nr_unresolved + 1)
-					   * sizeof(*objects));
-			f = sha1fd(output_fd, curr_pack);
-			fix_unresolved_deltas(f, nr_unresolved);
-			sprintf(msg, "completed with %d local objects",
-				nr_objects - nr_objects_initial);
-			stop_progress_msg(&progress, msg);
-			sha1close(f, tail_sha1, 0);
-			hashcpy(read_sha1, pack_sha1);
-			fixup_pack_header_footer(output_fd, pack_sha1,
-						 curr_pack, nr_objects,
-						 read_sha1, consumed_bytes-20);
-			if (hashcmp(read_sha1, tail_sha1) != 0)
-				die("Unexpected tail checksum for %s "
-				    "(disk corruption?)", curr_pack);
-		}
-		if (nr_deltas != nr_resolved_deltas)
-			die(Q_("pack has %d unresolved delta",
-			       "pack has %d unresolved deltas",
-			       nr_deltas - nr_resolved_deltas),
-			    nr_deltas - nr_resolved_deltas);
-	}
+	resolve_deltas();
+	conclude_pack(fix_thin_pack, curr_pack, pack_sha1);
 	free(deltas);
 	if (strict)
 		check_objects();