Btrfs: Wait for kernel threads to make progress during async submission

Before this change, btrfs would use a bdi congestion function to make
sure there weren't too many pending async checksum work items.

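This change makes the process creating async work items wait instead:
after queueing a work item it sleeps until the number of pending async
submits drops below the limit, or until HZ/10 has elapsed.  This leads
to fewer congestion returns from the bdi, which improves pdflush
background_writeout scanning.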

Signed-off-by: Chris Mason <chris.mason@oracle.com>
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index 1bf210d..1aed1f4 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -429,31 +429,36 @@
 	return 0;
 }
 
+static unsigned long async_submit_limit(struct btrfs_fs_info *info)
+{
+	unsigned long limit = min_t(unsigned long,
+				    info->workers.max_workers,
+				    info->fs_devices->open_devices);
+	return 256 * limit;
+}
+
 int btrfs_congested_async(struct btrfs_fs_info *info, int iodone)
 {
-	int limit = 256 * info->fs_devices->open_devices;
-
-	if (iodone)
-		limit = (limit * 3) / 2;
-	if (atomic_read(&info->nr_async_submits) > limit)
-		return 1;
-
-	return atomic_read(&info->nr_async_bios) > limit;
+	return atomic_read(&info->nr_async_bios) > async_submit_limit(info);
 }
 
 static void run_one_async_submit(struct btrfs_work *work)
 {
 	struct btrfs_fs_info *fs_info;
 	struct async_submit_bio *async;
+	int limit;
 
 	async = container_of(work, struct  async_submit_bio, work);
 	fs_info = BTRFS_I(async->inode)->root->fs_info;
+
+	limit = async_submit_limit(fs_info);
+	limit = limit * 2 / 3;
+
 	atomic_dec(&fs_info->nr_async_submits);
 
-	if ((async->bio->bi_rw & (1 << BIO_RW)) &&
-	    !btrfs_congested_async(fs_info, 1)) {
-		clear_bdi_congested(&fs_info->bdi, WRITE);
-	}
+	if (atomic_read(&fs_info->nr_async_submits) < limit)
+		wake_up(&fs_info->async_submit_wait);
+
 	async->submit_bio_hook(async->inode, async->rw, async->bio,
 			       async->mirror_num);
 	kfree(async);
@@ -464,6 +469,7 @@
 			extent_submit_bio_hook_t *submit_bio_hook)
 {
 	struct async_submit_bio *async;
+	int limit = async_submit_limit(fs_info);
 
 	async = kmalloc(sizeof(*async), GFP_NOFS);
 	if (!async)
@@ -478,6 +484,10 @@
 	async->work.flags = 0;
 	atomic_inc(&fs_info->nr_async_submits);
 	btrfs_queue_worker(&fs_info->workers, &async->work);
+
+	wait_event_timeout(fs_info->async_submit_wait,
+			   (atomic_read(&fs_info->nr_async_submits) < limit),
+			   HZ/10);
 	return 0;
 }
 
@@ -545,16 +555,11 @@
 	if (wbc->sync_mode == WB_SYNC_NONE) {
 		u64 num_dirty;
 		u64 start = 0;
-		unsigned long thresh = 96 * 1024 * 1024;
+		unsigned long thresh = 8 * 1024 * 1024;
 
 		if (wbc->for_kupdate)
 			return 0;
 
-		if (current_is_pdflush()) {
-			thresh = 96 * 1024 * 1024;
-		} else {
-			thresh = 8 * 1024 * 1024;
-		}
 		num_dirty = count_range_bits(tree, &start, (u64)-1,
 					     thresh, EXTENT_DIRTY);
 		if (num_dirty < thresh) {
@@ -1333,6 +1338,7 @@
 	mutex_init(&fs_info->volume_mutex);
 	init_waitqueue_head(&fs_info->transaction_throttle);
 	init_waitqueue_head(&fs_info->transaction_wait);
+	init_waitqueue_head(&fs_info->async_submit_wait);
 
 #if 0
 	ret = add_hasher(fs_info, "crc32c");
@@ -1380,6 +1386,7 @@
 	 * devices
 	 */
 	fs_info->submit_workers.idle_thresh = 64;
+	fs_info->workers.idle_thresh = 32;
 
 	btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1);
 	btrfs_init_workers(&fs_info->endio_workers, "endio",
@@ -1849,7 +1856,7 @@
 	struct extent_io_tree *tree;
 	u64 num_dirty;
 	u64 start = 0;
-	unsigned long thresh = 2 * 1024 * 1024;
+	unsigned long thresh = 12 * 1024 * 1024;
 	tree = &BTRFS_I(root->fs_info->btree_inode)->io_tree;
 
 	if (current_is_pdflush())