ocfs2: Add xattr bucket iteration for large numbers of EAs

Ocfs2 breaks up xattr index tree leaves into 4k regions, called buckets.
Attributes are stored within a given bucket, depending on hash value.

After a discussion with Mark, we decided that the per-bucket index
(xe_entry[]) would only exist in the 1st block of a bucket. Likewise,
name/value pairs will not straddle more than one block. This allows the
majority of operations to work directly on the buffer heads in a leaf block.

This patch adds code to iterate the buckets in an EA. A new abstration of
ocfs2_xattr_bucket is added. It records the bhs in this bucket and
ocfs2_xattr_header. This keeps the code neat, improving readibility.

Signed-off-by: Tao Ma <tao.ma@oracle.com>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
diff --git a/fs/ocfs2/ocfs2_fs.h b/fs/ocfs2/ocfs2_fs.h
index 98e1f8b..8d5e72f 100644
--- a/fs/ocfs2/ocfs2_fs.h
+++ b/fs/ocfs2/ocfs2_fs.h
@@ -755,8 +755,13 @@
 	__le16	xh_count;                       /* contains the count of how
 						   many records are in the
 						   local xattr storage. */
-	__le16	xh_reserved1;
-	__le32	xh_reserved2;
+	__le16	xh_free_start;                  /* current offset for storing
+						   xattr. */
+	__le16	xh_name_value_len;              /* total length of name/value
+						   length in this bucket. */
+	__le16	xh_num_buckets;                 /* bucket nums in one extent
+						   record, only valid in the
+						   first bucket. */
 	__le64  xh_csum;
 	struct ocfs2_xattr_entry xh_entries[0]; /* xattr entry list. */
 };
@@ -793,6 +798,10 @@
 #define OCFS2_XATTR_SIZE(size)	(((size) + OCFS2_XATTR_ROUND) & \
 				~(OCFS2_XATTR_ROUND))
 
+#define OCFS2_XATTR_BUCKET_SIZE			4096
+#define OCFS2_XATTR_MAX_BLOCKS_PER_BUCKET 	(OCFS2_XATTR_BUCKET_SIZE \
+						 / OCFS2_MIN_BLOCKSIZE)
+
 /*
  * On disk structure for xattr block.
  */
@@ -963,6 +972,17 @@
 	return 0;
 
 }
+
+static inline u16 ocfs2_xattr_recs_per_xb(struct super_block *sb)
+{
+	int size;
+
+	size = sb->s_blocksize -
+		offsetof(struct ocfs2_xattr_block,
+			 xb_attrs.xb_root.xt_list.l_recs);
+
+	return size / sizeof(struct ocfs2_extent_rec);
+}
 #else
 static inline int ocfs2_fast_symlink_chars(int blocksize)
 {
@@ -1046,6 +1066,17 @@
 
 	return 0;
 }
+
+static inline int ocfs2_xattr_recs_per_xb(int blocksize)
+{
+	int size;
+
+	size = blocksize -
+		offsetof(struct ocfs2_xattr_block,
+			 xb_attrs.xb_root.xt_list.l_recs);
+
+	return size / sizeof(struct ocfs2_extent_rec);
+}
 #endif  /* __KERNEL__ */
 
 
diff --git a/fs/ocfs2/xattr.c b/fs/ocfs2/xattr.c
index 67bebd9..fb17f7f 100644
--- a/fs/ocfs2/xattr.c
+++ b/fs/ocfs2/xattr.c
@@ -52,6 +52,7 @@
 #include "suballoc.h"
 #include "uptodate.h"
 #include "buffer_head_io.h"
+#include "super.h"
 #include "xattr.h"
 
 
@@ -60,6 +61,11 @@
 	struct ocfs2_extent_rec		er;
 };
 
+struct ocfs2_xattr_bucket {
+	struct buffer_head *bhs[OCFS2_XATTR_MAX_BLOCKS_PER_BUCKET];
+	struct ocfs2_xattr_header *xh;
+};
+
 #define OCFS2_XATTR_ROOT_SIZE	(sizeof(struct ocfs2_xattr_def_value_root))
 #define OCFS2_XATTR_INLINE_SIZE	80
 
@@ -99,6 +105,11 @@
 	int not_found;
 };
 
+static int ocfs2_xattr_tree_list_index_block(struct inode *inode,
+					struct ocfs2_xattr_tree_root *xt,
+					char *buffer,
+					size_t buffer_size);
+
 static inline struct xattr_handler *ocfs2_xattr_handler(int name_index)
 {
 	struct xattr_handler *handler = NULL;
@@ -483,7 +494,7 @@
 				  size_t buffer_size)
 {
 	struct buffer_head *blk_bh = NULL;
-	struct ocfs2_xattr_header *header = NULL;
+	struct ocfs2_xattr_block *xb;
 	int ret = 0;
 
 	if (!di->i_xattr_loc)
@@ -503,10 +514,17 @@
 		goto cleanup;
 	}
 
-	header = &((struct ocfs2_xattr_block *)blk_bh->b_data)->
-		 xb_attrs.xb_header;
+	xb = (struct ocfs2_xattr_block *)blk_bh->b_data;
 
-	ret = ocfs2_xattr_list_entries(inode, header, buffer, buffer_size);
+	if (!(le16_to_cpu(xb->xb_flags) & OCFS2_XATTR_INDEXED)) {
+		struct ocfs2_xattr_header *header = &xb->xb_attrs.xb_header;
+		ret = ocfs2_xattr_list_entries(inode, header,
+					       buffer, buffer_size);
+	} else {
+		struct ocfs2_xattr_tree_root *xt = &xb->xb_attrs.xb_root;
+		ret = ocfs2_xattr_tree_list_index_block(inode, xt,
+						   buffer, buffer_size);
+	}
 cleanup:
 	brelse(blk_bh);
 
@@ -1923,3 +1941,232 @@
 	return ret;
 }
 
+/*
+ * Find the xattr extent rec which may contains name_hash.
+ * e_cpos will be the first name hash of the xattr rec.
+ * el must be the ocfs2_xattr_header.xb_attrs.xb_root.xt_list.
+ */
+static int ocfs2_xattr_get_rec(struct inode *inode,
+			       u32 name_hash,
+			       u64 *p_blkno,
+			       u32 *e_cpos,
+			       u32 *num_clusters,
+			       struct ocfs2_extent_list *el)
+{
+	int ret = 0, i;
+	struct buffer_head *eb_bh = NULL;
+	struct ocfs2_extent_block *eb;
+	struct ocfs2_extent_rec *rec = NULL;
+	u64 e_blkno = 0;
+
+	if (el->l_tree_depth) {
+		ret = ocfs2_find_leaf(inode, el, name_hash, &eb_bh);
+		if (ret) {
+			mlog_errno(ret);
+			goto out;
+		}
+
+		eb = (struct ocfs2_extent_block *) eb_bh->b_data;
+		el = &eb->h_list;
+
+		if (el->l_tree_depth) {
+			ocfs2_error(inode->i_sb,
+				    "Inode %lu has non zero tree depth in "
+				    "xattr tree block %llu\n", inode->i_ino,
+				    (unsigned long long)eb_bh->b_blocknr);
+			ret = -EROFS;
+			goto out;
+		}
+	}
+
+	for (i = le16_to_cpu(el->l_next_free_rec) - 1; i >= 0; i--) {
+		rec = &el->l_recs[i];
+
+		if (le32_to_cpu(rec->e_cpos) <= name_hash) {
+			e_blkno = le64_to_cpu(rec->e_blkno);
+			break;
+		}
+	}
+
+	if (!e_blkno) {
+		ocfs2_error(inode->i_sb, "Inode %lu has bad extent "
+			    "record (%u, %u, 0) in xattr", inode->i_ino,
+			    le32_to_cpu(rec->e_cpos),
+			    ocfs2_rec_clusters(el, rec));
+		ret = -EROFS;
+		goto out;
+	}
+
+	*p_blkno = le64_to_cpu(rec->e_blkno);
+	*num_clusters = le16_to_cpu(rec->e_leaf_clusters);
+	if (e_cpos)
+		*e_cpos = le32_to_cpu(rec->e_cpos);
+out:
+	brelse(eb_bh);
+	return ret;
+}
+
+typedef int (xattr_bucket_func)(struct inode *inode,
+				struct ocfs2_xattr_bucket *bucket,
+				void *para);
+
+static int ocfs2_iterate_xattr_buckets(struct inode *inode,
+				       u64 blkno,
+				       u32 clusters,
+				       xattr_bucket_func *func,
+				       void *para)
+{
+	int i, j, ret = 0;
+	int blk_per_bucket = ocfs2_blocks_per_xattr_bucket(inode->i_sb);
+	u32 bpc = ocfs2_xattr_buckets_per_cluster(OCFS2_SB(inode->i_sb));
+	u32 num_buckets = clusters * bpc;
+	struct ocfs2_xattr_bucket bucket;
+
+	memset(&bucket, 0, sizeof(bucket));
+
+	mlog(0, "iterating xattr buckets in %u clusters starting from %llu\n",
+	     clusters, blkno);
+
+	for (i = 0; i < num_buckets; i++, blkno += blk_per_bucket) {
+		ret = ocfs2_read_blocks(OCFS2_SB(inode->i_sb),
+					blkno, blk_per_bucket,
+					bucket.bhs, OCFS2_BH_CACHED, inode);
+		if (ret) {
+			mlog_errno(ret);
+			goto out;
+		}
+
+		bucket.xh = (struct ocfs2_xattr_header *)bucket.bhs[0]->b_data;
+		/*
+		 * The real bucket num in this series of blocks is stored
+		 * in the 1st bucket.
+		 */
+		if (i == 0)
+			num_buckets = le16_to_cpu(bucket.xh->xh_num_buckets);
+
+		mlog(0, "iterating xattr bucket %llu\n", blkno);
+		if (func) {
+			ret = func(inode, &bucket, para);
+			if (ret) {
+				mlog_errno(ret);
+				break;
+			}
+		}
+
+		for (j = 0; j < blk_per_bucket; j++)
+			brelse(bucket.bhs[j]);
+		memset(&bucket, 0, sizeof(bucket));
+	}
+
+out:
+	for (j = 0; j < blk_per_bucket; j++)
+		brelse(bucket.bhs[j]);
+
+	return ret;
+}
+
+struct ocfs2_xattr_tree_list {
+	char *buffer;
+	size_t buffer_size;
+};
+
+static int ocfs2_xattr_bucket_get_name_value(struct inode *inode,
+					     struct ocfs2_xattr_header *xh,
+					     int index,
+					     int *block_off,
+					     int *new_offset)
+{
+	u16 name_offset;
+
+	if (index < 0 || index >= le16_to_cpu(xh->xh_count))
+		return -EINVAL;
+
+	name_offset = le16_to_cpu(xh->xh_entries[index].xe_name_offset);
+
+	*block_off = name_offset >> inode->i_sb->s_blocksize_bits;
+	*new_offset = name_offset % inode->i_sb->s_blocksize;
+
+	return 0;
+}
+
+static int ocfs2_list_xattr_bucket(struct inode *inode,
+				   struct ocfs2_xattr_bucket *bucket,
+				   void *para)
+{
+	int ret = 0;
+	struct ocfs2_xattr_tree_list *xl = (struct ocfs2_xattr_tree_list *)para;
+	size_t size;
+	int i, block_off, new_offset;
+
+	for (i = 0 ; i < le16_to_cpu(bucket->xh->xh_count); i++) {
+		struct ocfs2_xattr_entry *entry = &bucket->xh->xh_entries[i];
+		struct xattr_handler *handler =
+			ocfs2_xattr_handler(ocfs2_xattr_get_type(entry));
+
+		if (handler) {
+			ret = ocfs2_xattr_bucket_get_name_value(inode,
+								bucket->xh,
+								i,
+								&block_off,
+								&new_offset);
+			if (ret)
+				break;
+			size = handler->list(inode, xl->buffer, xl->buffer_size,
+					     bucket->bhs[block_off]->b_data +
+					     new_offset,
+					     entry->xe_name_len);
+			if (xl->buffer) {
+				if (size > xl->buffer_size)
+					return -ERANGE;
+				xl->buffer += size;
+			}
+			xl->buffer_size -= size;
+		}
+	}
+
+	return ret;
+}
+
+static int ocfs2_xattr_tree_list_index_block(struct inode *inode,
+					     struct ocfs2_xattr_tree_root *xt,
+					     char *buffer,
+					     size_t buffer_size)
+{
+	struct ocfs2_extent_list *el = &xt->xt_list;
+	int ret = 0;
+	u32 name_hash = UINT_MAX, e_cpos = 0, num_clusters = 0;
+	u64 p_blkno = 0;
+	struct ocfs2_xattr_tree_list xl = {
+		.buffer = buffer,
+		.buffer_size = buffer_size,
+	};
+
+	if (le16_to_cpu(el->l_next_free_rec) == 0)
+		return 0;
+
+	while (name_hash > 0) {
+		ret = ocfs2_xattr_get_rec(inode, name_hash, &p_blkno,
+					  &e_cpos, &num_clusters, el);
+		if (ret) {
+			mlog_errno(ret);
+			goto out;
+		}
+
+		ret = ocfs2_iterate_xattr_buckets(inode, p_blkno, num_clusters,
+						  ocfs2_list_xattr_bucket,
+						  &xl);
+		if (ret) {
+			mlog_errno(ret);
+			goto out;
+		}
+
+		if (e_cpos == 0)
+			break;
+
+		name_hash = e_cpos - 1;
+	}
+
+	ret = buffer_size - xl.buffer_size;
+out:
+	return ret;
+}
diff --git a/fs/ocfs2/xattr.h b/fs/ocfs2/xattr.h
index ed32377..02afa87 100644
--- a/fs/ocfs2/xattr.h
+++ b/fs/ocfs2/xattr.h
@@ -48,4 +48,13 @@
 extern int ocfs2_xattr_remove(struct inode *inode, struct buffer_head *di_bh);
 extern struct xattr_handler *ocfs2_xattr_handlers[];
 
+static inline u16 ocfs2_xattr_buckets_per_cluster(struct ocfs2_super *osb)
+{
+	return (1 << osb->s_clustersize_bits) / OCFS2_XATTR_BUCKET_SIZE;
+}
+
+static inline u16 ocfs2_blocks_per_xattr_bucket(struct super_block *sb)
+{
+	return OCFS2_XATTR_BUCKET_SIZE / (1 << sb->s_blocksize_bits);
+}
 #endif /* OCFS2_XATTR_H */