direct-io: move aio_complete into ->end_io

Filesystems with unwritten extent support must not complete an AIO request
until the transaction to convert the extent has been commited.  That means
the aio_complete calls needs to be moved into the ->end_io callback so
that the filesystem can control when to call it exactly.

This makes a bit of a mess out of dio_complete and the ->end_io callback
prototype even more complicated.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Reviewed-by: Jan Kara <jack@suse.cz>
Signed-off-by: Alex Elder <aelder@sgi.com>
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 7600aac..a10cb91 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -218,7 +218,7 @@
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static int dio_complete(struct dio *dio, loff_t offset, int ret)
+static int dio_complete(struct dio *dio, loff_t offset, int ret, bool is_async)
 {
 	ssize_t transferred = 0;
 
@@ -239,14 +239,6 @@
 			transferred = dio->i_size - offset;
 	}
 
-	if (dio->end_io && dio->result)
-		dio->end_io(dio->iocb, offset, transferred,
-			    dio->map_bh.b_private);
-
-	if (dio->flags & DIO_LOCKING)
-		/* lockdep: non-owner release */
-		up_read_non_owner(&dio->inode->i_alloc_sem);
-
 	if (ret == 0)
 		ret = dio->page_errors;
 	if (ret == 0)
@@ -254,6 +246,17 @@
 	if (ret == 0)
 		ret = transferred;
 
+	if (dio->end_io && dio->result) {
+		dio->end_io(dio->iocb, offset, transferred,
+			    dio->map_bh.b_private, ret, is_async);
+	} else if (is_async) {
+		aio_complete(dio->iocb, ret, 0);
+	}
+
+	if (dio->flags & DIO_LOCKING)
+		/* lockdep: non-owner release */
+		up_read_non_owner(&dio->inode->i_alloc_sem);
+
 	return ret;
 }
 
@@ -277,8 +280,7 @@
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
-		int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-		aio_complete(dio->iocb, ret, 0);
+		dio_complete(dio, dio->iocb->ki_pos, 0, true);
 		kfree(dio);
 	}
 }
@@ -1126,7 +1128,7 @@
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (ret2 == 0) {
-		ret = dio_complete(dio, offset, ret);
+		ret = dio_complete(dio, offset, ret, false);
 		kfree(dio);
 	} else
 		BUG_ON(ret != -EIOCBQUEUED);