[BACK]Return to xfs_buf.c CVS log [TXT][DIR] Up to [Development] / xfs-linux / linux-2.4

File: [Development] / xfs-linux / linux-2.4 / Attic / xfs_buf.c (download)

Revision 1.216, Mon Jul 3 03:47:52 2006 UTC (11 years, 3 months ago) by nathans.longdrop.melbourne.sgi.com
Branch: MAIN
Changes since 1.215: +6 -5 lines

Improve xfsbufd delayed write submission patterns, after blktrace analysis.

Under a sequential create+allocate workload, blktrace reported backward
writes being issued by xfsbufd, and frequent inappropriate queue unplugs.
We now insert at the tail when moving from the delwri lists to the temp
lists, which maintains correct ordering, and we avoid unplugging queues
deep in the submit paths when we'd shortly do it at a higher level anyway.
blktrace now reports much healthier write patterns from xfsbufd for this
workload (and likely many others).
Merge of xfs-linux-melb:xfs-kern:26396a by kenmcd.

/*
 * Copyright (c) 2000-2006 Silicon Graphics, Inc.
 * All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it would be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write the Free Software Foundation,
 * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */
#include <linux/stddef.h>
#include <linux/errno.h>
#include <linux/slab.h>
#include <linux/pagemap.h>
#include <linux/init.h>
#include <linux/vmalloc.h>
#include <linux/blkdev.h>
#include <linux/locks.h>
#include <linux/sysctl.h>
#include <linux/proc_fs.h>
#include <linux/hash.h>
#include "xfs_linux.h"

/*
 * Low-order bit mask, (PAGE_CACHE_SHIFT - BBSHIFT) bits wide.
 * NOTE(review): presumably isolates a basic-block number's position
 * within one page-cache page; no user is visible in this part of the file.
 */
#define BN_ALIGN_MASK	((1 << (PAGE_CACHE_SHIFT - BBSHIFT)) - 1)

/* Kernels that do not define GFP_READAHEAD get a zero mask instead. */
#ifndef GFP_READAHEAD
#define GFP_READAHEAD	0
#endif

/*
 * A backport of the 2.5 scheduler is used by many vendors of 2.4-based
 * distributions.
 * We can only guess its presence by the lack of the SCHED_YIELD flag.
 * If the heuristic doesn't work, change this define by hand.
 */
#ifndef SCHED_YIELD
#define __HAVE_NEW_SCHEDULER	1
#endif

/*
 * cpumask_t is used for supporting NR_CPUS > BITS_PER_LONG.
 * If support for this is present, migrate_to_cpu exists and provides
 * a wrapper around the set_cpus_allowed routine.
 */
#ifdef copy_cpumask
#define __HAVE_CPUMASK_T	1
#endif

/*
 * Without cpumask_t we supply migrate_to_cpu() ourselves.  With the old
 * scheduler it writes current->cpus_allowed directly; with the new one it
 * calls set_cpus_allowed().  Both use the single-bit mask 1UL << cpu.
 */
#ifndef __HAVE_CPUMASK_T
# ifndef __HAVE_NEW_SCHEDULER
#  define migrate_to_cpu(cpu)	\
	do { current->cpus_allowed = 1UL << (cpu); } while (0)
# else
#  define migrate_to_cpu(cpu)	\
	set_cpus_allowed(current, 1UL << (cpu))
# endif
#endif

/* Alias VM_MAP to VM_ALLOC where the kernel does not provide VM_MAP. */
#ifndef VM_MAP
#define VM_MAP	VM_ALLOC
#endif

/* Zone that xfs_buf_allocate()/xfs_buf_deallocate() draw xfs_buf_t from. */
STATIC kmem_zone_t *xfs_buf_zone;
/* NOTE(review): presumably the memory-pressure shaker; set up elsewhere. */
STATIC kmem_shaker_t xfs_buf_shake;

/*
 * I/O completion hand-off state, one slot per CPU (CPU_TO_DAEMON is the
 * identity mapping).  xfs_buf_ioend() queues work on the *_tq entry and
 * wakes the matching *_wait entry; log and data completions are separate.
 */
#define MAX_IO_DAEMONS		NR_CPUS
#define CPU_TO_DAEMON(cpu)	(cpu)
STATIC int xb_logio_daemons[MAX_IO_DAEMONS];
STATIC struct list_head xfs_buf_logiodone_tq[MAX_IO_DAEMONS];
STATIC wait_queue_head_t xfs_buf_logiodone_wait[MAX_IO_DAEMONS];
STATIC int xb_dataio_daemons[MAX_IO_DAEMONS];
STATIC struct list_head xfs_buf_dataiodone_tq[MAX_IO_DAEMONS];
STATIC wait_queue_head_t xfs_buf_dataiodone_wait[MAX_IO_DAEMONS];

/*
 * For pre-allocated buffer head pool
 */

#define NR_RESERVED_BH	64		/* pool is considered full at this size */
static wait_queue_head_t	xb_resv_bh_wait;	/* sleepers waiting for a bh */
static spinlock_t		xb_resv_bh_lock = SPIN_LOCK_UNLOCKED;
struct buffer_head		*xb_resv_bh = NULL;	/* list of bh */
int				xb_resv_bh_cnt = 0;	/* # of bh available */

/* Forward declarations for routines defined later in this file. */
STATIC void _xfs_buf_ioapply(xfs_buf_t *);
STATIC int xfs_buf_daemon(void *data);
STATIC int xfs_buf_daemon_wakeup(int, unsigned int);
STATIC void xfs_buf_delwri_queue(xfs_buf_t *, int);
STATIC void xfs_buf_runall_queues(struct list_head[]);

#ifdef XFS_BUF_TRACE
/*
 * Record one ktrace entry for a buffer.
 *
 * bp:   buffer being traced
 * id:   short string naming the trace point
 * data: caller-chosen value stored with the entry
 * ra:   return address of the caller (supplied by XB_TRACE)
 *
 * Alongside these the entry captures the buffer's flags, hold count,
 * semaphore count, the current task, the file offset split into its
 * high and low 32 bits, and the buffer length.
 */
void
xfs_buf_trace(
	xfs_buf_t	*bp,
	char		*id,
	void		*data,
	void		*ra)
{
	ktrace_enter(xfs_buf_trace_buf,
		bp, id,
		(void *)(unsigned long)bp->b_flags,
		(void *)(unsigned long)bp->b_hold.counter,
		(void *)(unsigned long)bp->b_sema.count.counter,
		(void *)current,
		data, ra,
		(void *)(unsigned long)((bp->b_file_offset>>32) & 0xffffffff),
		(void *)(unsigned long)(bp->b_file_offset & 0xffffffff),
		(void *)(unsigned long)bp->b_buffer_length,
		NULL, NULL, NULL, NULL, NULL);
}
ktrace_t *xfs_buf_trace_buf;
#define XFS_BUF_TRACE_SIZE	4096
/* Trace point helper: forwards to xfs_buf_trace() with the caller's address. */
#define XB_TRACE(bp, id, data)	\
	xfs_buf_trace(bp, id, (void *)data, (void *)__builtin_return_address(0))
#else
/* Tracing compiled out: XB_TRACE expands to an empty statement. */
#define XB_TRACE(bp, id, data)	do { } while (0)
#endif

/*
 * Lock-owner bookkeeping.  With XFS_BUF_LOCK_TRACKING the pid of the task
 * that last took the buffer lock is kept in b_last_holder (-1 when
 * cleared); otherwise all three macros are empty statements.
 */
#ifdef XFS_BUF_LOCK_TRACKING
# define XB_SET_OWNER(bp)	((bp)->b_last_holder = current->pid)
# define XB_CLEAR_OWNER(bp)	((bp)->b_last_holder = -1)
# define XB_GET_OWNER(bp)	((bp)->b_last_holder)
#else
# define XB_SET_OWNER(bp)	do { } while (0)
# define XB_CLEAR_OWNER(bp)	do { } while (0)
# define XB_GET_OWNER(bp)	do { } while (0)
#endif

/*
 * Translate buffer flags into a page allocation mask: read-ahead requests
 * use GFP_READAHEAD, XBF_DONT_BLOCK requests use GFP_NOFS, and everything
 * else uses GFP_KERNEL.  XBF_READ_AHEAD takes precedence.
 */
#define xb_to_gfp(flags) \
	(((flags) & XBF_READ_AHEAD) ? GFP_READAHEAD : \
	 ((flags) & XBF_DONT_BLOCK) ? GFP_NOFS : GFP_KERNEL)

/* Translate buffer flags into a kmem allocation mode (KM_NOFS or KM_SLEEP). */
#define xb_to_km(flags) \
	 (((flags) & XBF_DONT_BLOCK) ? KM_NOFS : KM_SLEEP)

/*
 * Allocate / release an xfs_buf_t from xfs_buf_zone.
 *
 * Neither macro ends in a semicolon: each expands to a single expression,
 * so "if (x) xfs_buf_deallocate(bp); else ..." parses as intended instead
 * of producing a stray empty statement that detaches the else.
 */
#define xfs_buf_allocate(flags) \
	kmem_zone_alloc(xfs_buf_zone, xb_to_km(flags))
#define xfs_buf_deallocate(bp) \
	kmem_zone_free(xfs_buf_zone, (bp))

/*
 *	Mapping of multi-page buffers into contiguous virtual space
 */

/* One deferred-unmap record: an address that still has to be vunmap()ed. */
typedef struct a_list {
	void		*vm_addr;	/* address to hand to vunmap() */
	struct a_list	*next;		/* next record on the singly linked list */
} a_list_t;

STATIC a_list_t		*as_free_head;	/* head of the pending-unmap list */
STATIC int		as_list_len;	/* number of records on that list */
STATIC DEFINE_SPINLOCK(as_lock);	/* taken around updates of the two above */

/*
 *	vunmap() is costly, so rather than unmapping "addr" immediately we
 *	remember it on the as_free_head list for purge_addresses() to unmap
 *	later.  If no record can be allocated the unmap is done right away.
 */
STATIC void
free_address(
	void		*addr)
{
	a_list_t	*node;

	node = kmalloc(sizeof(a_list_t), GFP_ATOMIC & ~__GFP_HIGH);
	if (unlikely(node == NULL)) {
		/* Nothing to defer with: fall back to an immediate unmap. */
		vunmap(addr);
		return;
	}

	/* The record is still private here, so fill it in before locking. */
	node->vm_addr = addr;

	spin_lock(&as_lock);
	node->next = as_free_head;
	as_free_head = node;
	as_list_len++;
	spin_unlock(&as_lock);
}

/*
 *	Unmap every address queued by free_address().
 *	The whole list is detached under as_lock and then processed with
 *	the lock dropped, so the vunmap() calls run without it held.
 */
STATIC void
purge_addresses(void)
{
	a_list_t	*pending, *victim;

	/* Unlocked peek: nothing queued means nothing to do. */
	if (as_free_head != NULL) {
		spin_lock(&as_lock);
		pending = as_free_head;
		as_free_head = NULL;
		as_list_len = 0;
		spin_unlock(&as_lock);

		while (pending != NULL) {
			victim = pending;
			pending = victim->next;
			vunmap(victim->vm_addr);
			kfree(victim);
		}
	}
}

/*
 *	Internal xfs_buf_t object manipulation
 */

/*
 *	Bring a freshly allocated xfs_buf_t into its initial state.
 *
 *	The buffer starts with a hold count of one, both semaphores held,
 *	no block number assigned and no pins.  range_base / range_length
 *	give the byte offset and byte length the buffer covers on target.
 */
STATIC void
_xfs_buf_initialize(
	xfs_buf_t		*bp,
	xfs_buftarg_t		*target,
	xfs_off_t		range_base,
	size_t			range_length,
	xfs_buf_flags_t		flags)
{
	memset(bp, 0, sizeof(*bp));

	/* Where the buffer lives and what it covers. */
	bp->b_target = target;
	bp->b_file_offset = range_base;
	bp->b_bn = XFS_BUF_DADDR_NULL;

	/*
	 * buffer_length and count_desired begin equal.  I/O routines are
	 * expected to use count_desired, which may later be changed
	 * independently (e.g. by XFS recovery).
	 */
	bp->b_buffer_length = range_length;
	bp->b_count_desired = range_length;

	/*
	 * XBF_LOCK, XBF_MAPPED, XBF_DONT_BLOCK and XBF_READ_AHEAD must not
	 * leak into b_flags; everything else is kept, plus XBF_NONE.
	 */
	flags &= ~(XBF_LOCK|XBF_MAPPED|XBF_DONT_BLOCK|XBF_READ_AHEAD);
	bp->b_flags = flags | XBF_NONE;

	/* Reference and pin counts. */
	atomic_set(&bp->b_hold, 1);
	atomic_set(&bp->b_pin_count, 0);

	/* List linkage. */
	INIT_LIST_HEAD(&bp->b_list);
	INIT_LIST_HEAD(&bp->b_hash_list);

	/* Synchronisation objects; the buffer lock is held, no waiters. */
	init_MUTEX_LOCKED(&bp->b_iodonesema);
	init_MUTEX_LOCKED(&bp->b_sema);
	XB_SET_OWNER(bp);
	init_waitqueue_head(&bp->b_waiters);

	XFS_STATS_INC(xb_create);
	XB_TRACE(bp, "initialize", target);
}

/*
 *	Make sure the buffer has a zeroed page-pointer array with room for
 *	page_count entries.  Small counts (<= XB_PAGES) use the array
 *	embedded in the buffer; larger ones are allocated.
 *
 *	Returns 0 on success (including when an array is already present)
 *	or -ENOMEM if the allocation fails.
 */
STATIC int
_xfs_buf_get_pages(
	xfs_buf_t		*bp,
	int			page_count,
	xfs_buf_flags_t		flags)
{
	/* An existing page list is left exactly as it is. */
	if (bp->b_pages != NULL)
		return 0;

	bp->b_offset = xfs_buf_poff(bp->b_file_offset);
	bp->b_page_count = page_count;

	if (page_count > XB_PAGES) {
		bp->b_pages = kmem_alloc(sizeof(struct page *) * page_count,
					 xb_to_km(flags));
		if (bp->b_pages == NULL)
			return -ENOMEM;
	} else {
		bp->b_pages = bp->b_page_array;
	}

	memset(bp->b_pages, 0, sizeof(struct page *) * page_count);
	return 0;
}

/*
 *	Release the page-pointer array, unless it is the one embedded in
 *	the buffer itself (b_page_array), which needs no freeing.
 */
STATIC void
_xfs_buf_free_pages(
	xfs_buf_t	*bp)
{
	if (bp->b_pages == bp->b_page_array)
		return;

	kmem_free(bp->b_pages, bp->b_page_count * sizeof(struct page *));
}

/*
 *	Releases the specified buffer.
 *
 *	The modification state of any associated pages is left unchanged.
 *	The buffer must not be on any hash - use xfs_buf_rele instead for
 *	hashed and refcounted buffers.
 */
void
xfs_buf_free(
	xfs_buf_t		*bp)
{
	XB_TRACE(bp, "free", 0);

	ASSERT(list_empty(&bp->b_hash_list));

	if (bp->b_flags & _XBF_PAGE_CACHE) {
		uint		idx = 0;

		/*
		 * A mapped multi-page buffer has a vmap'ed address; queue
		 * its base (b_addr minus the in-page offset) for unmapping.
		 */
		if (bp->b_page_count > 1 && (bp->b_flags & XBF_MAPPED))
			free_address(bp->b_addr - bp->b_offset);

		/* Drop our reference on each page. */
		while (idx < bp->b_page_count) {
			page_cache_release(bp->b_pages[idx]);
			idx++;
		}
		_xfs_buf_free_pages(bp);
	} else if (bp->b_flags & _XBF_KMEM_ALLOC) {
		/*
		 * XXX(hch): b_count_desired may not match the size that was
		 * allocated (see xfs_buf_associate_memory), but the Linux
		 * kmem_free ignores its length argument, so this is benign.
		 */
		kmem_free(bp->b_addr, bp->b_count_desired);
		_xfs_buf_free_pages(bp);
	}

	xfs_buf_deallocate(bp);
}

/*
 *	Finds all pages for the buffer in question and builds its page list.
 *
 *	Pages are looked up (or created) in the target's address space, one
 *	per page-cache index covered by the buffer.  On return XBF_NONE is
 *	cleared if at least one page was already uptodate, and XBF_PARTIAL
 *	is set if only some of them were.
 *
 *	Returns 0 on success, or -ENOMEM if the page array cannot be
 *	allocated or a read-ahead request fails to get a page.  Requests
 *	that are not read-ahead retry page allocation indefinitely.
 */
STATIC int
_xfs_buf_lookup_pages(
	xfs_buf_t		*bp,
	uint			flags)
{
	struct address_space	*mapping = bp->b_target->bt_mapping;
	size_t			blocksize = bp->b_target->bt_bsize;
	int			gfp_mask = xb_to_gfp(flags);
	unsigned short		page_count, i;
	pgoff_t			first;
	xfs_off_t		end;
	int			error;

	/* Number of page-cache pages spanned by [b_file_offset, end). */
	end = bp->b_file_offset + bp->b_buffer_length;
	page_count = xfs_buf_btoc(end) - xfs_buf_btoct(bp->b_file_offset);

	error = _xfs_buf_get_pages(bp, page_count, flags);
	if (unlikely(error))
		return error;
	bp->b_flags |= _XBF_PAGE_CACHE;

	/* Page-cache index of the first page. */
	first = bp->b_file_offset >> PAGE_CACHE_SHIFT;

	for (i = 0; i < bp->b_page_count; i++) {
		struct page	*page;
		uint		retries = 0;

		/* "goto retry" re-enters below the initialiser, so retries accumulates. */
	      retry:
		page = find_or_create_page(mapping, first + i, gfp_mask);
		if (unlikely(page == NULL)) {
			if (flags & XBF_READ_AHEAD) {
				/*
				 * Read-ahead is best effort: trim the buffer
				 * to the pages obtained so far, unlock them
				 * and give up.
				 */
				bp->b_page_count = i;
				for (i = 0; i < bp->b_page_count; i++)
					unlock_page(bp->b_pages[i]);
				return -ENOMEM;
			}

			/*
			 * This could deadlock.
			 *
			 * But until all the XFS lowlevel code is revamped to
			 * handle buffer allocation failures we can't do much.
			 */
			if (!(++retries % 100))
				printk(KERN_ERR
					"possible deadlock in %s (mode:0x%x)\n",
					__FUNCTION__, gfp_mask);

			/* Kick the buffer daemon, sleep 10 ticks, then try again. */
			XFS_STATS_INC(xb_page_retries);
			xfs_buf_daemon_wakeup(0, gfp_mask);
			set_current_state(TASK_UNINTERRUPTIBLE);
			schedule_timeout(10);
			goto retry;
		}

		XFS_STATS_INC(xb_page_found);

		/* if we need to do I/O on a page record the fact */
		if (!Page_Uptodate(page)) {
			/* From here on page_count counts the uptodate pages. */
			page_count--;
			if (blocksize == PAGE_CACHE_SIZE && (flags & XBF_READ))
				bp->b_locked = 1;
		}

		bp->b_pages[i] = page;
	}

	/* Pages stay locked only when b_locked was set above. */
	if (!bp->b_locked) {
		for (i = 0; i < bp->b_page_count; i++)
			unlock_page(bp->b_pages[i]);
	}

	if (page_count) {
		/* if we have any uptodate pages, mark that in the buffer */
		bp->b_flags &= ~XBF_NONE;

		/* if some pages aren't uptodate, mark that in the buffer */
		if (page_count != bp->b_page_count)
			bp->b_flags |= XBF_PARTIAL;
	}

	XB_TRACE(bp, "lookup_pages", (long)page_count);
	return error;
}

/*
 *	Map buffer into kernel address-space if necessary.
 *
 *	Single-page buffers are always given an address.  Multi-page
 *	buffers are vmap'ed only when the caller asked for XBF_MAPPED.
 *	Returns 0, or -ENOMEM when vmap() fails.
 */
STATIC int
_xfs_buf_map_pages(
	xfs_buf_t		*bp,
	uint			flags)
{
	/* One page: its own kernel address plus the in-page offset will do. */
	if (bp->b_page_count == 1) {
		bp->b_addr = page_address(bp->b_pages[0]) + bp->b_offset;
		bp->b_flags |= XBF_MAPPED;
		return 0;
	}

	/* Several pages and no mapping requested: leave unmapped. */
	if (!(flags & XBF_MAPPED))
		return 0;

	/* Flush the deferred-unmap backlog once it grows past 64 entries. */
	if (as_list_len > 64)
		purge_addresses();

	bp->b_addr = vmap(bp->b_pages, bp->b_page_count,
				VM_MAP, PAGE_KERNEL);
	if (unlikely(bp->b_addr == NULL))
		return -ENOMEM;

	bp->b_addr += bp->b_offset;
	bp->b_flags |= XBF_MAPPED;
	return 0;
}

/*
 *	Pre-allocation of a pool of buffer heads for use in
 *	low-memory situations.
 */

/*
 *	_xfs_buf_prealloc_bh
 *
 *	Pre-allocate a pool of "count" buffer heads at startup.
 *	Puts them on a list at "xb_resv_bh"
 *	Returns number of bh actually allocated to pool.
 */
STATIC int
_xfs_buf_prealloc_bh(
	int			count)
{
	struct buffer_head	*bh;
	int			i;

	for (i = 0; i < count; i++) {
		bh = kmem_cache_alloc(bh_cachep, SLAB_KERNEL);
		if (!bh)
			break;
		bh->b_pprev = &xb_resv_bh;
		bh->b_next = xb_resv_bh;
		xb_resv_bh = bh;
		xb_resv_bh_cnt++;
	}
	return i;
}

/*
 *	_xfs_buf_get_prealloc_bh
 *
 *	Get one buffer head from our pre-allocated pool.
 *	If the pool is empty, sleep until one comes back in.
 *	Returns the buffer head taken from the head of the pool list.
 */
STATIC struct buffer_head *
_xfs_buf_get_prealloc_bh(void)
{
	unsigned long		flags;
	struct buffer_head	*bh;
	DECLARE_WAITQUEUE	(wait, current);

	spin_lock_irqsave(&xb_resv_bh_lock, flags);

	if (xb_resv_bh_cnt < 1) {
		add_wait_queue(&xb_resv_bh_wait, &wait);
		do {
			/*
			 * Mark ourselves sleeping before dropping the lock so
			 * a wake_up from _xfs_buf_free_bh is not lost, run
			 * the disk task queue, then sleep.
			 */
			set_current_state(TASK_UNINTERRUPTIBLE);
			spin_unlock_irqrestore(&xb_resv_bh_lock, flags);
			run_task_queue(&tq_disk);
			schedule();
			spin_lock_irqsave(&xb_resv_bh_lock, flags);
		} while (xb_resv_bh_cnt < 1);
		__set_current_state(TASK_RUNNING);
		remove_wait_queue(&xb_resv_bh_wait, &wait);
	}

	/* The lock is held and the pool is non-empty at this point. */
	BUG_ON(xb_resv_bh_cnt < 1);
	BUG_ON(!xb_resv_bh);

	/* Pop the head of the list. */
	bh = xb_resv_bh;
	xb_resv_bh = bh->b_next;
	xb_resv_bh_cnt--;

	spin_unlock_irqrestore(&xb_resv_bh_lock, flags);
	return bh;
}

/*
 *	_xfs_buf_free_bh
 *
 *	Take care of buffer heads that we're finished with.
 *	Call this instead of just kmem_cache_free(bh_cachep, bh)
 *	when you're done with a bh.
 *
 *	If our pre-allocated pool is full (NR_RESERVED_BH entries), just
 *	free the buffer head.  Otherwise, put it back in the pool, and wake
 *	up anybody waiting for one.
 */
STATIC inline void
_xfs_buf_free_bh(
	struct buffer_head	*bh)
{
	unsigned long		flags;
	int			free;

	/* Cheap unlocked check first; only take the lock if the pool looks short. */
	if (! (free = xb_resv_bh_cnt >= NR_RESERVED_BH)) {
		spin_lock_irqsave(&xb_resv_bh_lock, flags);

		/* Re-check under the lock: the pool may have filled meanwhile. */
		if (! (free = xb_resv_bh_cnt >= NR_RESERVED_BH)) {
			bh->b_pprev = &xb_resv_bh;
			bh->b_next = xb_resv_bh;
			xb_resv_bh = bh;
			xb_resv_bh_cnt++;

			if (waitqueue_active(&xb_resv_bh_wait)) {
				wake_up(&xb_resv_bh_wait);
			}
		}

		spin_unlock_irqrestore(&xb_resv_bh_lock, flags);
	}
	/* Pool was full: hand the bh back to the slab cache. */
	if (free) {
		kmem_cache_free(bh_cachep, bh);
	}
}

/*
 *	Finding and Reading Buffers
 */

/*
 *	_xfs_buf_find
 *
 *	Looks up a buffer for the given range of a target in the target's
 *	hash table.  ioff and isize are in basic blocks (they are shifted by
 *	BBSHIFT to obtain bytes).
 *
 *	If a buffer with exactly this offset and length exists, a hold is
 *	taken on it and it is returned locked; with XBF_TRYLOCK set, NULL is
 *	returned instead of waiting when the lock is unavailable.
 *
 *	If no buffer matches and new_bp is supplied, new_bp is initialised,
 *	inserted into the hash and returned (it is created locked by
 *	_xfs_buf_initialize).  If new_bp is NULL, NULL is returned.
 *
 *	No I/O is implied by this call.
 */
xfs_buf_t *
_xfs_buf_find(
	xfs_buftarg_t		*btp,	/* block device target		*/
	xfs_off_t		ioff,	/* starting offset of range	*/
	size_t			isize,	/* length of range		*/
	xfs_buf_flags_t		flags,
	xfs_buf_t		*new_bp)
{
	xfs_off_t		range_base;
	size_t			range_length;
	xfs_bufhash_t		*hash;
	xfs_buf_t		*bp, *n;

	/* Convert basic blocks to bytes. */
	range_base = (ioff << BBSHIFT);
	range_length = (isize << BBSHIFT);

	/* Check for IOs smaller than the sector size / not sector aligned */
	ASSERT(!(range_length < (1 << btp->bt_sshift)));
	ASSERT(!(range_base & (xfs_off_t)btp->bt_smask));

	/* The bucket is chosen from the block offset, not the byte offset. */
	hash = &btp->bt_hash[hash_long((unsigned long)ioff, btp->bt_hashshift)];

	spin_lock(&hash->bh_lock);

	list_for_each_entry_safe(bp, n, &hash->bh_list, b_hash_list) {
		ASSERT(btp == bp->b_target);
		if (bp->b_file_offset == range_base &&
		    bp->b_buffer_length == range_length) {
			/*
			 * If we look at something, bring it to the
			 * front of the list for next time.
			 */
			atomic_inc(&bp->b_hold);
			list_move(&bp->b_hash_list, &hash->bh_list);
			goto found;
		}
	}

	/* No match found */
	if (new_bp) {
		_xfs_buf_initialize(new_bp, btp, range_base,
				range_length, flags);
		new_bp->b_hash = hash;
		list_add(&new_bp->b_hash_list, &hash->bh_list);
	} else {
		XFS_STATS_INC(xb_miss_locked);
	}

	spin_unlock(&hash->bh_lock);
	return new_bp;

found:
	spin_unlock(&hash->bh_lock);

	/*
	 * The hash spinlock has been dropped.  Attempt to get the
	 * semaphore without sleeping; if that does not work, either
	 * block on it or (for XBF_TRYLOCK) give up.
	 */
	if (down_trylock(&bp->b_sema)) {
		if (!(flags & XBF_TRYLOCK)) {
			/* wait for buffer ownership */
			XB_TRACE(bp, "get_lock", 0);
			xfs_buf_lock(bp);
			XFS_STATS_INC(xb_get_locked_waited);
		} else {
			/* We asked for a trylock and failed, no need
			 * to look at file offset and length here, we
			 * know that this buffer at least overlaps our
			 * buffer and is locked, therefore our buffer
			 * either does not exist, or is this buffer.
			 */
			xfs_buf_rele(bp);
			XFS_STATS_INC(xb_busy_locked);
			return NULL;
		}
	} else {
		/* trylock worked */
		XB_SET_OWNER(bp);
	}

	/* A stale buffer is recycled: every flag except XBF_MAPPED is cleared. */
	if (bp->b_flags & XBF_STALE) {
		ASSERT((bp->b_flags & _XBF_DELWRI_Q) == 0);
		bp->b_flags &= XBF_MAPPED;
	}
	XB_TRACE(bp, "got_lock", 0);
	XFS_STATS_INC(xb_get_locked);
	return bp;
}

/*
 *	xfs_buf_get_flags assembles a buffer covering the specified range.
 *
 *	Storage in memory for all portions of the buffer will be allocated,
 *	although backing storage may not be.
 *
 *	Returns the buffer, or NULL if the buffer object cannot be
 *	allocated, the lookup fails (e.g. a failed trylock), pages cannot
 *	be obtained, or the pages cannot be mapped.
 */
xfs_buf_t *
xfs_buf_get_flags(
	xfs_buftarg_t		*target,/* target for buffer		*/
	xfs_off_t		ioff,	/* starting offset of range	*/
	size_t			isize,	/* length of range		*/
	xfs_buf_flags_t		flags)
{
	xfs_buf_t		*bp, *new_bp;
	int			error = 0, i;

	/* Allocate up front so _xfs_buf_find can insert it on a miss. */
	new_bp = xfs_buf_allocate(flags);
	if (unlikely(!new_bp))
		return NULL;

	bp = _xfs_buf_find(target, ioff, isize, flags, new_bp);
	if (bp == new_bp) {
		/* Cache miss: our new buffer was inserted and needs pages. */
		error = _xfs_buf_lookup_pages(bp, flags);
		if (error)
			goto no_buffer;
	} else {
		/* Existing buffer found (or lookup failed): discard the spare. */
		xfs_buf_deallocate(new_bp);
		if (unlikely(bp == NULL))
			return NULL;
	}

	for (i = 0; i < bp->b_page_count; i++)
		mark_page_accessed(bp->b_pages[i]);

	if (!(bp->b_flags & XBF_MAPPED)) {
		error = _xfs_buf_map_pages(bp, flags);
		if (unlikely(error)) {
			printk(KERN_WARNING "%s: failed to map pages\n",
					__FUNCTION__);
			goto no_buffer;
		}
	}

	XFS_STATS_INC(xb_get);

	/*
	 * Always fill in the block number now, the mapped cases can do
	 * their own overlay of this later.
	 */
	bp->b_bn = ioff;
	bp->b_count_desired = bp->b_buffer_length;

	XB_TRACE(bp, "get", (unsigned long)flags);
	return bp;

 no_buffer:
	/* Failure: undo the lock (if the caller asked for one) and the hold. */
	if (flags & (XBF_LOCK | XBF_TRYLOCK))
		xfs_buf_unlock(bp);
	xfs_buf_rele(bp);
	return NULL;
}

/*
 *	Get a buffer for the given range and start a read on it if its
 *	contents are not already valid.
 *
 *	Returns the buffer, or NULL if it could not be obtained.  An
 *	XBF_ASYNC request whose data is already present is treated as a
 *	satisfied read-ahead: the buffer is released and NULL is returned.
 */
xfs_buf_t *
xfs_buf_read_flags(
	xfs_buftarg_t		*target,
	xfs_off_t		ioff,
	size_t			isize,
	xfs_buf_flags_t		flags)
{
	xfs_buf_t		*bp;

	flags |= XBF_READ;

	bp = xfs_buf_get_flags(target, ioff, isize, flags);
	if (bp == NULL)
		return NULL;

	/* Contents not valid yet: issue the read. */
	if (XBF_NOT_DONE(bp)) {
		XB_TRACE(bp, "read", (unsigned long)flags);
		XFS_STATS_INC(xb_get_read);
		xfs_buf_iostart(bp, flags);
		return bp;
	}

	/* Data already there and the caller wants it. */
	if (!(flags & XBF_ASYNC)) {
		XB_TRACE(bp, "read_done", (unsigned long)flags);
		/* We do not want read in the flags */
		bp->b_flags &= ~XBF_READ;
		return bp;
	}

	/*
	 * Read ahead call which is already satisfied,
	 * drop the buffer
	 */
	XB_TRACE(bp, "read_async", (unsigned long)flags);
	if (flags & (XBF_LOCK | XBF_TRYLOCK))
		xfs_buf_unlock(bp);
	xfs_buf_rele(bp);
	return NULL;
}

/*
 * Create a skeletal xfs_buf: allocated and initialised, flagged
 * _XBF_PRIVATE_BH, but with no pages attached and not hashed.
 * Returns NULL if the buffer cannot be allocated.
 */
xfs_buf_t *
xfs_buf_lookup(
	xfs_buftarg_t		*target,
	xfs_off_t		ioff,
	size_t			isize,
	xfs_buf_flags_t	flags)
{
	xfs_buf_t		*bp;

	flags |= _XBF_PRIVATE_BH;

	bp = xfs_buf_allocate(flags);
	if (bp == NULL)
		return NULL;

	_xfs_buf_initialize(bp, target, ioff, isize, flags);
	return bp;
}

/*
 * Issue a read-ahead for the given range.  The request is made
 * non-blocking on the buffer lock (XBF_TRYLOCK), asynchronous and
 * marked XBF_READ_AHEAD so that it can fail quietly under memory
 * pressure; the result is intentionally ignored.
 */
void
xfs_buf_readahead(
	xfs_buftarg_t		*target,
	xfs_off_t		ioff,
	size_t			isize,
	xfs_buf_flags_t		flags)
{
	xfs_buf_flags_t		ra_flags;

	ra_flags = flags | (XBF_TRYLOCK|XBF_ASYNC|XBF_READ_AHEAD);
	xfs_buf_read_flags(target, ioff, isize, ra_flags);
}

/*
 *	Allocate a buffer of the given length on target with no memory
 *	attached, offset zero and no flags.  Returns NULL on allocation
 *	failure.
 */
xfs_buf_t *
xfs_buf_get_empty(
	size_t			len,
	xfs_buftarg_t		*target)
{
	xfs_buf_t		*bp = xfs_buf_allocate(0);

	if (bp == NULL)
		return NULL;

	_xfs_buf_initialize(bp, target, 0, len, 0);
	return bp;
}

/*
 *	Return the struct page backing a kernel virtual address.
 *	Addresses inside [VMALLOC_START, VMALLOC_END) are resolved with
 *	vmalloc_to_page(); everything else with virt_to_page().
 */
static inline struct page *
mem_to_page(
	void			*addr)
{
	unsigned long		vaddr = (unsigned long)addr;

	if (vaddr >= VMALLOC_START && vaddr < VMALLOC_END)
		return vmalloc_to_page(addr);

	return virt_to_page(addr);
}

/*
 *	Attach caller-supplied memory to a buffer.
 *
 *	bp:  buffer to populate; any page array it already has is released
 *	mem: start of the memory region (need not be page aligned)
 *	len: length of the region in bytes
 *
 *	Builds the buffer's page list from the pages backing
 *	[mem, mem + len), records the offset of mem within its first page,
 *	sets both length fields to len and marks the buffer
 *	XBF_MAPPED | _XBF_PRIVATE_BH.
 *
 *	Returns 0 on success or the error from _xfs_buf_get_pages().
 */
int
xfs_buf_associate_memory(
	xfs_buf_t		*bp,
	void			*mem,
	size_t			len)
{
	int			rval;
	int			i = 0;
	size_t			ptr;
	size_t			end;
	off_t			offset;
	int			page_count;

	/*
	 * Size the page array from the page-aligned span that the fill
	 * loop below actually walks.  Deriving it from len alone comes up
	 * one short when a region of at most one page straddles a page
	 * boundary.  At least one entry is always needed for the first page.
	 */
	ptr = (size_t) mem & PAGE_CACHE_MASK;
	offset = (off_t) mem - ((off_t)mem & PAGE_CACHE_MASK);
	end = PAGE_CACHE_ALIGN((size_t) mem + len);
	page_count = (end - ptr) >> PAGE_CACHE_SHIFT;
	if (page_count < 1)
		page_count = 1;

	/* Free any previous set of page pointers */
	if (bp->b_pages)
		_xfs_buf_free_pages(bp);

	bp->b_pages = NULL;
	bp->b_addr = mem;

	rval = _xfs_buf_get_pages(bp, page_count, 0);
	if (rval)
		return rval;

	/* _xfs_buf_get_pages set b_offset from b_file_offset; use mem's. */
	bp->b_offset = offset;

	/* set up first page */
	bp->b_pages[0] = mem_to_page(mem);

	/* then one entry per further page up to the aligned end */
	ptr += PAGE_CACHE_SIZE;
	bp->b_page_count = ++i;
	while (ptr < end) {
		bp->b_pages[i] = mem_to_page((void *)ptr);
		bp->b_page_count = ++i;
		ptr += PAGE_CACHE_SIZE;
	}
	bp->b_locked = 0;

	bp->b_count_desired = bp->b_buffer_length = len;
	bp->b_flags |= XBF_MAPPED | _XBF_PRIVATE_BH;

	return 0;
}

/*
 *	Allocate an unhashed buffer of "len" bytes backed by kmem memory
 *	rather than page-cache pages.  The buffer is initialised with
 *	XBF_FORCEIO, gets _XBF_KMEM_ALLOC once memory is attached, and is
 *	returned unlocked.  Returns NULL on any allocation failure.
 */
xfs_buf_t *
xfs_buf_get_noaddr(
	size_t			len,
	xfs_buftarg_t		*target)
{
	size_t			malloc_len = len;
	xfs_buf_t		*bp;
	void			*data;
	int			error;

	bp = xfs_buf_allocate(0);
	if (unlikely(bp == NULL))
		goto fail;
	_xfs_buf_initialize(bp, target, 0, len, XBF_FORCEIO);

 try_again:
	data = kmem_alloc(malloc_len, KM_SLEEP | KM_MAYFAIL);
	if (unlikely(data == NULL))
		goto fail_free_buf;

	/* check whether alignment matches.. */
	if ((__psunsigned_t)data !=
	    ((__psunsigned_t)data & ~target->bt_smask)) {
		/* .. else double the size and try again */
		kmem_free(data, malloc_len);
		malloc_len <<= 1;
		goto try_again;
	}

	/* Only "len" bytes are associated, even if more were allocated. */
	error = xfs_buf_associate_memory(bp, data, len);
	if (error)
		goto fail_free_mem;
	bp->b_flags |= _XBF_KMEM_ALLOC;

	/* _xfs_buf_initialize created the buffer locked; release it. */
	xfs_buf_unlock(bp);

	XB_TRACE(bp, "no_daddr", data);
	return bp;
 fail_free_mem:
	kmem_free(data, malloc_len);
 fail_free_buf:
	xfs_buf_free(bp);
 fail:
	return NULL;
}

/*
 *	Increment the reference count on a buffer, so that it can be held
 *	concurrently with another thread which may release (free) the
 *	buffer asynchronously.
 *	The caller must already hold a reference to the buffer.
 */
void
xfs_buf_hold(
	xfs_buf_t		*bp)
{
	atomic_inc(&bp->b_hold);
	XB_TRACE(bp, "hold", 0);
}

/*
 *	Releases a hold on the specified buffer.  If this was the last
 *	hold, the buffer is freed with xfs_buf_free, unless it has a
 *	b_relse callback (which is invoked instead) or is XBF_FS_MANAGED.
 */
void
xfs_buf_rele(
	xfs_buf_t		*bp)
{
	xfs_bufhash_t		*hash = bp->b_hash;

	XB_TRACE(bp, "rele", bp->b_relse);

	/* Unhashed buffer: no hash lock to take, just drop and maybe free. */
	if (unlikely(!hash)) {
		ASSERT(!bp->b_relse);
		if (atomic_dec_and_test(&bp->b_hold))
			xfs_buf_free(bp);
		return;
	}

	/* Takes the hash lock only when the count reaches zero. */
	if (atomic_dec_and_lock(&bp->b_hold, &hash->bh_lock)) {
		int		do_free = 1;

		if (bp->b_relse) {
			/*
			 * Re-take a hold for the callback and call it with
			 * the hash lock dropped.
			 */
			atomic_inc(&bp->b_hold);
			spin_unlock(&hash->bh_lock);
			(*(bp->b_relse)) (bp);
			spin_lock(&hash->bh_lock);
			do_free = 0;
		}

		if (bp->b_flags & XBF_FS_MANAGED) {
			do_free = 0;
		}

		if (do_free) {
			ASSERT((bp->b_flags & (XBF_DELWRI|_XBF_DELWRI_Q)) == 0);
			/* Unhash under the lock, free after dropping it. */
			list_del_init(&bp->b_hash_list);
			spin_unlock(&hash->bh_lock);
			xfs_buf_free(bp);
		} else {
			spin_unlock(&hash->bh_lock);
		}
	} else {
		/*
		 * Catch reference count leaks
		 */
		ASSERT(atomic_read(&bp->b_hold) >= 0);
	}
}


/*
 *	Mutual exclusion on buffers.  Locking model:
 *
 *	Buffers associated with inodes for which buffer locking
 *	is not enabled are not protected by semaphores, and are
 *	assumed to be exclusively owned by the caller.  There is a
 *	spinlock in the buffer, used by the caller when concurrent
 *	access is possible.
 */

/*
 *	Try to lock a buffer object without sleeping.
 *	Returns 0 if the lock was obtained, -EBUSY if it was not.
 *	Note that this in no way locks the underlying pages, so it is only
 *	useful for synchronizing concurrent use of buffer objects, not for
 *	synchronizing independent access to the underlying pages.
 */
int
xfs_buf_cond_lock(
	xfs_buf_t		*bp)
{
	int			failed;

	/* down_trylock() returns non-zero when the semaphore is unavailable. */
	failed = down_trylock(&bp->b_sema);
	if (!failed) {
		XB_SET_OWNER(bp);
	}
	XB_TRACE(bp, "cond_lock", (long)!failed);

	return failed ? -EBUSY : 0;
}

#if defined(DEBUG) || defined(XFS_BLI_TRACE)
/*
 *	Debug/trace helper: return the current count of the buffer's lock
 *	semaphore.
 */
int
xfs_buf_lock_value(
	xfs_buf_t		*bp)
{
	return atomic_read(&bp->b_sema.count);
}
#endif

/*
 *	Locks a buffer object, sleeping until the lock is available.
 *	Note that this in no way locks the underlying pages, so it is only
 *	useful for synchronizing concurrent use of buffer objects, not for
 *	synchronizing independent access to the underlying pages.
 */
void
xfs_buf_lock(
	xfs_buf_t		*bp)
{
	XB_TRACE(bp, "lock", 0);
	/*
	 * If I/O is still outstanding on this buffer, run the disk task
	 * queue before sleeping on the semaphore.
	 */
	if (atomic_read(&bp->b_io_remaining))
		run_task_queue(&tq_disk);
	down(&bp->b_sema);
	XB_SET_OWNER(bp);
	XB_TRACE(bp, "locked", 0);
}

/*
 *	Releases the lock on the buffer object.
 *	If the buffer is marked delwri but is not queued, queue it before we
 *	unlock the buffer, as we need to set flags correctly.  We also need
 *	to take a reference for the delwri queue because the unlocker is
 *	going to drop theirs and they don't know we just queued it.
 */
void
xfs_buf_unlock(
	xfs_buf_t		*bp)
{
	/* XBF_DELWRI set but _XBF_DELWRI_Q clear: marked, not yet queued. */
	if ((bp->b_flags & (XBF_DELWRI|_XBF_DELWRI_Q)) == XBF_DELWRI) {
		atomic_inc(&bp->b_hold);
		bp->b_flags |= XBF_ASYNC;
		xfs_buf_delwri_queue(bp, 0);
	}

	XB_CLEAR_OWNER(bp);
	up(&bp->b_sema);
	XB_TRACE(bp, "unlock", 0);
}


/*
 *	Pinning Buffer Storage in Memory
 *	Ensure that no attempt to force a buffer to disk will succeed.
 *
 *	xfs_buf_pin adds one to the buffer's pin count; each call must be
 *	balanced by a call to xfs_buf_unpin.
 */
void
xfs_buf_pin(
	xfs_buf_t		*bp)
{
	atomic_inc(&bp->b_pin_count);
	XB_TRACE(bp, "pin", (long)bp->b_pin_count.counter);
}

/*
 *	Drop one pin from the buffer.  When the pin count reaches zero,
 *	every task sleeping on b_waiters (see xfs_buf_wait_unpin) is woken.
 */
void
xfs_buf_unpin(
	xfs_buf_t		*bp)
{
	if (atomic_dec_and_test(&bp->b_pin_count))
		wake_up_all(&bp->b_waiters);
	XB_TRACE(bp, "unpin", (long)bp->b_pin_count.counter);
}

/*
 *	Return the buffer's current pin count (zero means not pinned).
 */
int
xfs_buf_ispin(
	xfs_buf_t		*bp)
{
	return atomic_read(&bp->b_pin_count);
}

/*
 *	Sleep (uninterruptibly) until the buffer's pin count drops to zero.
 *	Returns immediately if the buffer is not pinned.
 */
STATIC void
xfs_buf_wait_unpin(
	xfs_buf_t		*bp)
{
	DECLARE_WAITQUEUE	(wait, current);

	if (atomic_read(&bp->b_pin_count) == 0)
		return;

	add_wait_queue(&bp->b_waiters, &wait);
	for (;;) {
		/*
		 * Set the task state before re-checking the count so a
		 * wake-up arriving between the check and schedule() is
		 * not lost.
		 */
		set_current_state(TASK_UNINTERRUPTIBLE);
		if (atomic_read(&bp->b_pin_count) == 0)
			break;
		/* Run the disk task queue if this buffer has I/O outstanding. */
		if (atomic_read(&bp->b_io_remaining))
			run_task_queue(&tq_disk);
		schedule();
	}
	remove_wait_queue(&bp->b_waiters, &wait);
	set_current_state(TASK_RUNNING);
}

/*
 *	Buffer Utility Routines
 */

/*
 *	Run I/O completion processing for a buffer.  "v" is the xfs_buf_t,
 *	passed as void * so the routine can be used as a task-queue
 *	callback.  A b_iodone handler takes precedence; without one, an
 *	XBF_ASYNC buffer is released via xfs_buf_relse.
 */
void
xfs_buf_iodone_sched(
	void			*v)
{
	xfs_buf_t		*bp = v;

	if (bp->b_iodone) {
		(*(bp->b_iodone))(bp);
		return;
	}

	if (bp->b_flags & XBF_ASYNC)
		xfs_buf_relse(bp);
}

/*
 *	Finish I/O on a buffer.
 *
 *	bp:       buffer whose I/O has completed
 *	dataio:   non-zero selects the data completion queue, zero the log one
 *	schedule: non-zero defers completion processing to the current CPU's
 *		  task queue; zero runs it inline
 *
 *	Clears the READ/WRITE flags and, if no error was recorded, the
 *	PARTIAL/NONE flags.  Buffers with neither a b_iodone handler nor
 *	XBF_ASYNC simply have b_iodonesema released for the waiter.
 */
void
xfs_buf_ioend(
	xfs_buf_t		*bp,
	int			dataio,
	int			schedule)
{
	int			daemon;

	bp->b_flags &= ~(XBF_READ | XBF_WRITE);
	if (!bp->b_error)
		bp->b_flags &= ~(XBF_PARTIAL | XBF_NONE);

	XB_TRACE(bp, "iodone", bp->b_iodone);

	/* Synchronous I/O with no callback: just signal the waiter. */
	if (!bp->b_iodone && !(bp->b_flags & XBF_ASYNC)) {
		up(&bp->b_iodonesema);
		return;
	}

	/* Caller wants completion processed right here. */
	if (!schedule) {
		xfs_buf_iodone_sched(bp);
		return;
	}

	/* Otherwise queue it on this CPU's slot and wake whoever waits there. */
	daemon = CPU_TO_DAEMON(smp_processor_id());
	INIT_TQUEUE(&bp->b_iodone_sched,
		xfs_buf_iodone_sched, (void *)bp);
	if (dataio) {
		queue_task(&bp->b_iodone_sched,
			&xfs_buf_dataiodone_tq[daemon]);
		wake_up(&xfs_buf_dataiodone_wait[daemon]);
	} else {
		queue_task(&bp->b_iodone_sched,
			&xfs_buf_logiodone_tq[daemon]);
		wake_up(&xfs_buf_logiodone_wait[daemon]);
	}
}

/*
 *	Record an I/O error code on the buffer.
 *	"error" must fit in 16 bits (asserted); it is stored in b_error
 *	as an unsigned short.  Passing 0 clears the error.
 */
void
xfs_buf_ioerror(
	xfs_buf_t		*bp,
	int			error)
{
	ASSERT(error >= 0 && error <= 0xffff);
	bp->b_error = (unsigned short)error;
	XB_TRACE(bp, "ioerror", (unsigned long)error);
}

/*
 *	Initiate I/O on a buffer, based on the flags supplied.
 *	The b_iodone routine in the buffer supplied will only be called
 *	when all of the subsidiary I/O requests, if any, have been completed.
 *
 *	With XBF_DELWRI the buffer is only queued for delayed write and 0
 *	is returned.  Otherwise the request is issued and, unless XBF_ASYNC
 *	is set, waited for; the return value is the status of issuing or
 *	waiting.
 */
int
xfs_buf_iostart(
	xfs_buf_t		*bp,
	xfs_buf_flags_t		flags)
{
	int			status = 0;

	XB_TRACE(bp, "iostart", (unsigned long)flags);

	if (flags & XBF_DELWRI) {
		/* Delayed write: replace the I/O flags and queue; no I/O now. */
		bp->b_flags &= ~(XBF_READ | XBF_WRITE | XBF_ASYNC);
		bp->b_flags |= flags & (XBF_DELWRI | XBF_ASYNC);
		xfs_buf_delwri_queue(bp, 1);
		return status;
	}

	/* Replace the buffer's I/O-related flags with the caller's. */
	bp->b_flags &= ~(XBF_READ | XBF_WRITE | XBF_ASYNC | XBF_DELWRI | \
			XBF_READ_AHEAD | _XBF_RUN_QUEUES);
	bp->b_flags |= flags & (XBF_READ | XBF_WRITE | XBF_ASYNC | \
			XBF_READ_AHEAD | _XBF_RUN_QUEUES);

	/* A block number must have been assigned before real I/O. */
	BUG_ON(bp->b_bn == XFS_BUF_DADDR_NULL);

	/* For writes allow an alternate strategy routine to precede
	 * the actual I/O request (which may not be issued at all in
	 * a shutdown situation, for example).
	 */
	status = (flags & XBF_WRITE) ?
		xfs_buf_iostrategy(bp) : xfs_buf_iorequest(bp);

	/* Wait for I/O if we are not an async request.
	 * Note: async I/O request completion will release the buffer,
	 * and that can already be done by this point.  So using the
	 * buffer pointer from here on, after async I/O, is invalid.
	 */
	if (!status && !(flags & XBF_ASYNC))
		status = xfs_buf_iowait(bp);

	return status;
}

/*
 *	Report whether the buffer's pages are held locked for the I/O in
 *	progress.  For block sizes below the page size, and for any read,
 *	the answer is b_locked; for other writes it is whether the buffer
 *	is page-cache backed (_XBF_PAGE_CACHE).
 */
STATIC __inline__ int
_xfs_buf_iolocked(
	xfs_buf_t		*bp)
{
	ASSERT(bp->b_flags & (XBF_READ | XBF_WRITE));

	if (bp->b_target->bt_bsize < PAGE_CACHE_SIZE ||
	    (bp->b_flags & XBF_READ))
		return bp->b_locked;

	return (bp->b_flags & _XBF_PAGE_CACHE);
}

/*
 *	Account for one completed sub-request of a buffer's I/O.
 *	Only the completion that brings b_io_remaining to zero does any
 *	work: it unlocks the pages if they were locked for the I/O, clears
 *	b_locked and passes the buffer on to xfs_buf_ioend.
 */
STATIC void
_xfs_buf_ioend(
	xfs_buf_t		*bp,
	int			schedule)
{
	int			i;

	if (atomic_dec_and_test(&bp->b_io_remaining) == 1) {
		if (_xfs_buf_iolocked(bp)) {
			for (i = 0; i < bp->b_page_count; i++)
				unlock_page(bp->b_pages[i]);
		}
		bp->b_locked = 0;
		xfs_buf_ioend(bp, (bp->b_flags & XBF_FS_DATAIOD), schedule);
	}
}

/*
 *	Common buffer_head completion handler.
 *
 *	bh:       buffer head whose I/O finished; b_private is the xfs_buf_t
 *	uptodate: non-zero if the I/O succeeded
 *	fullpage: non-zero if this bh covers its whole page
 *
 *	On failure the page is flagged in error and EIO is recorded on the
 *	buffer.  A full-page bh is unlocked and returned to the bh pool; a
 *	partial-page bh is unlocked in place and the page is marked
 *	uptodate only once the walk over its sibling buffer heads finds
 *	none that is still under async I/O or not uptodate.
 */
STATIC void
_end_io_xfs_buf(
	struct buffer_head	*bh,
	int			uptodate,
	int			fullpage)
{
	/* Capture these first: bh may be freed before we are done. */
	xfs_buf_t		*bp = (xfs_buf_t *)bh->b_private;
	struct page		*page = bh->b_page;

	mark_buffer_uptodate(bh, uptodate);
	put_bh(bh);

	if (!uptodate) {
		SetPageError(page);
		bp->b_error = EIO;
	}

	if (!fullpage) {
		static spinlock_t page_uptodate_lock = SPIN_LOCK_UNLOCKED;
		struct buffer_head *sibling;
		unsigned long flags;

		ASSERT(PageLocked(page));
		spin_lock_irqsave(&page_uptodate_lock, flags);
		clear_buffer_async(bh);
		unlock_buffer(bh);

		/*
		 * Walk the other buffer heads on this page, stopping at
		 * the first one that is still locked for async I/O or
		 * that is unlocked but not uptodate.
		 */
		sibling = bh->b_this_page;
		while (sibling != bh) {
			if (buffer_locked(sibling)) {
				if (buffer_async(sibling))
					break;
			} else if (!buffer_uptodate(sibling)) {
				break;
			}
			sibling = sibling->b_this_page;
		}
		spin_unlock_irqrestore(&page_uptodate_lock, flags);

		/* Walked the full ring without stopping: page is complete. */
		if (sibling == bh && !PageError(page))
			SetPageUptodate(page);
	} else {
		unlock_buffer(bh);
		_xfs_buf_free_bh(bh);
		if (!PageError(page))
			SetPageUptodate(page);
	}

	_xfs_buf_ioend(bp, 1);
}

/*
 *	buffer_head completion callback for a bh that covers a whole page:
 *	forwards to _end_io_xfs_buf with fullpage set.
 */
STATIC void
_xfs_buf_end_io_complete_pages(
	struct buffer_head	*bh,
	int			uptodate)
{
	_end_io_xfs_buf(bh, uptodate, 1);
}

/*
 *	buffer_head completion callback for a bh that covers only part of
 *	a page: forwards to _end_io_xfs_buf with fullpage clear.
 */
STATIC void
_xfs_buf_end_io_partial_pages(
	struct buffer_head	*bh,
	int			uptodate)
{
	_end_io_xfs_buf(bh, uptodate, 0);
}

/*
 *	Handling of buftargs.
 */

/*
 *	Wait for any bufs with callbacks that have been submitted but
 *	have not yet returned... walk the hash list for the target.
 *
 *	Each hash bucket is polled until the only buffers left on it are
 *	XBF_FS_MANAGED ones; between polls the bucket lock is dropped and
 *	delay(100) is called.
 */
void
xfs_wait_buftarg(
	xfs_buftarg_t	*btp)
{
	xfs_buf_t	*bp, *n;
	xfs_bufhash_t	*hash;
	uint		i;

	for (i = 0; i < (1 << btp->bt_hashshift); i++) {
		hash = &btp->bt_hash[i];
again:
		spin_lock(&hash->bh_lock);
		list_for_each_entry_safe(bp, n, &hash->bh_list, b_hash_list) {
			ASSERT(btp == bp->b_target);
			if (!(bp->b_flags & XBF_FS_MANAGED)) {
				/* Still busy: drop the lock, wait, rescan this bucket. */
				spin_unlock(&hash->bh_lock);
				/*
				 * Catch superblock reference count leaks
				 * immediately
				 */
				BUG_ON(bp->b_bn == 0);
				delay(100);
				goto again;
			}
		}
		spin_unlock(&hash->bh_lock);
	}
}

/*
 *	Allocate and initialise the buffer hash table for a target.
 *	External targets (i.e. the log/realtime devices) get a small
 *	table of 8 buckets; the device holding metadata gets 256.
 */
STATIC void
xfs_alloc_bufhash(
	xfs_buftarg_t		*btp,
	int			external)
{
	unsigned int		bucket, nbuckets;

	btp->bt_hashshift = external ? 3 : 8;	/* 8 or 256 buckets */
	nbuckets = 1 << btp->bt_hashshift;

	btp->bt_hashmask = nbuckets - 1;
	btp->bt_hash = kmem_zalloc(nbuckets * sizeof(xfs_bufhash_t),
				   KM_SLEEP);

	for (bucket = 0; bucket < nbuckets; bucket++) {
		spin_lock_init(&btp->bt_hash[bucket].bh_lock);
		INIT_LIST_HEAD(&btp->bt_hash[bucket].bh_list);
	}
}

/*
 *	Free the hash table allocated by xfs_alloc_bufhash and clear the
 *	target's pointer to it.  The size passed to kmem_free is recomputed
 *	from bt_hashshift.
 */
STATIC void
xfs_free_bufhash(
	xfs_buftarg_t		*btp)
{
	kmem_free(btp->bt_hash,
			(1 << btp->bt_hashshift) * sizeof(xfs_bufhash_t));
	btp->bt_hash = NULL;
}

/*
 *	buftarg list for delwrite queue processing
 */
/* Registered buftargs, linked through bt_list; xfs_buf_daemon_wakeup()
 * walks this list to wake the task recorded in each entry's bt_task. */
STATIC LIST_HEAD(xfs_buftarg_list);
/* Held around every insertion, removal and traversal of the list above. */
STATIC DEFINE_SPINLOCK(xfs_buftarg_lock);

/*
 *	Link a buftarg onto the head of xfs_buftarg_list, under
 *	xfs_buftarg_lock, so that xfs_buf_daemon_wakeup() will visit it.
 */
STATIC void
xfs_register_buftarg(
	xfs_buftarg_t           *btp)
{
	spin_lock(&xfs_buftarg_lock);
	list_add(&btp->bt_list, &xfs_buftarg_list);
	spin_unlock(&xfs_buftarg_lock);
}

/*
 *	Unlink a buftarg from xfs_buftarg_list, under xfs_buftarg_lock,
 *	so that xfs_buf_daemon_wakeup() no longer visits it.
 */
STATIC void
xfs_unregister_buftarg(
	xfs_buftarg_t           *btp)
{
	spin_lock(&xfs_buftarg_lock);
	list_del(&btp->bt_list);
	spin_unlock(&xfs_buftarg_lock);
}

/*
 *	Tear down a buftarg created by xfs_alloc_buftarg().
 *
 *	btp:		target to destroy; freed before returning
 *	external:	non-zero if the block device reference must be
 *			dropped here via xfs_blkdev_put()
 *
 *	The delayed write queue is flushed synchronously first.  The
 *	function then blocks until the target's xfs_buf_daemon thread
 *	signals bt_done.
 */
void
xfs_free_buftarg(
	xfs_buftarg_t		*btp,
	int			external)
{
	/* write out everything that is not pinned and wait for it */
	xfs_flush_buftarg(btp, 1);
	if (external)
		xfs_blkdev_put(btp->bt_bdev);
	xfs_free_bufhash(btp);
	/* drop the inode allocated in xfs_mapping_buftarg() */
	iput(btp->bt_mapping->host);

	/* unregister the buftarg first so that we don't get a
	 * wakeup finding a non-existent task */
	xfs_unregister_buftarg(btp);
	/*
	 * The daemon loops while XBT_ACTIVE is set; it is not woken
	 * explicitly here, so it sees the cleared bit the next time its
	 * schedule_timeout() returns, then completes bt_done and exits.
	 */
	clear_bit(XBT_ACTIVE, &btp->bt_flags);
	barrier();
	wait_for_completion(&btp->bt_done);

	kmem_free(btp, sizeof(*btp));
}

/*
 *	Record the block and sector geometry in the buftarg and apply
 *	the sector size to the underlying device with set_blocksize().
 *
 *	The bt_bsize, bt_sshift and bt_smask fields are updated before
 *	the device call, so they are changed even when it fails.
 *
 *	Returns 0 on success, or (positive) EINVAL when set_blocksize()
 *	reports a failure.
 */
int
xfs_setsize_buftarg(
	xfs_buftarg_t		*btp,
	unsigned int		blocksize,
	unsigned int		sectorsize)
{
	btp->bt_bsize = blocksize;
	btp->bt_sshift = ffs(sectorsize) - 1;
	btp->bt_smask = sectorsize - 1;

	if (set_blocksize(btp->bt_kdev, sectorsize) == 0)
		return 0;

	printk(KERN_WARNING
		"XFS: Cannot set_blocksize to %u on device 0x%x\n",
		sectorsize, kdev_t_to_nr(btp->bt_kdev));
	return EINVAL;
}

/*
 *	Give the buftarg a private address_space by allocating a fresh
 *	inode on the block device inode's superblock and using that
 *	inode's i_data.  The inode is set up as a block special file for
 *	the same device; the mapping uses GFP_NOFS and an operations
 *	table that provides only sync_page.
 *
 *	Returns 0 on success, or (positive) ENOMEM if no inode could be
 *	allocated.
 */
STATIC int
xfs_mapping_buftarg(
	xfs_buftarg_t		*btp,
	struct block_device	*bdev)
{
	static struct address_space_operations mapping_aops = {
		.sync_page = block_sync_page,
	};
	struct inode		*inode;
	kdev_t			kdev = to_kdev_t(bdev->bd_dev);

	inode = new_inode(bdev->bd_inode->i_sb);
	if (inode == NULL) {
		printk(KERN_WARNING
			"XFS: Cannot allocate mapping inode for device %s\n",
			XFS_BUFTARG_NAME(btp));
		return ENOMEM;
	}

	/* make the inode describe the block device itself */
	inode->i_mode = S_IFBLK;
	inode->i_dev  = kdev;
	inode->i_rdev = kdev;
	inode->i_bdev = bdev;

	/* the embedded address_space becomes the target's mapping */
	inode->i_data.a_ops = &mapping_aops;
	inode->i_data.gfp_mask = GFP_NOFS;
	btp->bt_mapping = &inode->i_data;
	return 0;
}

/*
 *	Initialise the delayed write machinery of a buftarg: the list
 *	linkage, the delwri queue and its lock, the exit completion and
 *	the flag word.  An xfs_buf_daemon thread is then spawned for the
 *	target and, if that worked, the target is registered on the
 *	global buftarg list.
 *
 *	Returns 0 on success, or the negative value returned by
 *	kernel_thread() on failure.
 */
STATIC int
xfs_alloc_delwrite_queue(
	xfs_buftarg_t		*btp)
{
	int	pid;

	INIT_LIST_HEAD(&btp->bt_list);
	INIT_LIST_HEAD(&btp->bt_delwrite_queue);
	spinlock_init(&btp->bt_delwrite_lock, "delwri_lock");
	init_completion(&btp->bt_done);
	btp->bt_flags = 0;

	pid = kernel_thread(xfs_buf_daemon, btp, CLONE_FS|CLONE_FILES|CLONE_VM);
	if (pid < 0)
		return pid;

	xfs_register_buftarg(btp);
	return 0;
}

xfs_buftarg_t *
xfs_alloc_buftarg(
	struct block_device	*bdev,
	int			external)
{
	xfs_buftarg_t		*btp;
	kdev_t			kdev;

	btp = kmem_zalloc(sizeof(*btp), KM_SLEEP);

	kdev = to_kdev_t(bdev->bd_dev);
	btp->bt_dev =  bdev->bd_dev;
	btp->bt_kdev = kdev;
	btp->bt_bdev = bdev;
	switch (MAJOR(btp->bt_dev)) {
	case MD_MAJOR:
	case EVMS_MAJOR:
		btp->bt_ioflags = XBIO_ALIGNED_ONLY;
		break;
	case LOOP_MAJOR:
	case LVM_BLK_MAJOR:
		btp->bt_ioflags = XBIO_SECTOR_ONLY;
		break;
	}
	if (xfs_setsize_buftarg(btp, PAGE_CACHE_SIZE, get_hardsect_size(kdev)))
		goto error;
	if (xfs_mapping_buftarg(btp, bdev))
		goto error;
	if (xfs_alloc_delwrite_queue(btp))
		goto error;
	xfs_alloc_bufhash(btp, external);
	return btp;

error:
	kmem_free(btp, sizeof(*btp));
	return NULL;
}

/*
 *	Initiate I/O on part of a page we are interested in
 *
 *	Two strategies are used to build the list of buffer_heads:
 *
 *	  - "public" buffer_heads: when the target block size is smaller
 *	    than a page and the xfs_buf does not carry _XBF_PRIVATE_BH,
 *	    the buffer_heads attached to the page itself are used (and
 *	    created with create_empty_buffers() if the page has none).
 *
 *	  - private buffer_heads: otherwise fresh buffer_heads are taken
 *	    from bh_cachep, falling back to _xfs_buf_get_prealloc_bh()
 *	    when that allocation fails.
 *
 *	Every collected buffer_head is then initialised with a completion
 *	callback and submitted through generic_make_request().
 *
 *	Returns 0 if at least one buffer_head was submitted, or 1 if
 *	there was nothing to submit for this page.
 *
 *	NOTE(review): the parameter comment says bp "can be NULL", yet
 *	bp->b_flags, bp->b_target and bp->b_io_remaining are dereferenced
 *	below without a check -- confirm no caller passes NULL.
 */
STATIC int
_xfs_buf_page_io(
	struct page		*page,	/* Page structure we are dealing with */
	xfs_buftarg_t		*btp,	/* device parameters (bsz, ssz, dev) */
	xfs_buf_t		*bp,	/* xfs_buf holding it, can be NULL */
	xfs_daddr_t		bn,	/* starting block number */
	size_t			pg_offset,	/* starting offset in page */
	size_t			pg_length,	/* count of data to process */
	int			rw,	/* read/write operation */
	int			flush)
{
	size_t			sector;
	size_t			blk_length = 0;
	struct buffer_head	*bh, *head, *bufferlist[MAX_BUF_PER_PAGE];
	int			sector_shift = btp->bt_sshift;
	int			i = 0, cnt = 0;
	int			public_bh = 0;
	int			multi_ok;

	if ((btp->bt_bsize < PAGE_CACHE_SIZE) &&
	    !(bp->b_flags & _XBF_PRIVATE_BH)) {
		int		cache_ok;

		/*
		 * An already up-to-date buffer_head may only be skipped
		 * for a read that is not marked XBF_FORCEIO.
		 */
		cache_ok = !((bp->b_flags & XBF_FORCEIO) || (rw == WRITE));
		public_bh = multi_ok = 1;
		sector = 1 << sector_shift;

		ASSERT(PageLocked(page));
		if (!page_has_buffers(page))
			create_empty_buffers(page, btp->bt_kdev, sector);

		/* i: basic blocks per buffer_head; rewind bn to page start */
		i = sector >> BBSHIFT;
		bn -= (pg_offset >> BBSHIFT);

		/* Find buffer_heads belonging to just this xfs_buf */
		bh = head = page_buffers(page);
		/*
		 * blk_length is used here as the byte offset of the current
		 * buffer_head within the page.  A "continue" in a do/while
		 * jumps to the loop condition, so bn, blk_length and bh are
		 * still advanced for skipped buffer_heads.
		 */
		do {
			if (buffer_uptodate(bh) && cache_ok)
				continue;
			if (blk_length < pg_offset)
				continue;
			if (blk_length >= pg_offset + pg_length)
				break;

			lock_buffer(bh);
			get_bh(bh);
			bh->b_size = sector;
			bh->b_blocknr = bn;
			bufferlist[cnt++] = bh;

		} while ((bn += i),
			 (blk_length += sector),
			  (bh = bh->b_this_page) != head);

		goto request;
	}

	/* Calculate the block offsets and length we will be using */
	if (pg_offset) {
		size_t		block_offset;

		/* block_offset ends up as pg_offset modulo the sector size */
		block_offset = pg_offset >> sector_shift;
		block_offset = pg_offset - (block_offset << sector_shift);
		blk_length = (pg_length + block_offset + btp->bt_smask) >>
								sector_shift;
	} else {
		blk_length = (pg_length + btp->bt_smask) >> sector_shift;
	}

	/* This will attempt to make a request bigger than the sector
	 * size if we are well aligned.
	 *
	 * After the switch, "sector" is the byte size of each
	 * buffer_head and blk_length the number of buffer_heads.
	 */
	switch (bp->b_target->bt_ioflags) {
	case 0:
		/* no restriction: one buffer_head covers the whole range */
		sector = blk_length << sector_shift;
		blk_length = 1;
		break;
	case XBIO_ALIGNED_ONLY:
		/* single request only for a whole, aligned page */
		if ((pg_offset == 0) && (pg_length == PAGE_CACHE_SIZE) &&
		    (((unsigned int) bn) & BN_ALIGN_MASK) == 0) {
			sector = blk_length << sector_shift;
			blk_length = 1;
			break;
		}
		/* not aligned: deliberately falls into the per-sector case */
	case XBIO_SECTOR_ONLY:
		/* Fallthrough, same as default */
	default:
		sector = 1 << sector_shift;
	}

	/* If we are doing I/O larger than the bh->b_size field then
	 * we need to split this request up.
	 *
	 * bh is only named inside sizeof here, so it is not read.
	 * NOTE(review): each halving of the size adds one to blk_length
	 * instead of doubling it; that is right for a single halving --
	 * confirm that more than one halving cannot happen.
	 */
	while (sector > ((1ULL << NBBY * sizeof(bh->b_size)) - 1)) {
		sector >>= 1;
		blk_length++;
	}

	multi_ok = (blk_length != 1);
	i = sector >> BBSHIFT;

	/* Build one private, locked buffer_head per chunk of the range */
	for (; blk_length > 0; bn += i, blk_length--, pg_offset += sector) {
		bh = kmem_cache_alloc(bh_cachep, SLAB_NOFS);
		if (!bh)
			bh = _xfs_buf_get_prealloc_bh();
		memset(bh, 0, sizeof(*bh));
		bh->b_blocknr = bn;
		bh->b_size = sector;
		bh->b_dev = btp->bt_kdev;
		set_buffer_locked(bh);
		set_bh_page(bh, page, pg_offset);
		init_waitqueue_head(&bh->b_wait);
		atomic_set(&bh->b_count, 1);
		bufferlist[cnt++] = bh;
	}

request:
	if (cnt) {
		void	(*callback)(struct buffer_head *, int);

		/*
		 * multi_ok is always 1 on the public path, so this selects
		 * the partial-pages handler exactly when public
		 * buffer_heads are in use.
		 */
		callback = (multi_ok && public_bh) ?
				_xfs_buf_end_io_partial_pages :
				_xfs_buf_end_io_complete_pages;

		/* Account for additional buffers in progress */
		atomic_add(cnt, &bp->b_io_remaining);

#ifdef RQ_WRITE_ORDERED
		/* tag only the last buffer_head of the batch */
		if (flush)
			set_bit(BH_Ordered_Flush, &bufferlist[cnt-1]->b_state);
#endif

		/* i is reused from here on as a plain index into bufferlist */
		for (i = 0; i < cnt; i++) {
			bh = bufferlist[i];
			init_buffer(bh, callback, bp);
			bh->b_rdev = bh->b_dev;
			bh->b_rsector = bh->b_blocknr;
			set_buffer_mapped(bh);
			set_buffer_async(bh);
			set_buffer_req(bh);
			if (rw == WRITE)
				set_buffer_uptodate(bh);
			generic_make_request(rw, bh);
		}
		return 0;
	}

	/*
	 * We have no I/O to submit, let the caller know that
	 * we have skipped over this page entirely.
	 */
	return 1;
}

/*
 *	Issue the I/O for one page of an xfs_buf.
 *
 *	bp:		buffer the page belongs to
 *	offset:		file offset corresponding to the start of this
 *			page's share of the buffer
 *	page:		the page itself
 *	pg_offset:	first byte within the page to transfer
 *	pg_length:	number of bytes to transfer
 *	last:		non-zero for the final page of the buffer
 *
 *	A locked, sub-page read on a target whose block size equals the
 *	page size is widened to cover the whole page.  In every other
 *	case the starting block number is advanced by the distance of
 *	"offset" from the start of the buffer.
 */
STATIC void
_xfs_buf_page_apply(
	xfs_buf_t		*bp,
	xfs_off_t		offset,
	struct page		*page,
	size_t			pg_offset,
	size_t			pg_length,
	int			last)
{
	xfs_buftarg_t		*btp = bp->b_target;
	xfs_daddr_t		bn = bp->b_bn;
	xfs_off_t		delta;
	int			iolocked, skipped;
	int			rw, flush;

	ASSERT(page);
	ASSERT(bp->b_flags & (XBF_READ|XBF_WRITE));

	if ((btp->bt_bsize == PAGE_CACHE_SIZE) &&
	    (bp->b_buffer_length < PAGE_CACHE_SIZE) &&
	    (bp->b_flags & XBF_READ) && bp->b_locked) {
		/* read the entire page the buffer lives in */
		bn -= (bp->b_offset >> BBSHIFT);
		pg_offset = 0;
		pg_length = PAGE_CACHE_SIZE;
	} else {
		delta = offset - bp->b_file_offset;
		if (delta)
			bn += (delta + BBMASK) >> BBSHIFT;
	}

	iolocked = _xfs_buf_iolocked(bp);
	if (bp->b_flags & XBF_WRITE) {
		if (iolocked && !bp->b_locked)
			lock_page(page);
		rw = WRITE;
		/* only the last page of an ordered buffer asks for a flush */
		flush = last && (bp->b_flags & XBF_ORDERED);
	} else {
		rw = READ;
		flush = 0;
	}

	skipped = _xfs_buf_page_io(page, btp, bp, bn,
			pg_offset, pg_length, rw, flush);

	/* nothing was submitted for this page, so release it here */
	if (skipped && iolocked && (btp->bt_bsize >= PAGE_CACHE_SIZE))
		unlock_page(page);
}

/*
 *	xfs_buf_iorequest -- the core I/O request routine.
 *
 *	A buffer flagged XBF_DELWRI is only placed on the delayed write
 *	queue (and unlocked).  Anything else is submitted now: writes
 *	first wait for the buffer to be unpinned, then a hold is taken
 *	for the duration of the submission.
 *
 *	Always returns 0.
 */
int
xfs_buf_iorequest(			/* start real I/O		*/
	xfs_buf_t		*bp)	/* buffer to convey to device	*/
{
	XB_TRACE(bp, "iorequest", 0);

	if (bp->b_flags & XBF_DELWRI) {
		xfs_buf_delwri_queue(bp, 1);
		return 0;
	}

	if (bp->b_flags & XBF_WRITE)
		xfs_buf_wait_unpin(bp);

	xfs_buf_hold(bp);

	/*
	 * Start the outstanding-I/O count at 1 so that a completion
	 * arriving while we are still submitting cannot finish the
	 * buffer early; the _xfs_buf_ioend() call at the bottom accounts
	 * for this initial count.
	 */
	atomic_set(&bp->b_io_remaining, 1);

	if (xfs_io_bypass && (bp->b_flags & XBF_DIRECTIO)) {
		request_queue_t	*q = blk_get_queue(bp->b_target->bt_dev);
		io_private_t	*iop = NULL;

		/* only trust queuedata that starts with the expected magic */
		if (q && q->queuedata &&
		    XIO_MAGIC == (*(unsigned int *)q->queuedata))
			iop = (io_private_t *)q->queuedata;

		if (iop && iop->map_io_request) {
			int	idx, error;
			int	iolocked = _xfs_buf_iolocked(bp);

			if ((bp->b_flags & XBF_WRITE) &&
			    iolocked && !bp->b_locked) {
				for (idx = 0; idx < bp->b_page_count; idx++)
					lock_page(bp->b_pages[idx]);
				bp->b_locked = 1;
			}

			error = iop->map_io_request((void *)bp);
			if (error)
				xfs_buf_ioerror(bp, error);
			bp->b_flags &= ~XBF_DIRECTIO;
			goto submitted;
		}
	}

	_xfs_buf_ioapply(bp);

submitted:
	_xfs_buf_ioend(bp, 0);
	xfs_buf_rele(bp);
	return 0;
}

/*
 *	Wait for I/O on the supplied buffer by sleeping on its
 *	b_iodonesema semaphore.
 *
 *	Before sleeping, the disk task queue is run if sub-I/Os are still
 *	outstanding, and the data iodone queues are run for buffers
 *	flagged XBF_FS_DATAIOD.
 *
 *	Returns bp->b_error as found after the wait.
 */
int
xfs_buf_iowait(
	xfs_buf_t		*bp)
{
	XB_TRACE(bp, "iowait", 0);

	if (atomic_read(&bp->b_io_remaining) != 0)
		run_task_queue(&tq_disk);

	if (bp->b_flags & XBF_FS_DATAIOD)
		xfs_buf_runall_queues(xfs_buf_dataiodone_tq);

	down(&bp->b_iodonesema);

	XB_TRACE(bp, "iowaited", (long)bp->b_error);
	return bp->b_error;
}

/*
 *	Return the kernel address of byte "offset" within the buffer.
 *
 *	A buffer flagged XBF_MAPPED is addressed directly from
 *	XFS_BUF_PTR().  Otherwise the page holding the byte is looked up
 *	in b_pages and the address is built from page_address().
 */
xfs_caddr_t
xfs_buf_offset(
	xfs_buf_t		*bp,
	size_t			offset)
{
	struct page		*page;

	if (!(bp->b_flags & XBF_MAPPED)) {
		/* make the offset relative to the first page */
		offset += bp->b_offset;
		page = bp->b_pages[offset >> PAGE_CACHE_SHIFT];
		return (xfs_caddr_t)page_address(page) +
					(offset & (PAGE_CACHE_SIZE-1));
	}

	return XFS_BUF_PTR(bp) + offset;
}

/*
 *	Move data into or out of a buffer.
 *
 *	bp:	buffer to operate on
 *	boff:	byte offset within the buffer at which to start
 *	bsize:	number of bytes to move
 *	data:	memory to copy to (XBRW_READ) or from (XBRW_WRITE);
 *		not dereferenced for XBRW_ZERO
 *	mode:	XBRW_ZERO clears the range, XBRW_READ copies buffer
 *		contents out to "data", XBRW_WRITE copies "data" in
 *
 *	The range is processed one page at a time.  Each step is limited
 *	by the end of the current page, by the end of the buffer
 *	(b_count_desired) and by the end of the requested range, so
 *	exactly bsize bytes are moved even when the request ends in the
 *	middle of a page.
 */
void
xfs_buf_iomove(
	xfs_buf_t		*bp,	/* buffer to process		*/
	size_t			boff,	/* starting buffer offset	*/
	size_t			bsize,	/* length to copy		*/
	caddr_t			data,	/* data address			*/
	xfs_buf_rw_t		mode)	/* read/write/zero flag		*/
{
	size_t			bend, cpoff, csize;
	struct page		*page;

	bend = boff + bsize;
	while (boff < bend) {
		page = bp->b_pages[xfs_buf_btoct(boff + bp->b_offset)];
		cpoff = xfs_buf_poff(boff + bp->b_offset);
		csize = min_t(size_t,
			      PAGE_CACHE_SIZE-cpoff, bp->b_count_desired-boff);
		/*
		 * Never move more than the caller asked for: without this
		 * a request ending inside a page would run on to the end
		 * of that page (or of the buffer).
		 */
		csize = min_t(size_t, csize, bend - boff);

		ASSERT(((csize + cpoff) <= PAGE_CACHE_SIZE));

		switch (mode) {
		case XBRW_ZERO:
			memset(page_address(page) + cpoff, 0, csize);
			break;
		case XBRW_READ:
			memcpy(data, page_address(page) + cpoff, csize);
			break;
		case XBRW_WRITE:
			memcpy(page_address(page) + cpoff, data, csize);
			break;
		}

		boff += csize;
		data += csize;
	}
}

/*
 *	Applies _xfs_buf_page_apply to each page of the xfs_buf_t.
 *
 *	If the buffer is not yet locked, is not direct I/O and sits on a
 *	target whose block size is below the page size, all of its pages
 *	are locked first and the buffer is marked as locked.
 *
 *	The walk starts b_offset bytes into the page array and covers
 *	b_count_desired bytes.  Once all pages are submitted, the disk
 *	task queue is run if the buffer asked for it with
 *	_XBF_RUN_QUEUES and sub-I/Os are outstanding.
 */
STATIC void
_xfs_buf_ioapply(			/* apply function to pages	*/
	xfs_buf_t		*bp)	/* buffer to examine		*/
{
	xfs_off_t		file_off = bp->b_file_offset;
	size_t			skip = bp->b_offset;
	size_t			remaining = bp->b_count_desired;
	size_t			pg_off, chunk;
	int			idx;

	if (!bp->b_locked && !(bp->b_flags & XBF_DIRECTIO) &&
	    (bp->b_target->bt_bsize < PAGE_CACHE_SIZE)) {
		for (idx = 0; idx < bp->b_page_count; idx++)
			lock_page(bp->b_pages[idx]);
		bp->b_locked = 1;
	}

	for (idx = 0; idx < bp->b_page_count && remaining != 0; idx++) {
		/* pages lying entirely before the data are passed over */
		if (skip >= PAGE_CACHE_SIZE) {
			skip -= PAGE_CACHE_SIZE;
			continue;
		}

		/* only the first page used can start at a non-zero offset */
		pg_off = skip;
		skip = 0;

		chunk = (remaining < PAGE_CACHE_SIZE - pg_off) ?
				remaining : PAGE_CACHE_SIZE - pg_off;
		remaining -= chunk;

		_xfs_buf_page_apply(bp, file_off,
				bp->b_pages[idx], pg_off, chunk,
				idx + 1 == bp->b_page_count);
		file_off += chunk;
	}

	if (!(bp->b_flags & _XBF_RUN_QUEUES))
		return;

	/*
	 * Run the block device task queue here, while we have
	 * a hold on the xfs_buf (important to have that hold).
	 */
	bp->b_flags &= ~_XBF_RUN_QUEUES;
	if (atomic_read(&bp->b_io_remaining) > 1)
		run_task_queue(&tq_disk);
}


/*
 *	Delayed write buffer list handling
 */
/*
 *	Put a delayed write buffer at the tail of its target's delwri
 *	queue and stamp it with the current jiffies.
 *
 *	bp:	buffer, which must have XBF_DELWRI and XBF_ASYNC set
 *	unlock:	non-zero to unlock the buffer afterwards; in that case
 *		a buffer that was already queued also has its hold
 *		count decremented
 */
STATIC void
xfs_buf_delwri_queue(
	xfs_buf_t		*bp,
	int			unlock)
{
	xfs_buftarg_t		*btp = bp->b_target;

	XB_TRACE(bp, "delwri_q", (long)unlock);
	ASSERT((bp->b_flags & (XBF_DELWRI|XBF_ASYNC)) ==
					(XBF_DELWRI|XBF_ASYNC));

	spin_lock(&btp->bt_delwrite_lock);

	/* A buffer that is already queued is pulled off and re-added */
	if (!list_empty(&bp->b_list)) {
		ASSERT(bp->b_flags & _XBF_DELWRI_Q);
		if (unlock)
			atomic_dec(&bp->b_hold);
		list_del(&bp->b_list);
	}

	bp->b_flags |= _XBF_DELWRI_Q;
	list_add_tail(&bp->b_list, &btp->bt_delwrite_queue);
	bp->b_queuetime = jiffies;

	spin_unlock(&btp->bt_delwrite_lock);

	if (unlock)
		xfs_buf_unlock(bp);
}

/*
 *	Take a buffer off its target's delayed write queue, if it is on
 *	it, and clear XBF_DELWRI and _XBF_DELWRI_Q in any case.  When the
 *	buffer really was queued, one reference is released with
 *	xfs_buf_rele() after the queue lock has been dropped.
 */
void
xfs_buf_delwri_dequeue(
	xfs_buf_t		*bp)
{
	xfs_buftarg_t		*btp = bp->b_target;
	int			was_queued = 0;

	spin_lock(&btp->bt_delwrite_lock);
	if ((bp->b_flags & XBF_DELWRI) && !list_empty(&bp->b_list)) {
		ASSERT(bp->b_flags & _XBF_DELWRI_Q);
		list_del_init(&bp->b_list);
		was_queued = 1;
	}
	bp->b_flags &= ~(XBF_DELWRI|_XBF_DELWRI_Q);
	spin_unlock(&btp->bt_delwrite_lock);

	if (was_queued)
		xfs_buf_rele(bp);

	XB_TRACE(bp, "delwri_dq", (long)was_queued);
}


/*
 *	The xfs_buf iodone daemons
 */

/*
 *	Common body of the per-CPU I/O completion threads.
 *
 *	__bind_cpu:		logical CPU number, passed as a pointer
 *	name:			thread name prefix; the task is renamed
 *				to "<name>/<bind_cpu>"
 *	xfs_buf_daemons:	per-slot state array: set to 1 once this
 *				thread is ready, read as 0 as the request
 *				to stop, set to -1 just before exiting
 *	xfs_buf_iodone_tq:	per-slot task queues this thread runs
 *	xfs_buf_iodone_wait:	per-slot wait queues this thread sleeps on
 *
 *	The slot used in all three arrays is
 *	CPU_TO_DAEMON(cpu_logical_map(bind_cpu)).
 *
 *	Returns 0 when the thread has been asked to stop.
 */
STATIC int
xfs_buf_iodone_daemon(
	void			*__bind_cpu,
	const char		*name,
	int			xfs_buf_daemons[],
	struct list_head	xfs_buf_iodone_tq[],
	wait_queue_head_t	xfs_buf_iodone_wait[])
{
	int			bind_cpu, cpu;
	DECLARE_WAITQUEUE	(wait, current);

	bind_cpu = (int) (long)__bind_cpu;
	cpu = CPU_TO_DAEMON(cpu_logical_map(bind_cpu));

	/*  Set up the thread  */
	daemonize();

	/* Avoid signals */
	sigmask_lock();
	sigfillset(&current->blocked);
	__recalc_sigpending(current);
	sigmask_unlock();

	/* Migrate to the right CPU */
	migrate_to_cpu(cpu);
#ifdef __HAVE_NEW_SCHEDULER
	if (smp_processor_id() != cpu)
		BUG();
#else
	/* keep yielding until the scheduler has moved us there */
	while (smp_processor_id() != cpu)
		schedule();
#endif

	sprintf(current->comm, "%s/%d", name, bind_cpu);
	INIT_LIST_HEAD(&xfs_buf_iodone_tq[cpu]);
	init_waitqueue_head(&xfs_buf_iodone_wait[cpu]);
	__set_current_state(TASK_INTERRUPTIBLE);
	mb();

	/* announce readiness; xfs_buf_daemon_start() polls this slot */
	xfs_buf_daemons[cpu] = 1;

	for (;;) {
		add_wait_queue(&xfs_buf_iodone_wait[cpu], &wait);

		/*
		 * The task state is already TASK_INTERRUPTIBLE here.  If
		 * work is queued, flip back to running so that the
		 * schedule() below returns instead of sleeping.
		 */
		if (TQ_ACTIVE(xfs_buf_iodone_tq[cpu]))
			__set_task_state(current, TASK_RUNNING);
		schedule();
		remove_wait_queue(&xfs_buf_iodone_wait[cpu], &wait);
		run_task_queue(&xfs_buf_iodone_tq[cpu]);
		/* xfs_buf_daemon_stop() writes 0 here to request an exit */
		if (xfs_buf_daemons[cpu] == 0)
			break;
		__set_current_state(TASK_INTERRUPTIBLE);
	}

	/* report that we are gone and wake whoever waits for that */
	xfs_buf_daemons[cpu] = -1;
	wake_up_interruptible(&xfs_buf_iodone_wait[cpu]);
	return 0;
}

/*
 *	Run the iodone task queue of every daemon slot in the caller's
 *	context.  Slots are visited for the first
 *	min(smp_num_cpus, MAX_IO_DAEMONS) logical CPUs.
 */
STATIC void
xfs_buf_runall_queues(
	struct list_head	xfs_buf_iodone_tq[])
{
	int	cpu, slot;

	cpu = 0;
	while (cpu < min(smp_num_cpus, MAX_IO_DAEMONS)) {
		slot = CPU_TO_DAEMON(cpu_logical_map(cpu));
		run_task_queue(&xfs_buf_iodone_tq[slot]);
		cpu++;
	}
}

/*
 *	Thread entry point for a log I/O completion daemon: runs the
 *	common daemon body under the name "xfslogd" with the log iodone
 *	state array, task queues and wait queues.
 */
STATIC int
xfs_buf_logiodone_daemon(
	void			*__bind_cpu)
{
	return xfs_buf_iodone_daemon(__bind_cpu, "xfslogd", xb_logio_daemons,
			xfs_buf_logiodone_tq, xfs_buf_logiodone_wait);
}

/*
 *	Thread entry point for a data I/O completion daemon: runs the
 *	common daemon body under the name "xfsdatad" with the data iodone
 *	state array, task queues and wait queues.
 */
STATIC int
xfs_buf_dataiodone_daemon(
	void			*__bind_cpu)
{
	return xfs_buf_iodone_daemon(__bind_cpu, "xfsdatad", xb_dataio_daemons,
			xfs_buf_dataiodone_tq, xfs_buf_dataiodone_wait);
}


/*
 *	Callback registered with kmem_shake_register() in xfs_buf_init().
 *	Each registered buftarg gets XBT_FORCE_FLUSH set and has the
 *	task recorded in its bt_task woken.
 *
 *	Neither argument is used.  Always returns 0.
 */
STATIC int
xfs_buf_daemon_wakeup(
	int			priority,
	unsigned int		mask)
{
	xfs_buftarg_t		*target, *next;

	spin_lock(&xfs_buftarg_lock);

	list_for_each_entry_safe(target, next, &xfs_buftarg_list, bt_list) {
		/* flag first, then wake, so the daemon sees the request */
		set_bit(XBT_FORCE_FLUSH, &target->bt_flags);
		barrier();
		wake_up_process(target->bt_task);
	}

	spin_unlock(&xfs_buftarg_lock);

	return 0;
}

STATIC int
xfs_buf_daemon(
	void			*data)
{
	struct list_head	tmp;
	unsigned long		age;
	xfs_buf_t		*bp, *n;
	int			count;
	xfs_buftarg_t		*target = (xfs_buftarg_t *)data;
	struct list_head	*dwq = &target->bt_delwrite_queue;
	spinlock_t		*dwlk = &target->bt_delwrite_lock;

	/*  Set up the thread  */
	daemonize();

	/* Mark it active */
	target->bt_task = current;
	set_bit(XBT_ACTIVE, &target->bt_flags);
	barrier();

	/* Avoid signals */
	sigmask_lock();
	sigfillset(&current->blocked);
	__recalc_sigpending(current);
	sigmask_unlock();

	strcpy(current->comm, "xfsbufd");
	current->flags |= PF_MEMALLOC;

	INIT_LIST_HEAD(&tmp);
	do {
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout((xfs_buf_timer_centisecs * HZ) / 100);

		count = 0;
		age = (xfs_buf_age_centisecs * HZ) / 100;
		spin_lock(dwlk);
		list_for_each_entry_safe(bp, n, dwq, b_list) {
			XB_TRACE(bp, "walkq1", (long)xfs_buf_ispin(bp));
			ASSERT(bp->b_flags & XBF_DELWRI);

			if (!xfs_buf_ispin(bp) && !xfs_buf_cond_lock(bp)) {
				if (!test_bit(XBT_FORCE_FLUSH,
						&target->bt_flags) &&
				    time_before(jiffies,
						bp->b_queuetime + age)) {
					xfs_buf_unlock(bp);
					break;
				}

				bp->b_flags &= ~(XBF_DELWRI|_XBF_DELWRI_Q|
						 _XBF_RUN_QUEUES);
				bp->b_flags |= XBF_WRITE;
				list_move_tail(&bp->b_list, &tmp);
				count++;
			}
		}
		spin_unlock(dwlk);

		while (!list_empty(&tmp)) {
			bp = list_entry(tmp.next, xfs_buf_t, b_list);
			ASSERT(target == bp->b_target);
			list_del_init(&bp->b_list);
			xfs_buf_iostrategy(bp);
		}

		if (as_list_len > 0)
			purge_addresses();
		if (count)
			run_task_queue(&tq_disk);

		clear_bit(XBT_FORCE_FLUSH, &target->bt_flags);
	} while (test_bit(XBT_ACTIVE, &target->bt_flags));

	complete_and_exit(&target->bt_done, 0);
}

/*
 *	Write out the delayed write queue of a target.
 *
 *	target:	buftarg whose delwri queue is flushed
 *	wait:	non-zero to submit the buffers synchronously (XBF_ASYNC
 *		cleared) and wait for each one before returning; zero to
 *		just submit them
 *
 *	The pending iodone task queues are run first.  Buffers that are
 *	pinned are left on the queue and counted.  The disk task queue
 *	is run after every 33rd submitted buffer and once more at the
 *	end.
 *
 *	Returns the number of pinned buffers that were skipped.
 */
int
xfs_flush_buftarg(
	xfs_buftarg_t		*target,
	int			wait)
{
	struct list_head	*dwq = &target->bt_delwrite_queue;
	spinlock_t		*dwlk = &target->bt_delwrite_lock;
	struct list_head	flushq;
	xfs_buf_t		*bp, *next;
	int			pinned = 0;
	int			batched = 0;

	xfs_buf_runall_queues(xfs_buf_dataiodone_tq);
	xfs_buf_runall_queues(xfs_buf_logiodone_tq);

	INIT_LIST_HEAD(&flushq);

	/* Pass 1: pull every unpinned buffer onto a private list */
	spin_lock(dwlk);
	list_for_each_entry_safe(bp, next, dwq, b_list) {

		ASSERT(bp->b_target == target);
		ASSERT(bp->b_flags & (XBF_DELWRI|_XBF_DELWRI_Q));
		XB_TRACE(bp, "walkq2", (long)xfs_buf_ispin(bp));
		if (!xfs_buf_ispin(bp))
			list_move_tail(&bp->b_list, &flushq);
		else
			pinned++;
	}
	spin_unlock(dwlk);

	/* Pass 2: with the queue lock dropped, lock and submit each one */
	list_for_each_entry_safe(bp, next, &flushq, b_list) {
		xfs_buf_lock(bp);
		bp->b_flags &= ~(XBF_DELWRI|_XBF_DELWRI_Q|_XBF_RUN_QUEUES);
		bp->b_flags |= XBF_WRITE;
		if (!wait)
			list_del_init(&bp->b_list);
		else
			bp->b_flags &= ~XBF_ASYNC;	/* stays listed */

		xfs_buf_iostrategy(bp);

		if (++batched > 32) {
			run_task_queue(&tq_disk);
			batched = 0;
		}
	}

	run_task_queue(&tq_disk);

	/*
	 * Pass 3: only populated when waiting -- every buffer still on
	 * the private list must complete before we return.
	 */
	while (!list_empty(&flushq)) {
		bp = list_entry(flushq.next, xfs_buf_t, b_list);

		list_del_init(&bp->b_list);

		xfs_iowait(bp);
		xfs_buf_relse(bp);
	}

	return pinned;
}

/*
 *	Start one log and one data I/O completion daemon for each of the
 *	first min(smp_num_cpus, MAX_IO_DAEMONS) logical CPUs.  After each
 *	successful kernel_thread() call the caller yields until the new
 *	daemon has marked its slot as running.  A failure to create a
 *	thread is only logged.
 *
 *	Always returns 0.
 */
STATIC int
xfs_buf_daemon_start(void)
{
	int		cpu, slot;

	/* log I/O completion daemons */
	for (cpu = 0; cpu < min(smp_num_cpus, MAX_IO_DAEMONS); cpu++) {
		slot = CPU_TO_DAEMON(cpu_logical_map(cpu));

		if (kernel_thread(xfs_buf_logiodone_daemon,
				(void *)(long) cpu,
				CLONE_FS|CLONE_FILES|CLONE_VM) >= 0) {
			while (!xb_logio_daemons[slot])
				yield();
		} else {
			printk("xfs_buf_logiodone daemon failed to start\n");
		}
	}

	/* data I/O completion daemons */
	for (cpu = 0; cpu < min(smp_num_cpus, MAX_IO_DAEMONS); cpu++) {
		slot = CPU_TO_DAEMON(cpu_logical_map(cpu));

		if (kernel_thread(xfs_buf_dataiodone_daemon,
				(void *)(long) cpu,
				CLONE_FS|CLONE_FILES|CLONE_VM) >= 0) {
			while (!xb_dataio_daemons[slot])
				yield();
		} else {
			printk("xfs_buf_dataiodone daemon failed to start\n");
		}
	}

	return 0;
}

/*
 *	Stop the I/O completion daemons.  For every daemon slot, first
 *	the log daemon and then the data daemon is asked to exit by
 *	writing 0 to its state entry and waking it; we then wait
 *	(interruptibly) for the daemon to set the entry to -1.
 *
 *	Note: do not mark as __exit, it is called from xfs_buf_terminate.
 */
STATIC void
xfs_buf_daemon_stop(void)
{
	int		cpu, slot;

	for (cpu = 0; cpu < min(smp_num_cpus, MAX_IO_DAEMONS); cpu++) {
		slot = CPU_TO_DAEMON(cpu_logical_map(cpu));

		/* log I/O completion daemon */
		xb_logio_daemons[slot] = 0;
		wake_up(&xfs_buf_logiodone_wait[slot]);
		wait_event_interruptible(xfs_buf_logiodone_wait[slot],
				xb_logio_daemons[slot] == -1);

		/* data I/O completion daemon */
		xb_dataio_daemons[slot] = 0;
		wake_up(&xfs_buf_dataiodone_wait[slot]);
		wait_event_interruptible(xfs_buf_dataiodone_wait[slot],
				xb_dataio_daemons[slot] == -1);
	}
}

/*
 *	Initialization and Termination
 */

/*
 *	Set up the buffer cache: the xfs_buf_t zone, the reserved
 *	buffer_heads, the optional trace buffer, the I/O completion
 *	daemons and the memory shaker callback.
 *
 *	Returns 0 on success or -ENOMEM if the zone, the reserved
 *	buffer_heads or the shaker registration could not be obtained.
 */
int __init
xfs_buf_init(void)
{
	xfs_buf_zone = kmem_zone_init_flags(sizeof(xfs_buf_t), "xfs_buf_t",
						KM_ZONE_HWALIGN, NULL);
	if (xfs_buf_zone == NULL) {
		printk("XFS: couldn't init xfs_buf_t cache\n");
		goto out_nomem;
	}

	if (_xfs_buf_prealloc_bh(NR_RESERVED_BH) < NR_RESERVED_BH) {
		printk("XFS: couldn't allocate %d reserved buffers\n",
			NR_RESERVED_BH);
		kmem_zone_destroy(xfs_buf_zone);
		goto out_nomem;
	}
	init_waitqueue_head(&xb_resv_bh_wait);

#ifdef XFS_BUF_TRACE
	xfs_buf_trace_buf = ktrace_alloc(XFS_BUF_TRACE_SIZE, KM_SLEEP);
#endif

	xfs_buf_daemon_start();

	xfs_buf_shake = kmem_shake_register(xfs_buf_daemon_wakeup);
	if (xfs_buf_shake != NULL)
		return 0;

	/* no shaker: undo everything that was set up above */
	xfs_buf_terminate();
out_nomem:
	return -ENOMEM;
}

/*
 *	Shut the buffer cache down: stop the I/O completion daemons, free
 *	the trace buffer when tracing is compiled in, destroy the
 *	xfs_buf_t zone and deregister the memory shaker callback.
 *
 *	Note: do not mark as __exit, this is also called from the __init code.
 *
 *	NOTE(review): xfs_buf_init() calls this when kmem_shake_register()
 *	returned NULL, so kmem_shake_deregister() is handed NULL on that
 *	path -- confirm it tolerates that.
 */
void
xfs_buf_terminate(void)
{
	/* blocks until each daemon has reported that it exited */
	xfs_buf_daemon_stop();

#ifdef XFS_BUF_TRACE
	ktrace_free(xfs_buf_trace_buf);
#endif

	kmem_zone_destroy(xfs_buf_zone);
	kmem_shake_deregister(xfs_buf_shake);
}