Btrfs: Add async worker threads for pre and post IO checksumming

Btrfs has been using workqueues to spread the checksumming load across
other CPUs in the system.  But workqueues only schedule work on the same
CPU that queued it, which limits their benefit on systems with higher
CPU counts.

This code adds a generic facility to schedule work with pools of kthreads,
and changes the bio submission code to queue bios up.  The queueing is
important to make sure large numbers of procs on the system don't
turn streaming workloads into random workloads by sending IO down
concurrently.
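
The interface is small: a struct btrfs_work is embedded in the caller's
per-request structure, its func pointer is filled in, and the work is
handed to a pool with btrfs_queue_worker().  A minimal sketch of a
caller follows; my_async_ctx, run_my_work and queue_my_work are
illustrative names only, and the btrfs_work/worker pool declarations
live in the new async-thread.h:

	#include "async-thread.h"

	struct my_async_ctx {
		struct inode *inode;
		struct bio *bio;
		struct btrfs_work work;	/* embedded, found via container_of() */
	};

	/* runs in a pool kthread, not in the submitting task's context */
	static void run_my_work(struct btrfs_work *work)
	{
		struct my_async_ctx *ctx;

		ctx = container_of(work, struct my_async_ctx, work);
		/* checksum and submit ctx->bio here, then free the context */
		kfree(ctx);
	}

	static void queue_my_work(struct btrfs_fs_info *fs_info,
				  struct my_async_ctx *ctx)
	{
		ctx->work.func = run_my_work;
		ctx->work.flags = 0;
		btrfs_queue_worker(&fs_info->workers, &ctx->work);
	}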

The end result of all of this is much higher performance (and CPU usage) when
doing checksumming on large machines.  Two worker pools are created,
one for writes and one for endio processing.  The two could deadlock if
we tried to service both from a single pool.
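
The pools follow a simple init/start/stop lifecycle tied to mount and
unmount.  This sketch just mirrors the open_ctree()/close_ctree() hunks
below; the thread counts are the ones chosen in this patch and may need
tuning:

	/* at mount: size the pools, then spawn the kthreads */
	btrfs_init_workers(&fs_info->workers, num_online_cpus() * 2);
	btrfs_init_workers(&fs_info->endio_workers, num_online_cpus());
	btrfs_start_workers(&fs_info->workers, 1);
	btrfs_start_workers(&fs_info->endio_workers, num_online_cpus());

	/* at unmount (and on mount failure): stop the kthreads */
	btrfs_stop_workers(&fs_info->workers);
	btrfs_stop_workers(&fs_info->endio_workers);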

Signed-off-by: Chris Mason <chris.mason@oracle.com>
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index b9a5364..98ff4fb 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -31,6 +31,7 @@
 #include "btrfs_inode.h"
 #include "volumes.h"
 #include "print-tree.h"
+#include "async-thread.h"
 
 #if 0
 static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)
@@ -46,8 +47,7 @@
 #endif
 
 static struct extent_io_ops btree_extent_io_ops;
-static struct workqueue_struct *end_io_workqueue;
-static struct workqueue_struct *async_submit_workqueue;
+static void end_workqueue_fn(struct btrfs_work *work);
 
 struct end_io_wq {
 	struct bio *bio;
@@ -57,6 +57,7 @@
 	int error;
 	int metadata;
 	struct list_head list;
+	struct btrfs_work work;
 };
 
 struct async_submit_bio {
@@ -66,6 +67,7 @@
 	extent_submit_bio_hook_t *submit_bio_hook;
 	int rw;
 	int mirror_num;
+	struct btrfs_work work;
 };
 
 struct extent_map *btree_get_extent(struct inode *inode, struct page *page,
@@ -389,7 +391,6 @@
 {
 	struct end_io_wq *end_io_wq = bio->bi_private;
 	struct btrfs_fs_info *fs_info;
-	unsigned long flags;
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
 	if (bio->bi_size)
@@ -397,11 +398,10 @@
 #endif
 
 	fs_info = end_io_wq->info;
-	spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
 	end_io_wq->error = err;
-	list_add_tail(&end_io_wq->list, &fs_info->end_io_work_list);
-	spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
-	queue_work(end_io_workqueue, &fs_info->end_io_work);
+	end_io_wq->work.func = end_workqueue_fn;
+	end_io_wq->work.flags = 0;
+	btrfs_queue_worker(&fs_info->endio_workers, &end_io_wq->work);
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
 	return 0;
@@ -428,6 +428,19 @@
 	return 0;
 }
 
+static void run_one_async_submit(struct btrfs_work *work)
+{
+	struct btrfs_fs_info *fs_info;
+	struct async_submit_bio *async;
+
+	async = container_of(work, struct async_submit_bio, work);
+	fs_info = BTRFS_I(async->inode)->root->fs_info;
+	atomic_dec(&fs_info->nr_async_submits);
+	async->submit_bio_hook(async->inode, async->rw, async->bio,
+			       async->mirror_num);
+	kfree(async);
+}
+
 int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 			int rw, struct bio *bio, int mirror_num,
 			extent_submit_bio_hook_t *submit_bio_hook)
@@ -443,13 +456,10 @@
 	async->bio = bio;
 	async->mirror_num = mirror_num;
 	async->submit_bio_hook = submit_bio_hook;
-
-	spin_lock(&fs_info->async_submit_work_lock);
-	list_add_tail(&async->list, &fs_info->async_submit_work_list);
+	async->work.func = run_one_async_submit;
+	async->work.flags = 0;
 	atomic_inc(&fs_info->nr_async_submits);
-	spin_unlock(&fs_info->async_submit_work_lock);
-
-	queue_work(async_submit_workqueue, &fs_info->async_submit_work);
+	btrfs_queue_worker(&fs_info->workers, &async->work);
 	return 0;
 }
 
@@ -462,19 +472,32 @@
 
 	offset = bio->bi_sector << 9;
 
+	/*
+	 * when we're called for a write, we're already in the async
+	 * submission context.  Just jump into btrfs_map_bio
+	 */
 	if (rw & (1 << BIO_RW)) {
-		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
+				     mirror_num, 0);
 	}
 
+	/*
+	 * called for a read, do the setup so that checksum validation
+	 * can happen in the async kernel threads
+	 */
 	ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
 	BUG_ON(ret);
 
-	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
 }
 
 static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 				 int mirror_num)
 {
+	/*
+	 * kthread helpers are used to submit writes so that checksumming
+	 * can happen in parallel across all CPUs
+	 */
 	if (!(rw & (1 << BIO_RW))) {
 		return __btree_submit_bio_hook(inode, rw, bio, mirror_num);
 	}
@@ -1036,95 +1059,40 @@
 	return ret;
 }
 
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_end_io_csum(void *p)
-#else
-static void btrfs_end_io_csum(struct work_struct *work)
-#endif
+/*
+ * called by the kthread helper functions to finally call the bio end_io
+ * functions.  This is where read checksum verification actually happens
+ */
+static void end_workqueue_fn(struct btrfs_work *work)
 {
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	struct btrfs_fs_info *fs_info = p;
-#else
-	struct btrfs_fs_info *fs_info = container_of(work,
-						     struct btrfs_fs_info,
-						     end_io_work);
-#endif
-	unsigned long flags;
-	struct end_io_wq *end_io_wq;
 	struct bio *bio;
-	struct list_head *next;
+	struct end_io_wq *end_io_wq;
+	struct btrfs_fs_info *fs_info;
 	int error;
-	int was_empty;
 
-	while(1) {
-		spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
-		if (list_empty(&fs_info->end_io_work_list)) {
-			spin_unlock_irqrestore(&fs_info->end_io_work_lock,
-					       flags);
-			return;
-		}
-		next = fs_info->end_io_work_list.next;
-		list_del(next);
-		spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
+	end_io_wq = container_of(work, struct end_io_wq, work);
+	bio = end_io_wq->bio;
+	fs_info = end_io_wq->info;
 
-		end_io_wq = list_entry(next, struct end_io_wq, list);
-
-		bio = end_io_wq->bio;
-		if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
-			spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
-			was_empty = list_empty(&fs_info->end_io_work_list);
-			list_add_tail(&end_io_wq->list,
-				      &fs_info->end_io_work_list);
-			spin_unlock_irqrestore(&fs_info->end_io_work_lock,
-					       flags);
-			if (was_empty)
-				return;
-			continue;
-		}
-		error = end_io_wq->error;
-		bio->bi_private = end_io_wq->private;
-		bio->bi_end_io = end_io_wq->end_io;
-		kfree(end_io_wq);
+	/* metadata bios are special because the whole tree block must
+	 * be checksummed at once.  This makes sure the entire block is in
+	 * ram and up to date before trying to verify things.  For
+	 * blocksize <= pagesize, it is basically a noop
+	 */
+	if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
+		btrfs_queue_worker(&fs_info->endio_workers,
+				   &end_io_wq->work);
+		return;
+	}
+	error = end_io_wq->error;
+	bio->bi_private = end_io_wq->private;
+	bio->bi_end_io = end_io_wq->end_io;
+	kfree(end_io_wq);
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
-		bio_endio(bio, bio->bi_size, error);
+	bio_endio(bio, bio->bi_size, error);
 #else
-		bio_endio(bio, error);
+	bio_endio(bio, error);
 #endif
-	}
-}
-
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_async_submit_work(void *p)
-#else
-static void btrfs_async_submit_work(struct work_struct *work)
-#endif
-{
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	struct btrfs_fs_info *fs_info = p;
-#else
-	struct btrfs_fs_info *fs_info = container_of(work,
-						     struct btrfs_fs_info,
-						     async_submit_work);
-#endif
-	struct async_submit_bio *async;
-	struct list_head *next;
-
-	while(1) {
-		spin_lock(&fs_info->async_submit_work_lock);
-		if (list_empty(&fs_info->async_submit_work_list)) {
-			spin_unlock(&fs_info->async_submit_work_lock);
-			return;
-		}
-		next = fs_info->async_submit_work_list.next;
-		list_del(next);
-		atomic_dec(&fs_info->nr_async_submits);
-		spin_unlock(&fs_info->async_submit_work_lock);
-
-		async = list_entry(next, struct async_submit_bio, list);
-		async->submit_bio_hook(async->inode, async->rw, async->bio,
-				       async->mirror_num);
-		kfree(async);
-	}
 }
 
 struct btrfs_root *open_ctree(struct super_block *sb,
@@ -1155,19 +1123,11 @@
 		err = -ENOMEM;
 		goto fail;
 	}
-	end_io_workqueue = create_workqueue("btrfs-end-io");
-	BUG_ON(!end_io_workqueue);
-	async_submit_workqueue = create_workqueue("btrfs-async-submit");
-
 	INIT_RADIX_TREE(&fs_info->fs_roots_radix, GFP_NOFS);
 	INIT_LIST_HEAD(&fs_info->trans_list);
 	INIT_LIST_HEAD(&fs_info->dead_roots);
 	INIT_LIST_HEAD(&fs_info->hashers);
-	INIT_LIST_HEAD(&fs_info->end_io_work_list);
-	INIT_LIST_HEAD(&fs_info->async_submit_work_list);
 	spin_lock_init(&fs_info->hash_lock);
-	spin_lock_init(&fs_info->end_io_work_lock);
-	spin_lock_init(&fs_info->async_submit_work_lock);
 	spin_lock_init(&fs_info->delalloc_lock);
 	spin_lock_init(&fs_info->new_trans_lock);
 
@@ -1222,13 +1182,8 @@
 	fs_info->do_barriers = 1;
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum, fs_info);
-	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work,
-		  fs_info);
 	INIT_WORK(&fs_info->trans_work, btrfs_transaction_cleaner, fs_info);
 #else
-	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum);
-	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work);
 	INIT_DELAYED_WORK(&fs_info->trans_work, btrfs_transaction_cleaner);
 #endif
 	BTRFS_I(fs_info->btree_inode)->root = tree_root;
@@ -1240,6 +1195,19 @@
 	mutex_init(&fs_info->trans_mutex);
 	mutex_init(&fs_info->fs_mutex);
 
+	/* we need to start all the end_io workers up front because the
+	 * queue work function gets called at interrupt time.  The endio
+	 * workers don't normally start IO, so some number of them <= the
+	 * number of CPUs is fine.  They handle checksumming after a read.
+	 *
+	 * The other worker threads do start IO, so the max is larger than
+	 * the number of CPUs.  FIXME, tune this for huge machines
+	 */
+	btrfs_init_workers(&fs_info->workers, num_online_cpus() * 2);
+	btrfs_init_workers(&fs_info->endio_workers, num_online_cpus());
+	btrfs_start_workers(&fs_info->workers, 1);
+	btrfs_start_workers(&fs_info->endio_workers, num_online_cpus());
+
 #if 0
 	ret = add_hasher(fs_info, "crc32c");
 	if (ret) {
@@ -1375,6 +1343,8 @@
 	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
 fail_iput:
 	iput(fs_info->btree_inode);
+	btrfs_stop_workers(&fs_info->workers);
+	btrfs_stop_workers(&fs_info->endio_workers);
 fail:
 	btrfs_close_devices(fs_info->fs_devices);
 	btrfs_mapping_tree_free(&fs_info->mapping_tree);
@@ -1623,16 +1593,10 @@
 	extent_io_tree_empty_lru(&fs_info->extent_ins);
 	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
 
-	flush_workqueue(async_submit_workqueue);
-	flush_workqueue(end_io_workqueue);
-
 	truncate_inode_pages(fs_info->btree_inode->i_mapping, 0);
 
-	flush_workqueue(async_submit_workqueue);
-	destroy_workqueue(async_submit_workqueue);
-
-	flush_workqueue(end_io_workqueue);
-	destroy_workqueue(end_io_workqueue);
+	btrfs_stop_workers(&fs_info->workers);
+	btrfs_stop_workers(&fs_info->endio_workers);
 
 	iput(fs_info->btree_inode);
 #if 0