Btrfs: Allow worker threads to exit when idle

The Btrfs worker threads don't currently die off after they have
been idle for a while, leading to a lot of threads sitting around
doing nothing for each mount.

Also, they are unable to start atomically (from end_io hanlders).

This commit reworks the worker threads so they can be started
from end_io handlers (just setting a flag that asks for a thread
to be added at a later date) and so they can exit if they
have been idle for a long time.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 019e8af..f10c895 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -48,6 +48,9 @@
 	/* number of things on the pending list */
 	atomic_t num_pending;
 
+	/* reference counter for this struct */
+	atomic_t refs;
+
 	unsigned long sequence;
 
 	/* protects the pending list. */
@@ -93,6 +96,31 @@
 	}
 }
 
+static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
+{
+	struct btrfs_workers *workers = worker->workers;
+	unsigned long flags;
+
+	rmb();
+	if (!workers->atomic_start_pending)
+		return;
+
+	spin_lock_irqsave(&workers->lock, flags);
+	if (!workers->atomic_start_pending)
+		goto out;
+
+	workers->atomic_start_pending = 0;
+	if (workers->num_workers >= workers->max_workers)
+		goto out;
+
+	spin_unlock_irqrestore(&workers->lock, flags);
+	btrfs_start_workers(workers, 1);
+	return;
+
+out:
+	spin_unlock_irqrestore(&workers->lock, flags);
+}
+
 static noinline int run_ordered_completions(struct btrfs_workers *workers,
 					    struct btrfs_work *work)
 {
@@ -140,6 +168,36 @@
 	return 0;
 }
 
+static void put_worker(struct btrfs_worker_thread *worker)
+{
+	if (atomic_dec_and_test(&worker->refs))
+		kfree(worker);
+}
+
+static int try_worker_shutdown(struct btrfs_worker_thread *worker)
+{
+	int freeit = 0;
+
+	spin_lock_irq(&worker->lock);
+	spin_lock_irq(&worker->workers->lock);
+	if (worker->workers->num_workers > 1 &&
+	    worker->idle &&
+	    !worker->working &&
+	    !list_empty(&worker->worker_list) &&
+	    list_empty(&worker->prio_pending) &&
+	    list_empty(&worker->pending)) {
+		freeit = 1;
+		list_del_init(&worker->worker_list);
+		worker->workers->num_workers--;
+	}
+	spin_unlock_irq(&worker->workers->lock);
+	spin_unlock_irq(&worker->lock);
+
+	if (freeit)
+		put_worker(worker);
+	return freeit;
+}
+
 /*
  * main loop for servicing work items
  */
@@ -175,6 +233,8 @@
 			 */
 			run_ordered_completions(worker->workers, work);
 
+			check_pending_worker_creates(worker);
+
 			spin_lock_irq(&worker->lock);
 			check_idle_worker(worker);
 		}
@@ -226,8 +286,13 @@
 				worker->working = 0;
 				spin_unlock_irq(&worker->lock);
 
-				if (!kthread_should_stop())
-					schedule();
+				if (!kthread_should_stop()) {
+					schedule_timeout(HZ * 120);
+					if (!worker->working &&
+					    try_worker_shutdown(worker)) {
+						return 0;
+					}
+				}
 			}
 			__set_current_state(TASK_RUNNING);
 		}
@@ -242,16 +307,30 @@
 {
 	struct list_head *cur;
 	struct btrfs_worker_thread *worker;
+	int can_stop;
 
+	spin_lock_irq(&workers->lock);
 	list_splice_init(&workers->idle_list, &workers->worker_list);
 	while (!list_empty(&workers->worker_list)) {
 		cur = workers->worker_list.next;
 		worker = list_entry(cur, struct btrfs_worker_thread,
 				    worker_list);
-		kthread_stop(worker->task);
-		list_del(&worker->worker_list);
-		kfree(worker);
+
+		atomic_inc(&worker->refs);
+		workers->num_workers -= 1;
+		if (!list_empty(&worker->worker_list)) {
+			list_del_init(&worker->worker_list);
+			put_worker(worker);
+			can_stop = 1;
+		} else
+			can_stop = 0;
+		spin_unlock_irq(&workers->lock);
+		if (can_stop)
+			kthread_stop(worker->task);
+		spin_lock_irq(&workers->lock);
+		put_worker(worker);
 	}
+	spin_unlock_irq(&workers->lock);
 	return 0;
 }
 
@@ -270,6 +349,8 @@
 	workers->idle_thresh = 32;
 	workers->name = name;
 	workers->ordered = 0;
+	workers->atomic_start_pending = 0;
+	workers->atomic_worker_start = 0;
 }
 
 /*
@@ -294,6 +375,7 @@
 		INIT_LIST_HEAD(&worker->worker_list);
 		spin_lock_init(&worker->lock);
 		atomic_set(&worker->num_pending, 0);
+		atomic_set(&worker->refs, 1);
 		worker->workers = workers;
 		worker->task = kthread_run(worker_loop, worker,
 					   "btrfs-%s-%d", workers->name,
@@ -303,7 +385,6 @@
 			kfree(worker);
 			goto fail;
 		}
-
 		spin_lock_irq(&workers->lock);
 		list_add_tail(&worker->worker_list, &workers->idle_list);
 		worker->idle = 1;
@@ -367,6 +448,7 @@
 {
 	struct btrfs_worker_thread *worker;
 	unsigned long flags;
+	struct list_head *fallback;
 
 again:
 	spin_lock_irqsave(&workers->lock, flags);
@@ -376,19 +458,10 @@
 	if (!worker) {
 		spin_lock_irqsave(&workers->lock, flags);
 		if (workers->num_workers >= workers->max_workers) {
-			struct list_head *fallback = NULL;
-			/*
-			 * we have failed to find any workers, just
-			 * return the force one
-			 */
-			if (!list_empty(&workers->worker_list))
-				fallback = workers->worker_list.next;
-			if (!list_empty(&workers->idle_list))
-				fallback = workers->idle_list.next;
-			BUG_ON(!fallback);
-			worker = list_entry(fallback,
-				  struct btrfs_worker_thread, worker_list);
-			spin_unlock_irqrestore(&workers->lock, flags);
+			goto fallback;
+		} else if (workers->atomic_worker_start) {
+			workers->atomic_start_pending = 1;
+			goto fallback;
 		} else {
 			spin_unlock_irqrestore(&workers->lock, flags);
 			/* we're below the limit, start another worker */
@@ -397,6 +470,22 @@
 		}
 	}
 	return worker;
+
+fallback:
+	fallback = NULL;
+	/*
+	 * we have failed to find any workers, just
+	 * return the first one we can find.
+	 */
+	if (!list_empty(&workers->worker_list))
+		fallback = workers->worker_list.next;
+	if (!list_empty(&workers->idle_list))
+		fallback = workers->idle_list.next;
+	BUG_ON(!fallback);
+	worker = list_entry(fallback,
+		  struct btrfs_worker_thread, worker_list);
+	spin_unlock_irqrestore(&workers->lock, flags);
+	return worker;
 }
 
 /*
@@ -435,9 +524,9 @@
 		worker->working = 1;
 	}
 
-	spin_unlock_irqrestore(&worker->lock, flags);
 	if (wake)
 		wake_up_process(worker->task);
+	spin_unlock_irqrestore(&worker->lock, flags);
 out:
 
 	return 0;
@@ -492,10 +581,10 @@
 		wake = 1;
 	worker->working = 1;
 
-	spin_unlock_irqrestore(&worker->lock, flags);
-
 	if (wake)
 		wake_up_process(worker->task);
+	spin_unlock_irqrestore(&worker->lock, flags);
+
 out:
 	return 0;
 }
diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h
index 1b511c1..a562ad8 100644
--- a/fs/btrfs/async-thread.h
+++ b/fs/btrfs/async-thread.h
@@ -73,6 +73,15 @@
 	/* force completions in the order they were queued */
 	int ordered;
 
+	/* more workers required, but in an interrupt handler */
+	int atomic_start_pending;
+
+	/*
+	 * are we allowed to sleep while starting workers or are we required
+	 * to start them at a later time?
+	 */
+	int atomic_worker_start;
+
 	/* list with all the work threads.  The workers on the idle thread
 	 * may be actively servicing jobs, but they haven't yet hit the
 	 * idle thresh limit above.
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index 3cf4cfa..20cefc6 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -1682,7 +1682,7 @@
 		err = -EINVAL;
 		goto fail_iput;
 	}
-
+printk("thread pool is %d\n", fs_info->thread_pool_size);
 	/*
 	 * we need to start all the end_io workers up front because the
 	 * queue work function gets called at interrupt time, and so it
@@ -1727,20 +1727,22 @@
 	fs_info->endio_workers.idle_thresh = 4;
 	fs_info->endio_meta_workers.idle_thresh = 4;
 
-	fs_info->endio_write_workers.idle_thresh = 64;
-	fs_info->endio_meta_write_workers.idle_thresh = 64;
+	fs_info->endio_write_workers.idle_thresh = 2;
+	fs_info->endio_meta_write_workers.idle_thresh = 2;
+
+	fs_info->endio_workers.atomic_worker_start = 1;
+	fs_info->endio_meta_workers.atomic_worker_start = 1;
+	fs_info->endio_write_workers.atomic_worker_start = 1;
+	fs_info->endio_meta_write_workers.atomic_worker_start = 1;
 
 	btrfs_start_workers(&fs_info->workers, 1);
 	btrfs_start_workers(&fs_info->submit_workers, 1);
 	btrfs_start_workers(&fs_info->delalloc_workers, 1);
 	btrfs_start_workers(&fs_info->fixup_workers, 1);
-	btrfs_start_workers(&fs_info->endio_workers, fs_info->thread_pool_size);
-	btrfs_start_workers(&fs_info->endio_meta_workers,
-			    fs_info->thread_pool_size);
-	btrfs_start_workers(&fs_info->endio_meta_write_workers,
-			    fs_info->thread_pool_size);
-	btrfs_start_workers(&fs_info->endio_write_workers,
-			    fs_info->thread_pool_size);
+	btrfs_start_workers(&fs_info->endio_workers, 1);
+	btrfs_start_workers(&fs_info->endio_meta_workers, 1);
+	btrfs_start_workers(&fs_info->endio_meta_write_workers, 1);
+	btrfs_start_workers(&fs_info->endio_write_workers, 1);
 
 	fs_info->bdi.ra_pages *= btrfs_super_num_devices(disk_super);
 	fs_info->bdi.ra_pages = max(fs_info->bdi.ra_pages,