[PATCH] Parallelize pulling by ssh

This causes ssh-pull to request objects in prefetch() and read then in
fetch(), such that it reduces the unpipelined round-trip time.

This also makes sha1_write_from_fd() support having a buffer of data
which it accidentally read from the fd after the object; this was
formerly not a problem, because it would always get a short read at
the end of an object, because the next object had not been
requested. This is no longer true.

Signed-off-by: Daniel Barkalow <barkalow@iabervon.org>
Signed-off-by: Junio C Hamano <junkio@cox.net>
diff --git a/cache.h b/cache.h
index 381db35..1b49f0f 100644
--- a/cache.h
+++ b/cache.h
@@ -198,7 +198,8 @@
 /* Read a tree into the cache */
 extern int read_tree(void *buffer, unsigned long size, int stage, const char **paths);
 
-extern int write_sha1_from_fd(const unsigned char *sha1, int fd);
+extern int write_sha1_from_fd(const unsigned char *sha1, int fd, char *buffer,
+			      size_t bufsize, size_t *bufposn);
 extern int write_sha1_to_fd(int fd, const unsigned char *sha1);
 
 extern int has_sha1_pack(const unsigned char *sha1);
diff --git a/sha1_file.c b/sha1_file.c
index e808c91..df5eb2a 100644
--- a/sha1_file.c
+++ b/sha1_file.c
@@ -1389,14 +1389,14 @@
 	return 0;
 }
 
-int write_sha1_from_fd(const unsigned char *sha1, int fd)
+int write_sha1_from_fd(const unsigned char *sha1, int fd, char *buffer,
+		       size_t bufsize, size_t *bufposn)
 {
 	char *filename = sha1_file_name(sha1);
 
 	int local;
 	z_stream stream;
 	unsigned char real_sha1[20];
-	unsigned char buf[4096];
 	unsigned char discard[4096];
 	int ret;
 	SHA_CTX c;
@@ -1414,7 +1414,24 @@
 
 	do {
 		ssize_t size;
-		size = read(fd, buf, 4096);
+		if (*bufposn) {
+			stream.avail_in = *bufposn;
+			stream.next_in = buffer;
+			do {
+				stream.next_out = discard;
+				stream.avail_out = sizeof(discard);
+				ret = inflate(&stream, Z_SYNC_FLUSH);
+				SHA1_Update(&c, discard, sizeof(discard) -
+					    stream.avail_out);
+			} while (stream.avail_in && ret == Z_OK);
+			write(local, buffer, *bufposn - stream.avail_in);
+			memmove(buffer, buffer + *bufposn - stream.avail_in,
+				stream.avail_in);
+			*bufposn = stream.avail_in;
+			if (ret != Z_OK)
+				break;
+		}
+		size = read(fd, buffer + *bufposn, bufsize - *bufposn);
 		if (size <= 0) {
 			close(local);
 			unlink(filename);
@@ -1423,18 +1440,8 @@
 			perror("Reading from connection");
 			return -1;
 		}
-		write(local, buf, size);
-		stream.avail_in = size;
-		stream.next_in = buf;
-		do {
-			stream.next_out = discard;
-			stream.avail_out = sizeof(discard);
-			ret = inflate(&stream, Z_SYNC_FLUSH);
-			SHA1_Update(&c, discard, sizeof(discard) -
-				    stream.avail_out);
-		} while (stream.avail_in && ret == Z_OK);
-		
-	} while (ret == Z_OK);
+		*bufposn += size;
+	} while (1);
 	inflateEnd(&stream);
 
 	close(local);
diff --git a/ssh-pull.c b/ssh-pull.c
index 4cf9b6a..bdc99df 100644
--- a/ssh-pull.c
+++ b/ssh-pull.c
@@ -10,24 +10,49 @@
 static unsigned char remote_version = 0;
 static unsigned char local_version = 1;
 
+ssize_t force_write(int fd, void *buffer, size_t length)
+{
+	ssize_t ret = 0;
+	while (ret < length) {
+		ssize_t size = write(fd, buffer + ret, length - ret);
+		if (size < 0) {
+			return size;
+		}
+		if (size == 0) {
+			return ret;
+		}
+		ret += size;
+	}
+	return ret;
+}
+
 void prefetch(unsigned char *sha1)
 {
+	char type = 'o';
+	force_write(fd_out, &type, 1);
+	force_write(fd_out, sha1, 20);
+	//memcpy(requested + 20 * prefetches++, sha1, 20);
 }
 
+static char conn_buf[4096];
+static size_t conn_buf_posn = 0;
+
 int fetch(unsigned char *sha1)
 {
 	int ret;
 	signed char remote;
-	char type = 'o';
-	if (has_sha1_file(sha1))
-		return 0;
-	write(fd_out, &type, 1);
-	write(fd_out, sha1, 20);
-	if (read(fd_in, &remote, 1) < 1)
-		return -1;
+
+	if (conn_buf_posn) {
+		remote = conn_buf[0];
+		memmove(conn_buf, conn_buf + 1, --conn_buf_posn);
+	} else {
+		if (read(fd_in, &remote, 1) < 1)
+			return -1;
+	}
+	//fprintf(stderr, "Got %d\n", remote);
 	if (remote < 0)
 		return remote;
-	ret = write_sha1_from_fd(sha1, fd_in);
+	ret = write_sha1_from_fd(sha1, fd_in, conn_buf, 4096, &conn_buf_posn);
 	if (!ret)
 		pull_say("got %s\n", sha1_to_hex(sha1));
 	return ret;