md: Fix possible deadlock with multiple mempool allocations.

It is not safe to allocate from a mempool while holding an item
previously allocated from that mempool as that can deadlock when the
mempool is close to exhaustion.

So don't use a bio list to collect the bios to write to multiple
devices in raid1 and raid10.
Instead queue each bio as it becomes available so an unplug will
activate all previously allocated bios and so a new bio has a chance
of being allocated.

This means we must set the 'remaining' count to '1' before submitting
any requests, then when all are submitted, decrement 'remaining' and
possible handle the write completion at that point.

Reported-by: Torsten Kaiser <just.for.lkml@googlemail.com>
Tested-by: Torsten Kaiser <just.for.lkml@googlemail.com>
Signed-off-by: NeilBrown <neilb@suse.de>
diff --git a/drivers/md/raid1.c b/drivers/md/raid1.c
index a4b85a9..3362cfc 100644
--- a/drivers/md/raid1.c
+++ b/drivers/md/raid1.c
@@ -306,6 +306,28 @@
 	rdev_dec_pending(conf->mirrors[mirror].rdev, conf->mddev);
 }
 
+static void r1_bio_write_done(r1bio_t *r1_bio, int vcnt, struct bio_vec *bv,
+			      int behind)
+{
+	if (atomic_dec_and_test(&r1_bio->remaining))
+	{
+		/* it really is the end of this request */
+		if (test_bit(R1BIO_BehindIO, &r1_bio->state)) {
+			/* free extra copy of the data pages */
+			int i = vcnt;
+			while (i--)
+				safe_put_page(bv[i].bv_page);
+		}
+		/* clear the bitmap if all writes complete successfully */
+		bitmap_endwrite(r1_bio->mddev->bitmap, r1_bio->sector,
+				r1_bio->sectors,
+				!test_bit(R1BIO_Degraded, &r1_bio->state),
+				behind);
+		md_write_end(r1_bio->mddev);
+		raid_end_bio_io(r1_bio);
+	}
+}
+
 static void raid1_end_write_request(struct bio *bio, int error)
 {
 	int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
@@ -373,21 +395,7 @@
 	 * Let's see if all mirrored write operations have finished
 	 * already.
 	 */
-	if (atomic_dec_and_test(&r1_bio->remaining)) {
-		if (test_bit(R1BIO_BehindIO, &r1_bio->state)) {
-			/* free extra copy of the data pages */
-			int i = bio->bi_vcnt;
-			while (i--)
-				safe_put_page(bio->bi_io_vec[i].bv_page);
-		}
-		/* clear the bitmap if all writes complete successfully */
-		bitmap_endwrite(r1_bio->mddev->bitmap, r1_bio->sector,
-				r1_bio->sectors,
-				!test_bit(R1BIO_Degraded, &r1_bio->state),
-				behind);
-		md_write_end(r1_bio->mddev);
-		raid_end_bio_io(r1_bio);
-	}
+	r1_bio_write_done(r1_bio, bio->bi_vcnt, bio->bi_io_vec, behind);
 
 	if (to_put)
 		bio_put(to_put);
@@ -735,23 +743,26 @@
 }
 
 
-/* duplicate the data pages for behind I/O */
-static struct page **alloc_behind_pages(struct bio *bio)
+/* duplicate the data pages for behind I/O 
+ * We return a list of bio_vec rather than just page pointers
+ * as it makes freeing easier
+ */
+static struct bio_vec *alloc_behind_pages(struct bio *bio)
 {
 	int i;
 	struct bio_vec *bvec;
-	struct page **pages = kzalloc(bio->bi_vcnt * sizeof(struct page *),
+	struct bio_vec *pages = kzalloc(bio->bi_vcnt * sizeof(struct bio_vec),
 					GFP_NOIO);
 	if (unlikely(!pages))
 		goto do_sync_io;
 
 	bio_for_each_segment(bvec, bio, i) {
-		pages[i] = alloc_page(GFP_NOIO);
-		if (unlikely(!pages[i]))
+		pages[i].bv_page = alloc_page(GFP_NOIO);
+		if (unlikely(!pages[i].bv_page))
 			goto do_sync_io;
-		memcpy(kmap(pages[i]) + bvec->bv_offset,
+		memcpy(kmap(pages[i].bv_page) + bvec->bv_offset,
 			kmap(bvec->bv_page) + bvec->bv_offset, bvec->bv_len);
-		kunmap(pages[i]);
+		kunmap(pages[i].bv_page);
 		kunmap(bvec->bv_page);
 	}
 
@@ -759,8 +770,8 @@
 
 do_sync_io:
 	if (pages)
-		for (i = 0; i < bio->bi_vcnt && pages[i]; i++)
-			put_page(pages[i]);
+		for (i = 0; i < bio->bi_vcnt && pages[i].bv_page; i++)
+			put_page(pages[i].bv_page);
 	kfree(pages);
 	PRINTK("%dB behind alloc failed, doing sync I/O\n", bio->bi_size);
 	return NULL;
@@ -775,8 +786,7 @@
 	int i, targets = 0, disks;
 	struct bitmap *bitmap;
 	unsigned long flags;
-	struct bio_list bl;
-	struct page **behind_pages = NULL;
+	struct bio_vec *behind_pages = NULL;
 	const int rw = bio_data_dir(bio);
 	const unsigned long do_sync = (bio->bi_rw & REQ_SYNC);
 	const unsigned long do_flush_fua = (bio->bi_rw & (REQ_FLUSH | REQ_FUA));
@@ -873,13 +883,6 @@
 	 * bios[x] to bio
 	 */
 	disks = conf->raid_disks;
-#if 0
-	{ static int first=1;
-	if (first) printk("First Write sector %llu disks %d\n",
-			  (unsigned long long)r1_bio->sector, disks);
-	first = 0;
-	}
-#endif
  retry_write:
 	blocked_rdev = NULL;
 	rcu_read_lock();
@@ -937,10 +940,11 @@
 	    (behind_pages = alloc_behind_pages(bio)) != NULL)
 		set_bit(R1BIO_BehindIO, &r1_bio->state);
 
-	atomic_set(&r1_bio->remaining, 0);
+	atomic_set(&r1_bio->remaining, 1);
 	atomic_set(&r1_bio->behind_remaining, 0);
 
-	bio_list_init(&bl);
+	bitmap_startwrite(bitmap, bio->bi_sector, r1_bio->sectors,
+				test_bit(R1BIO_BehindIO, &r1_bio->state));
 	for (i = 0; i < disks; i++) {
 		struct bio *mbio;
 		if (!r1_bio->bios[i])
@@ -967,35 +971,25 @@
 			 * them all
 			 */
 			__bio_for_each_segment(bvec, mbio, j, 0)
-				bvec->bv_page = behind_pages[j];
+				bvec->bv_page = behind_pages[j].bv_page;
 			if (test_bit(WriteMostly, &conf->mirrors[i].rdev->flags))
 				atomic_inc(&r1_bio->behind_remaining);
 		}
 
 		atomic_inc(&r1_bio->remaining);
-
-		bio_list_add(&bl, mbio);
+		spin_lock_irqsave(&conf->device_lock, flags);
+		bio_list_add(&conf->pending_bio_list, mbio);
+		blk_plug_device(mddev->queue);
+		spin_unlock_irqrestore(&conf->device_lock, flags);
 	}
+	r1_bio_write_done(r1_bio, bio->bi_vcnt, behind_pages, behind_pages != NULL);
 	kfree(behind_pages); /* the behind pages are attached to the bios now */
 
-	bitmap_startwrite(bitmap, bio->bi_sector, r1_bio->sectors,
-				test_bit(R1BIO_BehindIO, &r1_bio->state));
-	spin_lock_irqsave(&conf->device_lock, flags);
-	bio_list_merge(&conf->pending_bio_list, &bl);
-	bio_list_init(&bl);
-
-	blk_plug_device(mddev->queue);
-	spin_unlock_irqrestore(&conf->device_lock, flags);
-
-	/* In case raid1d snuck into freeze_array */
+	/* In case raid1d snuck in to freeze_array */
 	wake_up(&conf->wait_barrier);
 
 	if (do_sync)
 		md_wakeup_thread(mddev->thread);
-#if 0
-	while ((bio = bio_list_pop(&bl)) != NULL)
-		generic_make_request(bio);
-#endif
 
 	return 0;
 }
diff --git a/drivers/md/raid10.c b/drivers/md/raid10.c
index 387fe4b..8f5543a 100644
--- a/drivers/md/raid10.c
+++ b/drivers/md/raid10.c
@@ -801,7 +801,6 @@
 	const int rw = bio_data_dir(bio);
 	const unsigned long do_sync = (bio->bi_rw & REQ_SYNC);
 	const unsigned long do_fua = (bio->bi_rw & REQ_FUA);
-	struct bio_list bl;
 	unsigned long flags;
 	mdk_rdev_t *blocked_rdev;
 
@@ -950,9 +949,9 @@
 		goto retry_write;
 	}
 
-	atomic_set(&r10_bio->remaining, 0);
+	atomic_set(&r10_bio->remaining, 1);
+	bitmap_startwrite(mddev->bitmap, bio->bi_sector, r10_bio->sectors, 0);
 
-	bio_list_init(&bl);
 	for (i = 0; i < conf->copies; i++) {
 		struct bio *mbio;
 		int d = r10_bio->devs[i].devnum;
@@ -970,22 +969,22 @@
 		mbio->bi_private = r10_bio;
 
 		atomic_inc(&r10_bio->remaining);
-		bio_list_add(&bl, mbio);
+		spin_lock_irqsave(&conf->device_lock, flags);
+		bio_list_add(&conf->pending_bio_list, mbio);
+		blk_plug_device(mddev->queue);
+		spin_unlock_irqrestore(&conf->device_lock, flags);
 	}
 
-	if (unlikely(!atomic_read(&r10_bio->remaining))) {
-		/* the array is dead */
+	if (atomic_dec_and_test(&r10_bio->remaining)) {
+		/* This matches the end of raid10_end_write_request() */
+		bitmap_endwrite(r10_bio->mddev->bitmap, r10_bio->sector,
+				r10_bio->sectors,
+				!test_bit(R10BIO_Degraded, &r10_bio->state),
+				0);
 		md_write_end(mddev);
 		raid_end_bio_io(r10_bio);
-		return 0;
 	}
 
-	bitmap_startwrite(mddev->bitmap, bio->bi_sector, r10_bio->sectors, 0);
-	spin_lock_irqsave(&conf->device_lock, flags);
-	bio_list_merge(&conf->pending_bio_list, &bl);
-	blk_plug_device(mddev->queue);
-	spin_unlock_irqrestore(&conf->device_lock, flags);
-
 	/* In case raid10d snuck in to freeze_array */
 	wake_up(&conf->wait_barrier);