Merge branch 'for-linus' of git://oss.sgi.com/xfs/xfs

* 'for-linus' of git://oss.sgi.com/xfs/xfs:
  xfs: use proper interfaces for on-stack plugging
  xfs: fix xfs_debug warnings
  xfs: fix variable set but not used warnings
  xfs: convert log tail checking to a warning
  xfs: catch bad block numbers freeing extents.
  xfs: push the AIL from memory reclaim and periodic sync
  xfs: clean up code layout in xfs_trans_ail.c
  xfs: convert the xfsaild threads to a workqueue
  xfs: introduce background inode reclaim work
  xfs: convert ENOSPC inode flushing to use new syncd workqueue
  xfs: introduce a xfssyncd workqueue
  xfs: fix extent format buffer allocation size
  xfs: fix unreferenced var error in xfs_buf.c

Also, applied patch from Tony Luck that fixes ia64:
  xfs_destroy_workqueues() should not be tagged with__exit
in the branch before merging.
diff --git a/fs/xfs/linux-2.6/xfs_buf.c b/fs/xfs/linux-2.6/xfs_buf.c
index 5ea4020..9ef9ed2 100644
--- a/fs/xfs/linux-2.6/xfs_buf.c
+++ b/fs/xfs/linux-2.6/xfs_buf.c
@@ -293,7 +293,6 @@
 	size_t			nbytes, offset;
 	gfp_t			gfp_mask = xb_to_gfp(flags);
 	unsigned short		page_count, i;
-	pgoff_t			first;
 	xfs_off_t		end;
 	int			error;
 
@@ -333,7 +332,6 @@
 		return error;
 
 	offset = bp->b_offset;
-	first = bp->b_file_offset >> PAGE_SHIFT;
 	bp->b_flags |= _XBF_PAGES;
 
 	for (i = 0; i < bp->b_page_count; i++) {
@@ -657,8 +655,6 @@
 	xfs_off_t		ioff,
 	size_t			isize)
 {
-	struct backing_dev_info *bdi;
-
 	if (bdi_read_congested(target->bt_bdi))
 		return;
 
@@ -919,8 +915,6 @@
 
 	if (atomic_read(&bp->b_pin_count) && (bp->b_flags & XBF_STALE))
 		xfs_log_force(bp->b_target->bt_mount, 0);
-	if (atomic_read(&bp->b_io_remaining))
-		blk_flush_plug(current);
 	down(&bp->b_sema);
 	XB_SET_OWNER(bp);
 
@@ -1309,8 +1303,6 @@
 {
 	trace_xfs_buf_iowait(bp, _RET_IP_);
 
-	if (atomic_read(&bp->b_io_remaining))
-		blk_flush_plug(current);
 	wait_for_completion(&bp->b_iowait);
 
 	trace_xfs_buf_iowait_done(bp, _RET_IP_);
@@ -1747,8 +1739,8 @@
 	do {
 		long	age = xfs_buf_age_centisecs * msecs_to_jiffies(10);
 		long	tout = xfs_buf_timer_centisecs * msecs_to_jiffies(10);
-		int	count = 0;
 		struct list_head tmp;
+		struct blk_plug plug;
 
 		if (unlikely(freezing(current))) {
 			set_bit(XBT_FORCE_SLEEP, &target->bt_flags);
@@ -1764,16 +1756,15 @@
 
 		xfs_buf_delwri_split(target, &tmp, age);
 		list_sort(NULL, &tmp, xfs_buf_cmp);
+
+		blk_start_plug(&plug);
 		while (!list_empty(&tmp)) {
 			struct xfs_buf *bp;
 			bp = list_first_entry(&tmp, struct xfs_buf, b_list);
 			list_del_init(&bp->b_list);
 			xfs_bdstrat_cb(bp);
-			count++;
 		}
-		if (count)
-			blk_flush_plug(current);
-
+		blk_finish_plug(&plug);
 	} while (!kthread_should_stop());
 
 	return 0;
@@ -1793,6 +1784,7 @@
 	int		pincount = 0;
 	LIST_HEAD(tmp_list);
 	LIST_HEAD(wait_list);
+	struct blk_plug plug;
 
 	xfs_buf_runall_queues(xfsconvertd_workqueue);
 	xfs_buf_runall_queues(xfsdatad_workqueue);
@@ -1807,6 +1799,8 @@
 	 * we do that after issuing all the IO.
 	 */
 	list_sort(NULL, &tmp_list, xfs_buf_cmp);
+
+	blk_start_plug(&plug);
 	while (!list_empty(&tmp_list)) {
 		bp = list_first_entry(&tmp_list, struct xfs_buf, b_list);
 		ASSERT(target == bp->b_target);
@@ -1817,10 +1811,10 @@
 		}
 		xfs_bdstrat_cb(bp);
 	}
+	blk_finish_plug(&plug);
 
 	if (wait) {
-		/* Expedite and wait for IO to complete. */
-		blk_flush_plug(current);
+		/* Wait for IO to complete. */
 		while (!list_empty(&wait_list)) {
 			bp = list_first_entry(&wait_list, struct xfs_buf, b_list);
 
diff --git a/fs/xfs/linux-2.6/xfs_message.c b/fs/xfs/linux-2.6/xfs_message.c
index 508e06f..3ca7956 100644
--- a/fs/xfs/linux-2.6/xfs_message.c
+++ b/fs/xfs/linux-2.6/xfs_message.c
@@ -28,53 +28,47 @@
 /*
  * XFS logging functions
  */
-static int
+static void
 __xfs_printk(
 	const char		*level,
 	const struct xfs_mount	*mp,
 	struct va_format	*vaf)
 {
 	if (mp && mp->m_fsname)
-		return printk("%sXFS (%s): %pV\n", level, mp->m_fsname, vaf);
-	return printk("%sXFS: %pV\n", level, vaf);
+		printk("%sXFS (%s): %pV\n", level, mp->m_fsname, vaf);
+	printk("%sXFS: %pV\n", level, vaf);
 }
 
-int xfs_printk(
+void xfs_printk(
 	const char		*level,
 	const struct xfs_mount	*mp,
 	const char		*fmt, ...)
 {
 	struct va_format	vaf;
 	va_list			args;
-	int			 r;
 
 	va_start(args, fmt);
 
 	vaf.fmt = fmt;
 	vaf.va = &args;
 
-	r = __xfs_printk(level, mp, &vaf);
+	__xfs_printk(level, mp, &vaf);
 	va_end(args);
-
-	return r;
 }
 
 #define define_xfs_printk_level(func, kern_level)		\
-int func(const struct xfs_mount *mp, const char *fmt, ...)	\
+void func(const struct xfs_mount *mp, const char *fmt, ...)	\
 {								\
 	struct va_format	vaf;				\
 	va_list			args;				\
-	int			r;				\
 								\
 	va_start(args, fmt);					\
 								\
 	vaf.fmt = fmt;						\
 	vaf.va = &args;						\
 								\
-	r = __xfs_printk(kern_level, mp, &vaf);			\
+	__xfs_printk(kern_level, mp, &vaf);			\
 	va_end(args);						\
-								\
-	return r;						\
 }								\
 
 define_xfs_printk_level(xfs_emerg, KERN_EMERG);
@@ -88,7 +82,7 @@
 define_xfs_printk_level(xfs_debug, KERN_DEBUG);
 #endif
 
-int
+void
 xfs_alert_tag(
 	const struct xfs_mount	*mp,
 	int			panic_tag,
@@ -97,7 +91,6 @@
 	struct va_format	vaf;
 	va_list			args;
 	int			do_panic = 0;
-	int			r;
 
 	if (xfs_panic_mask && (xfs_panic_mask & panic_tag)) {
 		xfs_printk(KERN_ALERT, mp,
@@ -110,12 +103,10 @@
 	vaf.fmt = fmt;
 	vaf.va = &args;
 
-	r = __xfs_printk(KERN_ALERT, mp, &vaf);
+	__xfs_printk(KERN_ALERT, mp, &vaf);
 	va_end(args);
 
 	BUG_ON(do_panic);
-
-	return r;
 }
 
 void
diff --git a/fs/xfs/linux-2.6/xfs_message.h b/fs/xfs/linux-2.6/xfs_message.h
index e77ffa1..f1b3fc1 100644
--- a/fs/xfs/linux-2.6/xfs_message.h
+++ b/fs/xfs/linux-2.6/xfs_message.h
@@ -3,32 +3,34 @@
 
 struct xfs_mount;
 
-extern int xfs_printk(const char *level, const struct xfs_mount *mp,
+extern void xfs_printk(const char *level, const struct xfs_mount *mp,
                       const char *fmt, ...)
         __attribute__ ((format (printf, 3, 4)));
-extern int xfs_emerg(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_emerg(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
-extern int xfs_alert(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_alert(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
-extern int xfs_alert_tag(const struct xfs_mount *mp, int tag,
+extern void xfs_alert_tag(const struct xfs_mount *mp, int tag,
 			 const char *fmt, ...)
         __attribute__ ((format (printf, 3, 4)));
-extern int xfs_crit(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_crit(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
-extern int xfs_err(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_err(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
-extern int xfs_warn(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_warn(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
-extern int xfs_notice(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_notice(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
-extern int xfs_info(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_info(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
 
 #ifdef DEBUG
-extern int xfs_debug(const struct xfs_mount *mp, const char *fmt, ...)
+extern void xfs_debug(const struct xfs_mount *mp, const char *fmt, ...)
         __attribute__ ((format (printf, 2, 3)));
 #else
-#define xfs_debug(mp, fmt, ...)	(0)
+static inline void xfs_debug(const struct xfs_mount *mp, const char *fmt, ...)
+{
+}
 #endif
 
 extern void assfail(char *expr, char *f, int l);
diff --git a/fs/xfs/linux-2.6/xfs_super.c b/fs/xfs/linux-2.6/xfs_super.c
index 1ba5c45..b38e58d 100644
--- a/fs/xfs/linux-2.6/xfs_super.c
+++ b/fs/xfs/linux-2.6/xfs_super.c
@@ -816,75 +816,6 @@
 	return 0;
 }
 
-/*
- * XFS AIL push thread support
- */
-void
-xfsaild_wakeup(
-	struct xfs_ail		*ailp,
-	xfs_lsn_t		threshold_lsn)
-{
-	/* only ever move the target forwards */
-	if (XFS_LSN_CMP(threshold_lsn, ailp->xa_target) > 0) {
-		ailp->xa_target = threshold_lsn;
-		wake_up_process(ailp->xa_task);
-	}
-}
-
-STATIC int
-xfsaild(
-	void	*data)
-{
-	struct xfs_ail	*ailp = data;
-	xfs_lsn_t	last_pushed_lsn = 0;
-	long		tout = 0; /* milliseconds */
-
-	while (!kthread_should_stop()) {
-		/*
-		 * for short sleeps indicating congestion, don't allow us to
-		 * get woken early. Otherwise all we do is bang on the AIL lock
-		 * without making progress.
-		 */
-		if (tout && tout <= 20)
-			__set_current_state(TASK_KILLABLE);
-		else
-			__set_current_state(TASK_INTERRUPTIBLE);
-		schedule_timeout(tout ?
-				 msecs_to_jiffies(tout) : MAX_SCHEDULE_TIMEOUT);
-
-		/* swsusp */
-		try_to_freeze();
-
-		ASSERT(ailp->xa_mount->m_log);
-		if (XFS_FORCED_SHUTDOWN(ailp->xa_mount))
-			continue;
-
-		tout = xfsaild_push(ailp, &last_pushed_lsn);
-	}
-
-	return 0;
-}	/* xfsaild */
-
-int
-xfsaild_start(
-	struct xfs_ail	*ailp)
-{
-	ailp->xa_target = 0;
-	ailp->xa_task = kthread_run(xfsaild, ailp, "xfsaild/%s",
-				    ailp->xa_mount->m_fsname);
-	if (IS_ERR(ailp->xa_task))
-		return -PTR_ERR(ailp->xa_task);
-	return 0;
-}
-
-void
-xfsaild_stop(
-	struct xfs_ail	*ailp)
-{
-	kthread_stop(ailp->xa_task);
-}
-
-
 /* Catch misguided souls that try to use this interface on XFS */
 STATIC struct inode *
 xfs_fs_alloc_inode(
@@ -1191,22 +1122,12 @@
 		return -error;
 
 	if (laptop_mode) {
-		int	prev_sync_seq = mp->m_sync_seq;
-
 		/*
 		 * The disk must be active because we're syncing.
 		 * We schedule xfssyncd now (now that the disk is
 		 * active) instead of later (when it might not be).
 		 */
-		wake_up_process(mp->m_sync_task);
-		/*
-		 * We have to wait for the sync iteration to complete.
-		 * If we don't, the disk activity caused by the sync
-		 * will come after the sync is completed, and that
-		 * triggers another sync from laptop mode.
-		 */
-		wait_event(mp->m_wait_single_sync_task,
-				mp->m_sync_seq != prev_sync_seq);
+		flush_delayed_work_sync(&mp->m_sync_work);
 	}
 
 	return 0;
@@ -1490,9 +1411,6 @@
 	spin_lock_init(&mp->m_sb_lock);
 	mutex_init(&mp->m_growlock);
 	atomic_set(&mp->m_active_trans, 0);
-	INIT_LIST_HEAD(&mp->m_sync_list);
-	spin_lock_init(&mp->m_sync_lock);
-	init_waitqueue_head(&mp->m_wait_single_sync_task);
 
 	mp->m_super = sb;
 	sb->s_fs_info = mp;
@@ -1799,6 +1717,38 @@
 }
 
 STATIC int __init
+xfs_init_workqueues(void)
+{
+	/*
+	 * max_active is set to 8 to give enough concurency to allow
+	 * multiple work operations on each CPU to run. This allows multiple
+	 * filesystems to be running sync work concurrently, and scales with
+	 * the number of CPUs in the system.
+	 */
+	xfs_syncd_wq = alloc_workqueue("xfssyncd", WQ_CPU_INTENSIVE, 8);
+	if (!xfs_syncd_wq)
+		goto out;
+
+	xfs_ail_wq = alloc_workqueue("xfsail", WQ_CPU_INTENSIVE, 8);
+	if (!xfs_ail_wq)
+		goto out_destroy_syncd;
+
+	return 0;
+
+out_destroy_syncd:
+	destroy_workqueue(xfs_syncd_wq);
+out:
+	return -ENOMEM;
+}
+
+STATIC void
+xfs_destroy_workqueues(void)
+{
+	destroy_workqueue(xfs_ail_wq);
+	destroy_workqueue(xfs_syncd_wq);
+}
+
+STATIC int __init
 init_xfs_fs(void)
 {
 	int			error;
@@ -1813,10 +1763,14 @@
 	if (error)
 		goto out;
 
-	error = xfs_mru_cache_init();
+	error = xfs_init_workqueues();
 	if (error)
 		goto out_destroy_zones;
 
+	error = xfs_mru_cache_init();
+	if (error)
+		goto out_destroy_wq;
+
 	error = xfs_filestream_init();
 	if (error)
 		goto out_mru_cache_uninit;
@@ -1833,6 +1787,10 @@
 	if (error)
 		goto out_cleanup_procfs;
 
+	error = xfs_init_workqueues();
+	if (error)
+		goto out_sysctl_unregister;
+
 	vfs_initquota();
 
 	error = register_filesystem(&xfs_fs_type);
@@ -1850,6 +1808,8 @@
 	xfs_filestream_uninit();
  out_mru_cache_uninit:
 	xfs_mru_cache_uninit();
+ out_destroy_wq:
+	xfs_destroy_workqueues();
  out_destroy_zones:
 	xfs_destroy_zones();
  out:
@@ -1866,6 +1826,7 @@
 	xfs_buf_terminate();
 	xfs_filestream_uninit();
 	xfs_mru_cache_uninit();
+	xfs_destroy_workqueues();
 	xfs_destroy_zones();
 }
 
diff --git a/fs/xfs/linux-2.6/xfs_sync.c b/fs/xfs/linux-2.6/xfs_sync.c
index 9cf35a6..e4f9c1b 100644
--- a/fs/xfs/linux-2.6/xfs_sync.c
+++ b/fs/xfs/linux-2.6/xfs_sync.c
@@ -22,6 +22,7 @@
 #include "xfs_log.h"
 #include "xfs_inum.h"
 #include "xfs_trans.h"
+#include "xfs_trans_priv.h"
 #include "xfs_sb.h"
 #include "xfs_ag.h"
 #include "xfs_mount.h"
@@ -39,6 +40,8 @@
 #include <linux/kthread.h>
 #include <linux/freezer.h>
 
+struct workqueue_struct	*xfs_syncd_wq;	/* sync workqueue */
+
 /*
  * The inode lookup is done in batches to keep the amount of lock traffic and
  * radix tree lookups to a minimum. The batch size is a trade off between
@@ -431,62 +434,12 @@
 	xfs_unmountfs_writesb(mp);
 }
 
-/*
- * Enqueue a work item to be picked up by the vfs xfssyncd thread.
- * Doing this has two advantages:
- * - It saves on stack space, which is tight in certain situations
- * - It can be used (with care) as a mechanism to avoid deadlocks.
- * Flushing while allocating in a full filesystem requires both.
- */
-STATIC void
-xfs_syncd_queue_work(
-	struct xfs_mount *mp,
-	void		*data,
-	void		(*syncer)(struct xfs_mount *, void *),
-	struct completion *completion)
+static void
+xfs_syncd_queue_sync(
+	struct xfs_mount        *mp)
 {
-	struct xfs_sync_work *work;
-
-	work = kmem_alloc(sizeof(struct xfs_sync_work), KM_SLEEP);
-	INIT_LIST_HEAD(&work->w_list);
-	work->w_syncer = syncer;
-	work->w_data = data;
-	work->w_mount = mp;
-	work->w_completion = completion;
-	spin_lock(&mp->m_sync_lock);
-	list_add_tail(&work->w_list, &mp->m_sync_list);
-	spin_unlock(&mp->m_sync_lock);
-	wake_up_process(mp->m_sync_task);
-}
-
-/*
- * Flush delayed allocate data, attempting to free up reserved space
- * from existing allocations.  At this point a new allocation attempt
- * has failed with ENOSPC and we are in the process of scratching our
- * heads, looking about for more room...
- */
-STATIC void
-xfs_flush_inodes_work(
-	struct xfs_mount *mp,
-	void		*arg)
-{
-	struct inode	*inode = arg;
-	xfs_sync_data(mp, SYNC_TRYLOCK);
-	xfs_sync_data(mp, SYNC_TRYLOCK | SYNC_WAIT);
-	iput(inode);
-}
-
-void
-xfs_flush_inodes(
-	xfs_inode_t	*ip)
-{
-	struct inode	*inode = VFS_I(ip);
-	DECLARE_COMPLETION_ONSTACK(completion);
-
-	igrab(inode);
-	xfs_syncd_queue_work(ip->i_mount, inode, xfs_flush_inodes_work, &completion);
-	wait_for_completion(&completion);
-	xfs_log_force(ip->i_mount, XFS_LOG_SYNC);
+	queue_delayed_work(xfs_syncd_wq, &mp->m_sync_work,
+				msecs_to_jiffies(xfs_syncd_centisecs * 10));
 }
 
 /*
@@ -496,9 +449,10 @@
  */
 STATIC void
 xfs_sync_worker(
-	struct xfs_mount *mp,
-	void		*unused)
+	struct work_struct *work)
 {
+	struct xfs_mount *mp = container_of(to_delayed_work(work),
+					struct xfs_mount, m_sync_work);
 	int		error;
 
 	if (!(mp->m_flags & XFS_MOUNT_RDONLY)) {
@@ -508,73 +462,106 @@
 			error = xfs_fs_log_dummy(mp);
 		else
 			xfs_log_force(mp, 0);
-		xfs_reclaim_inodes(mp, 0);
 		error = xfs_qm_sync(mp, SYNC_TRYLOCK);
+
+		/* start pushing all the metadata that is currently dirty */
+		xfs_ail_push_all(mp->m_ail);
 	}
-	mp->m_sync_seq++;
-	wake_up(&mp->m_wait_single_sync_task);
+
+	/* queue us up again */
+	xfs_syncd_queue_sync(mp);
 }
 
-STATIC int
-xfssyncd(
-	void			*arg)
+/*
+ * Queue a new inode reclaim pass if there are reclaimable inodes and there
+ * isn't a reclaim pass already in progress. By default it runs every 5s based
+ * on the xfs syncd work default of 30s. Perhaps this should have it's own
+ * tunable, but that can be done if this method proves to be ineffective or too
+ * aggressive.
+ */
+static void
+xfs_syncd_queue_reclaim(
+	struct xfs_mount        *mp)
 {
-	struct xfs_mount	*mp = arg;
-	long			timeleft;
-	xfs_sync_work_t		*work, *n;
-	LIST_HEAD		(tmp);
 
-	set_freezable();
-	timeleft = xfs_syncd_centisecs * msecs_to_jiffies(10);
-	for (;;) {
-		if (list_empty(&mp->m_sync_list))
-			timeleft = schedule_timeout_interruptible(timeleft);
-		/* swsusp */
-		try_to_freeze();
-		if (kthread_should_stop() && list_empty(&mp->m_sync_list))
-			break;
+	/*
+	 * We can have inodes enter reclaim after we've shut down the syncd
+	 * workqueue during unmount, so don't allow reclaim work to be queued
+	 * during unmount.
+	 */
+	if (!(mp->m_super->s_flags & MS_ACTIVE))
+		return;
 
-		spin_lock(&mp->m_sync_lock);
-		/*
-		 * We can get woken by laptop mode, to do a sync -
-		 * that's the (only!) case where the list would be
-		 * empty with time remaining.
-		 */
-		if (!timeleft || list_empty(&mp->m_sync_list)) {
-			if (!timeleft)
-				timeleft = xfs_syncd_centisecs *
-							msecs_to_jiffies(10);
-			INIT_LIST_HEAD(&mp->m_sync_work.w_list);
-			list_add_tail(&mp->m_sync_work.w_list,
-					&mp->m_sync_list);
-		}
-		list_splice_init(&mp->m_sync_list, &tmp);
-		spin_unlock(&mp->m_sync_lock);
-
-		list_for_each_entry_safe(work, n, &tmp, w_list) {
-			(*work->w_syncer)(mp, work->w_data);
-			list_del(&work->w_list);
-			if (work == &mp->m_sync_work)
-				continue;
-			if (work->w_completion)
-				complete(work->w_completion);
-			kmem_free(work);
-		}
+	rcu_read_lock();
+	if (radix_tree_tagged(&mp->m_perag_tree, XFS_ICI_RECLAIM_TAG)) {
+		queue_delayed_work(xfs_syncd_wq, &mp->m_reclaim_work,
+			msecs_to_jiffies(xfs_syncd_centisecs / 6 * 10));
 	}
+	rcu_read_unlock();
+}
 
-	return 0;
+/*
+ * This is a fast pass over the inode cache to try to get reclaim moving on as
+ * many inodes as possible in a short period of time. It kicks itself every few
+ * seconds, as well as being kicked by the inode cache shrinker when memory
+ * goes low. It scans as quickly as possible avoiding locked inodes or those
+ * already being flushed, and once done schedules a future pass.
+ */
+STATIC void
+xfs_reclaim_worker(
+	struct work_struct *work)
+{
+	struct xfs_mount *mp = container_of(to_delayed_work(work),
+					struct xfs_mount, m_reclaim_work);
+
+	xfs_reclaim_inodes(mp, SYNC_TRYLOCK);
+	xfs_syncd_queue_reclaim(mp);
+}
+
+/*
+ * Flush delayed allocate data, attempting to free up reserved space
+ * from existing allocations.  At this point a new allocation attempt
+ * has failed with ENOSPC and we are in the process of scratching our
+ * heads, looking about for more room.
+ *
+ * Queue a new data flush if there isn't one already in progress and
+ * wait for completion of the flush. This means that we only ever have one
+ * inode flush in progress no matter how many ENOSPC events are occurring and
+ * so will prevent the system from bogging down due to every concurrent
+ * ENOSPC event scanning all the active inodes in the system for writeback.
+ */
+void
+xfs_flush_inodes(
+	struct xfs_inode	*ip)
+{
+	struct xfs_mount	*mp = ip->i_mount;
+
+	queue_work(xfs_syncd_wq, &mp->m_flush_work);
+	flush_work_sync(&mp->m_flush_work);
+}
+
+STATIC void
+xfs_flush_worker(
+	struct work_struct *work)
+{
+	struct xfs_mount *mp = container_of(work,
+					struct xfs_mount, m_flush_work);
+
+	xfs_sync_data(mp, SYNC_TRYLOCK);
+	xfs_sync_data(mp, SYNC_TRYLOCK | SYNC_WAIT);
 }
 
 int
 xfs_syncd_init(
 	struct xfs_mount	*mp)
 {
-	mp->m_sync_work.w_syncer = xfs_sync_worker;
-	mp->m_sync_work.w_mount = mp;
-	mp->m_sync_work.w_completion = NULL;
-	mp->m_sync_task = kthread_run(xfssyncd, mp, "xfssyncd/%s", mp->m_fsname);
-	if (IS_ERR(mp->m_sync_task))
-		return -PTR_ERR(mp->m_sync_task);
+	INIT_WORK(&mp->m_flush_work, xfs_flush_worker);
+	INIT_DELAYED_WORK(&mp->m_sync_work, xfs_sync_worker);
+	INIT_DELAYED_WORK(&mp->m_reclaim_work, xfs_reclaim_worker);
+
+	xfs_syncd_queue_sync(mp);
+	xfs_syncd_queue_reclaim(mp);
+
 	return 0;
 }
 
@@ -582,7 +569,9 @@
 xfs_syncd_stop(
 	struct xfs_mount	*mp)
 {
-	kthread_stop(mp->m_sync_task);
+	cancel_delayed_work_sync(&mp->m_sync_work);
+	cancel_delayed_work_sync(&mp->m_reclaim_work);
+	cancel_work_sync(&mp->m_flush_work);
 }
 
 void
@@ -601,6 +590,10 @@
 				XFS_INO_TO_AGNO(ip->i_mount, ip->i_ino),
 				XFS_ICI_RECLAIM_TAG);
 		spin_unlock(&ip->i_mount->m_perag_lock);
+
+		/* schedule periodic background inode reclaim */
+		xfs_syncd_queue_reclaim(ip->i_mount);
+
 		trace_xfs_perag_set_reclaim(ip->i_mount, pag->pag_agno,
 							-1, _RET_IP_);
 	}
@@ -1017,7 +1010,13 @@
 }
 
 /*
- * Shrinker infrastructure.
+ * Inode cache shrinker.
+ *
+ * When called we make sure that there is a background (fast) inode reclaim in
+ * progress, while we will throttle the speed of reclaim via doiing synchronous
+ * reclaim of inodes. That means if we come across dirty inodes, we wait for
+ * them to be cleaned, which we hope will not be very long due to the
+ * background walker having already kicked the IO off on those dirty inodes.
  */
 static int
 xfs_reclaim_inode_shrink(
@@ -1032,10 +1031,15 @@
 
 	mp = container_of(shrink, struct xfs_mount, m_inode_shrink);
 	if (nr_to_scan) {
+		/* kick background reclaimer and push the AIL */
+		xfs_syncd_queue_reclaim(mp);
+		xfs_ail_push_all(mp->m_ail);
+
 		if (!(gfp_mask & __GFP_FS))
 			return -1;
 
-		xfs_reclaim_inodes_ag(mp, SYNC_TRYLOCK, &nr_to_scan);
+		xfs_reclaim_inodes_ag(mp, SYNC_TRYLOCK | SYNC_WAIT,
+					&nr_to_scan);
 		/* terminate if we don't exhaust the scan */
 		if (nr_to_scan > 0)
 			return -1;
diff --git a/fs/xfs/linux-2.6/xfs_sync.h b/fs/xfs/linux-2.6/xfs_sync.h
index 32ba662..e3a6ad2 100644
--- a/fs/xfs/linux-2.6/xfs_sync.h
+++ b/fs/xfs/linux-2.6/xfs_sync.h
@@ -32,6 +32,8 @@
 #define SYNC_WAIT		0x0001	/* wait for i/o to complete */
 #define SYNC_TRYLOCK		0x0002  /* only try to lock inodes */
 
+extern struct workqueue_struct	*xfs_syncd_wq;	/* sync workqueue */
+
 int xfs_syncd_init(struct xfs_mount *mp);
 void xfs_syncd_stop(struct xfs_mount *mp);
 
diff --git a/fs/xfs/quota/xfs_qm.c b/fs/xfs/quota/xfs_qm.c
index 254ee06..69228aa 100644
--- a/fs/xfs/quota/xfs_qm.c
+++ b/fs/xfs/quota/xfs_qm.c
@@ -461,12 +461,10 @@
 	struct xfs_quotainfo	*q = mp->m_quotainfo;
 	int			recl;
 	struct xfs_dquot	*dqp;
-	int			niters;
 	int			error;
 
 	if (!q)
 		return 0;
-	niters = 0;
 again:
 	mutex_lock(&q->qi_dqlist_lock);
 	list_for_each_entry(dqp, &q->qi_dqlist, q_mplist) {
@@ -1314,14 +1312,9 @@
 {
 	xfs_buf_t	*bp;
 	int		error;
-	int		notcommitted;
-	int		incr;
 	int		type;
 
 	ASSERT(blkcnt > 0);
-	notcommitted = 0;
-	incr = (blkcnt > XFS_QM_MAX_DQCLUSTER_LOGSZ) ?
-		XFS_QM_MAX_DQCLUSTER_LOGSZ : blkcnt;
 	type = flags & XFS_QMOPT_UQUOTA ? XFS_DQ_USER :
 		(flags & XFS_QMOPT_PQUOTA ? XFS_DQ_PROJ : XFS_DQ_GROUP);
 	error = 0;
diff --git a/fs/xfs/quota/xfs_qm.h b/fs/xfs/quota/xfs_qm.h
index c9446f1..567b29b 100644
--- a/fs/xfs/quota/xfs_qm.h
+++ b/fs/xfs/quota/xfs_qm.h
@@ -65,11 +65,6 @@
  * block in the dquot/xqm code.
  */
 #define XFS_DQUOT_CLUSTER_SIZE_FSB	(xfs_filblks_t)1
-/*
- * When doing a quotacheck, we log dquot clusters of this many FSBs at most
- * in a single transaction. We don't want to ask for too huge a log reservation.
- */
-#define XFS_QM_MAX_DQCLUSTER_LOGSZ	3
 
 typedef xfs_dqhash_t	xfs_dqlist_t;
 
diff --git a/fs/xfs/quota/xfs_qm_syscalls.c b/fs/xfs/quota/xfs_qm_syscalls.c
index 0d62a07..2dadb15 100644
--- a/fs/xfs/quota/xfs_qm_syscalls.c
+++ b/fs/xfs/quota/xfs_qm_syscalls.c
@@ -313,14 +313,12 @@
 {
 	int		error;
 	uint		qf;
-	uint		accflags;
 	__int64_t	sbflags;
 
 	flags &= (XFS_ALL_QUOTA_ACCT | XFS_ALL_QUOTA_ENFD);
 	/*
 	 * Switching on quota accounting must be done at mount time.
 	 */
-	accflags = flags & XFS_ALL_QUOTA_ACCT;
 	flags &= ~(XFS_ALL_QUOTA_ACCT);
 
 	sbflags = 0;
diff --git a/fs/xfs/xfs_alloc.c b/fs/xfs/xfs_alloc.c
index 4bc3c64..27d64d7 100644
--- a/fs/xfs/xfs_alloc.c
+++ b/fs/xfs/xfs_alloc.c
@@ -2395,17 +2395,33 @@
 	memset(&args, 0, sizeof(xfs_alloc_arg_t));
 	args.tp = tp;
 	args.mp = tp->t_mountp;
+
+	/*
+	 * validate that the block number is legal - the enables us to detect
+	 * and handle a silent filesystem corruption rather than crashing.
+	 */
 	args.agno = XFS_FSB_TO_AGNO(args.mp, bno);
-	ASSERT(args.agno < args.mp->m_sb.sb_agcount);
+	if (args.agno >= args.mp->m_sb.sb_agcount)
+		return EFSCORRUPTED;
+
 	args.agbno = XFS_FSB_TO_AGBNO(args.mp, bno);
+	if (args.agbno >= args.mp->m_sb.sb_agblocks)
+		return EFSCORRUPTED;
+
 	args.pag = xfs_perag_get(args.mp, args.agno);
-	if ((error = xfs_alloc_fix_freelist(&args, XFS_ALLOC_FLAG_FREEING)))
+	ASSERT(args.pag);
+
+	error = xfs_alloc_fix_freelist(&args, XFS_ALLOC_FLAG_FREEING);
+	if (error)
 		goto error0;
-#ifdef DEBUG
-	ASSERT(args.agbp != NULL);
-	ASSERT((args.agbno + len) <=
-		be32_to_cpu(XFS_BUF_TO_AGF(args.agbp)->agf_length));
-#endif
+
+	/* validate the extent size is legal now we have the agf locked */
+	if (args.agbno + len >
+			be32_to_cpu(XFS_BUF_TO_AGF(args.agbp)->agf_length)) {
+		error = EFSCORRUPTED;
+		goto error0;
+	}
+
 	error = xfs_free_ag_extent(tp, args.agbp, args.agno, args.agbno, len, 0);
 error0:
 	xfs_perag_put(args.pag);
diff --git a/fs/xfs/xfs_inode_item.c b/fs/xfs/xfs_inode_item.c
index 46cc401..576fdfe 100644
--- a/fs/xfs/xfs_inode_item.c
+++ b/fs/xfs/xfs_inode_item.c
@@ -198,6 +198,41 @@
 }
 
 /*
+ * xfs_inode_item_format_extents - convert in-core extents to on-disk form
+ *
+ * For either the data or attr fork in extent format, we need to endian convert
+ * the in-core extent as we place them into the on-disk inode. In this case, we
+ * need to do this conversion before we write the extents into the log. Because
+ * we don't have the disk inode to write into here, we allocate a buffer and
+ * format the extents into it via xfs_iextents_copy(). We free the buffer in
+ * the unlock routine after the copy for the log has been made.
+ *
+ * In the case of the data fork, the in-core and on-disk fork sizes can be
+ * different due to delayed allocation extents. We only log on-disk extents
+ * here, so always use the physical fork size to determine the size of the
+ * buffer we need to allocate.
+ */
+STATIC void
+xfs_inode_item_format_extents(
+	struct xfs_inode	*ip,
+	struct xfs_log_iovec	*vecp,
+	int			whichfork,
+	int			type)
+{
+	xfs_bmbt_rec_t		*ext_buffer;
+
+	ext_buffer = kmem_alloc(XFS_IFORK_SIZE(ip, whichfork), KM_SLEEP);
+	if (whichfork == XFS_DATA_FORK)
+		ip->i_itemp->ili_extents_buf = ext_buffer;
+	else
+		ip->i_itemp->ili_aextents_buf = ext_buffer;
+
+	vecp->i_addr = ext_buffer;
+	vecp->i_len = xfs_iextents_copy(ip, ext_buffer, whichfork);
+	vecp->i_type = type;
+}
+
+/*
  * This is called to fill in the vector of log iovecs for the
  * given inode log item.  It fills the first item with an inode
  * log format structure, the second with the on-disk inode structure,
@@ -213,7 +248,6 @@
 	struct xfs_inode	*ip = iip->ili_inode;
 	uint			nvecs;
 	size_t			data_bytes;
-	xfs_bmbt_rec_t		*ext_buffer;
 	xfs_mount_t		*mp;
 
 	vecp->i_addr = &iip->ili_format;
@@ -320,22 +354,8 @@
 			} else
 #endif
 			{
-				/*
-				 * There are delayed allocation extents
-				 * in the inode, or we need to convert
-				 * the extents to on disk format.
-				 * Use xfs_iextents_copy()
-				 * to copy only the real extents into
-				 * a separate buffer.  We'll free the
-				 * buffer in the unlock routine.
-				 */
-				ext_buffer = kmem_alloc(ip->i_df.if_bytes,
-					KM_SLEEP);
-				iip->ili_extents_buf = ext_buffer;
-				vecp->i_addr = ext_buffer;
-				vecp->i_len = xfs_iextents_copy(ip, ext_buffer,
-						XFS_DATA_FORK);
-				vecp->i_type = XLOG_REG_TYPE_IEXT;
+				xfs_inode_item_format_extents(ip, vecp,
+					XFS_DATA_FORK, XLOG_REG_TYPE_IEXT);
 			}
 			ASSERT(vecp->i_len <= ip->i_df.if_bytes);
 			iip->ili_format.ilf_dsize = vecp->i_len;
@@ -445,19 +465,12 @@
 			 */
 			vecp->i_addr = ip->i_afp->if_u1.if_extents;
 			vecp->i_len = ip->i_afp->if_bytes;
+			vecp->i_type = XLOG_REG_TYPE_IATTR_EXT;
 #else
 			ASSERT(iip->ili_aextents_buf == NULL);
-			/*
-			 * Need to endian flip before logging
-			 */
-			ext_buffer = kmem_alloc(ip->i_afp->if_bytes,
-				KM_SLEEP);
-			iip->ili_aextents_buf = ext_buffer;
-			vecp->i_addr = ext_buffer;
-			vecp->i_len = xfs_iextents_copy(ip, ext_buffer,
-					XFS_ATTR_FORK);
+			xfs_inode_item_format_extents(ip, vecp,
+					XFS_ATTR_FORK, XLOG_REG_TYPE_IATTR_EXT);
 #endif
-			vecp->i_type = XLOG_REG_TYPE_IATTR_EXT;
 			iip->ili_format.ilf_asize = vecp->i_len;
 			vecp++;
 			nvecs++;
diff --git a/fs/xfs/xfs_itable.c b/fs/xfs/xfs_itable.c
index dc1882a..751e94f 100644
--- a/fs/xfs/xfs_itable.c
+++ b/fs/xfs/xfs_itable.c
@@ -204,7 +204,6 @@
 	xfs_agi_t		*agi;	/* agi header data */
 	xfs_agino_t		agino;	/* inode # in allocation group */
 	xfs_agnumber_t		agno;	/* allocation group number */
-	xfs_daddr_t		bno;	/* inode cluster start daddr */
 	int			chunkidx; /* current index into inode chunk */
 	int			clustidx; /* current index into inode cluster */
 	xfs_btree_cur_t		*cur;	/* btree cursor for ialloc btree */
@@ -463,7 +462,6 @@
 						 mp->m_sb.sb_inopblog);
 				}
 				ino = XFS_AGINO_TO_INO(mp, agno, agino);
-				bno = XFS_AGB_TO_DADDR(mp, agno, agbno);
 				/*
 				 * Skip if this inode is free.
 				 */
diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c
index 25efa9b..b612ce4 100644
--- a/fs/xfs/xfs_log.c
+++ b/fs/xfs/xfs_log.c
@@ -761,7 +761,7 @@
 		break;
 	case XLOG_STATE_COVER_NEED:
 	case XLOG_STATE_COVER_NEED2:
-		if (!xfs_trans_ail_tail(log->l_ailp) &&
+		if (!xfs_ail_min_lsn(log->l_ailp) &&
 		    xlog_iclogs_empty(log)) {
 			if (log->l_covered_state == XLOG_STATE_COVER_NEED)
 				log->l_covered_state = XLOG_STATE_COVER_DONE;
@@ -801,7 +801,7 @@
 	xfs_lsn_t		tail_lsn;
 	struct log		*log = mp->m_log;
 
-	tail_lsn = xfs_trans_ail_tail(mp->m_ail);
+	tail_lsn = xfs_ail_min_lsn(mp->m_ail);
 	if (!tail_lsn)
 		tail_lsn = atomic64_read(&log->l_last_sync_lsn);
 
@@ -1239,7 +1239,7 @@
 	 * the filesystem is shutting down.
 	 */
 	if (!XLOG_FORCED_SHUTDOWN(log))
-		xfs_trans_ail_push(log->l_ailp, threshold_lsn);
+		xfs_ail_push(log->l_ailp, threshold_lsn);
 }
 
 /*
@@ -3407,6 +3407,17 @@
 		xfs_emerg(log->l_mp, "%s: invalid ptr", __func__);
 }
 
+/*
+ * Check to make sure the grant write head didn't just over lap the tail.  If
+ * the cycles are the same, we can't be overlapping.  Otherwise, make sure that
+ * the cycles differ by exactly one and check the byte count.
+ *
+ * This check is run unlocked, so can give false positives. Rather than assert
+ * on failures, use a warn-once flag and a panic tag to allow the admin to
+ * determine if they want to panic the machine when such an error occurs. For
+ * debug kernels this will have the same effect as using an assert but, unlinke
+ * an assert, it can be turned off at runtime.
+ */
 STATIC void
 xlog_verify_grant_tail(
 	struct log	*log)
@@ -3414,17 +3425,22 @@
 	int		tail_cycle, tail_blocks;
 	int		cycle, space;
 
-	/*
-	 * Check to make sure the grant write head didn't just over lap the
-	 * tail.  If the cycles are the same, we can't be overlapping.
-	 * Otherwise, make sure that the cycles differ by exactly one and
-	 * check the byte count.
-	 */
 	xlog_crack_grant_head(&log->l_grant_write_head, &cycle, &space);
 	xlog_crack_atomic_lsn(&log->l_tail_lsn, &tail_cycle, &tail_blocks);
 	if (tail_cycle != cycle) {
-		ASSERT(cycle - 1 == tail_cycle);
-		ASSERT(space <= BBTOB(tail_blocks));
+		if (cycle - 1 != tail_cycle &&
+		    !(log->l_flags & XLOG_TAIL_WARN)) {
+			xfs_alert_tag(log->l_mp, XFS_PTAG_LOGRES,
+				"%s: cycle - 1 != tail_cycle", __func__);
+			log->l_flags |= XLOG_TAIL_WARN;
+		}
+
+		if (space > BBTOB(tail_blocks) &&
+		    !(log->l_flags & XLOG_TAIL_WARN)) {
+			xfs_alert_tag(log->l_mp, XFS_PTAG_LOGRES,
+				"%s: space > BBTOB(tail_blocks)", __func__);
+			log->l_flags |= XLOG_TAIL_WARN;
+		}
 	}
 }
 
diff --git a/fs/xfs/xfs_log_priv.h b/fs/xfs/xfs_log_priv.h
index ffae692..5864850 100644
--- a/fs/xfs/xfs_log_priv.h
+++ b/fs/xfs/xfs_log_priv.h
@@ -144,6 +144,7 @@
 #define	XLOG_RECOVERY_NEEDED	0x4	/* log was recovered */
 #define XLOG_IO_ERROR		0x8	/* log hit an I/O error, and being
 					   shutdown */
+#define XLOG_TAIL_WARN		0x10	/* log tail verify warning issued */
 
 #ifdef __KERNEL__
 /*
diff --git a/fs/xfs/xfs_mount.h b/fs/xfs/xfs_mount.h
index a62e897..19af0ab 100644
--- a/fs/xfs/xfs_mount.h
+++ b/fs/xfs/xfs_mount.h
@@ -203,12 +203,9 @@
 	struct mutex		m_icsb_mutex;	/* balancer sync lock */
 #endif
 	struct xfs_mru_cache	*m_filestream;  /* per-mount filestream data */
-	struct task_struct	*m_sync_task;	/* generalised sync thread */
-	xfs_sync_work_t		m_sync_work;	/* work item for VFS_SYNC */
-	struct list_head	m_sync_list;	/* sync thread work item list */
-	spinlock_t		m_sync_lock;	/* work item list lock */
-	int			m_sync_seq;	/* sync thread generation no. */
-	wait_queue_head_t	m_wait_single_sync_task;
+	struct delayed_work	m_sync_work;	/* background sync work */
+	struct delayed_work	m_reclaim_work;	/* background inode reclaim */
+	struct work_struct	m_flush_work;	/* background inode flush */
 	__int64_t		m_update_flags;	/* sb flags we need to update
 						   on the next remount,rw */
 	struct shrinker		m_inode_shrink;	/* inode reclaim shrinker */
diff --git a/fs/xfs/xfs_trans_ail.c b/fs/xfs/xfs_trans_ail.c
index 12aff95..acdb92f 100644
--- a/fs/xfs/xfs_trans_ail.c
+++ b/fs/xfs/xfs_trans_ail.c
@@ -28,74 +28,138 @@
 #include "xfs_trans_priv.h"
 #include "xfs_error.h"
 
-STATIC void xfs_ail_splice(struct xfs_ail *, struct list_head *, xfs_lsn_t);
-STATIC void xfs_ail_delete(struct xfs_ail *, xfs_log_item_t *);
-STATIC xfs_log_item_t * xfs_ail_min(struct xfs_ail *);
-STATIC xfs_log_item_t * xfs_ail_next(struct xfs_ail *, xfs_log_item_t *);
+struct workqueue_struct	*xfs_ail_wq;	/* AIL workqueue */
 
 #ifdef DEBUG
-STATIC void xfs_ail_check(struct xfs_ail *, xfs_log_item_t *);
-#else
+/*
+ * Check that the list is sorted as it should be.
+ */
+STATIC void
+xfs_ail_check(
+	struct xfs_ail	*ailp,
+	xfs_log_item_t	*lip)
+{
+	xfs_log_item_t	*prev_lip;
+
+	if (list_empty(&ailp->xa_ail))
+		return;
+
+	/*
+	 * Check the next and previous entries are valid.
+	 */
+	ASSERT((lip->li_flags & XFS_LI_IN_AIL) != 0);
+	prev_lip = list_entry(lip->li_ail.prev, xfs_log_item_t, li_ail);
+	if (&prev_lip->li_ail != &ailp->xa_ail)
+		ASSERT(XFS_LSN_CMP(prev_lip->li_lsn, lip->li_lsn) <= 0);
+
+	prev_lip = list_entry(lip->li_ail.next, xfs_log_item_t, li_ail);
+	if (&prev_lip->li_ail != &ailp->xa_ail)
+		ASSERT(XFS_LSN_CMP(prev_lip->li_lsn, lip->li_lsn) >= 0);
+
+
+#ifdef XFS_TRANS_DEBUG
+	/*
+	 * Walk the list checking lsn ordering, and that every entry has the
+	 * XFS_LI_IN_AIL flag set. This is really expensive, so only do it
+	 * when specifically debugging the transaction subsystem.
+	 */
+	prev_lip = list_entry(&ailp->xa_ail, xfs_log_item_t, li_ail);
+	list_for_each_entry(lip, &ailp->xa_ail, li_ail) {
+		if (&prev_lip->li_ail != &ailp->xa_ail)
+			ASSERT(XFS_LSN_CMP(prev_lip->li_lsn, lip->li_lsn) <= 0);
+		ASSERT((lip->li_flags & XFS_LI_IN_AIL) != 0);
+		prev_lip = lip;
+	}
+#endif /* XFS_TRANS_DEBUG */
+}
+#else /* !DEBUG */
 #define	xfs_ail_check(a,l)
 #endif /* DEBUG */
 
+/*
+ * Return a pointer to the first item in the AIL.  If the AIL is empty, then
+ * return NULL.
+ */
+static xfs_log_item_t *
+xfs_ail_min(
+	struct xfs_ail  *ailp)
+{
+	if (list_empty(&ailp->xa_ail))
+		return NULL;
+
+	return list_first_entry(&ailp->xa_ail, xfs_log_item_t, li_ail);
+}
+
+ /*
+ * Return a pointer to the last item in the AIL.  If the AIL is empty, then
+ * return NULL.
+ */
+static xfs_log_item_t *
+xfs_ail_max(
+	struct xfs_ail  *ailp)
+{
+	if (list_empty(&ailp->xa_ail))
+		return NULL;
+
+	return list_entry(ailp->xa_ail.prev, xfs_log_item_t, li_ail);
+}
 
 /*
- * This is called by the log manager code to determine the LSN
- * of the tail of the log.  This is exactly the LSN of the first
- * item in the AIL.  If the AIL is empty, then this function
- * returns 0.
+ * Return a pointer to the item which follows the given item in the AIL.  If
+ * the given item is the last item in the list, then return NULL.
+ */
+static xfs_log_item_t *
+xfs_ail_next(
+	struct xfs_ail  *ailp,
+	xfs_log_item_t  *lip)
+{
+	if (lip->li_ail.next == &ailp->xa_ail)
+		return NULL;
+
+	return list_first_entry(&lip->li_ail, xfs_log_item_t, li_ail);
+}
+
+/*
+ * This is called by the log manager code to determine the LSN of the tail of
+ * the log.  This is exactly the LSN of the first item in the AIL.  If the AIL
+ * is empty, then this function returns 0.
  *
- * We need the AIL lock in order to get a coherent read of the
- * lsn of the last item in the AIL.
+ * We need the AIL lock in order to get a coherent read of the lsn of the last
+ * item in the AIL.
  */
 xfs_lsn_t
-xfs_trans_ail_tail(
+xfs_ail_min_lsn(
 	struct xfs_ail	*ailp)
 {
-	xfs_lsn_t	lsn;
+	xfs_lsn_t	lsn = 0;
 	xfs_log_item_t	*lip;
 
 	spin_lock(&ailp->xa_lock);
 	lip = xfs_ail_min(ailp);
-	if (lip == NULL) {
-		lsn = (xfs_lsn_t)0;
-	} else {
+	if (lip)
 		lsn = lip->li_lsn;
-	}
 	spin_unlock(&ailp->xa_lock);
 
 	return lsn;
 }
 
 /*
- * xfs_trans_push_ail
- *
- * This routine is called to move the tail of the AIL forward.  It does this by
- * trying to flush items in the AIL whose lsns are below the given
- * threshold_lsn.
- *
- * the push is run asynchronously in a separate thread, so we return the tail
- * of the log right now instead of the tail after the push. This means we will
- * either continue right away, or we will sleep waiting on the async thread to
- * do its work.
- *
- * We do this unlocked - we only need to know whether there is anything in the
- * AIL at the time we are called. We don't need to access the contents of
- * any of the objects, so the lock is not needed.
+ * Return the maximum lsn held in the AIL, or zero if the AIL is empty.
  */
-void
-xfs_trans_ail_push(
-	struct xfs_ail	*ailp,
-	xfs_lsn_t	threshold_lsn)
+static xfs_lsn_t
+xfs_ail_max_lsn(
+	struct xfs_ail  *ailp)
 {
-	xfs_log_item_t	*lip;
+	xfs_lsn_t       lsn = 0;
+	xfs_log_item_t  *lip;
 
-	lip = xfs_ail_min(ailp);
-	if (lip && !XFS_FORCED_SHUTDOWN(ailp->xa_mount)) {
-		if (XFS_LSN_CMP(threshold_lsn, ailp->xa_target) > 0)
-			xfsaild_wakeup(ailp, threshold_lsn);
-	}
+	spin_lock(&ailp->xa_lock);
+	lip = xfs_ail_max(ailp);
+	if (lip)
+		lsn = lip->li_lsn;
+	spin_unlock(&ailp->xa_lock);
+
+	return lsn;
 }
 
 /*
@@ -236,16 +300,57 @@
 }
 
 /*
- * xfsaild_push does the work of pushing on the AIL.  Returning a timeout of
- * zero indicates that the caller should sleep until woken.
+ * splice the log item list into the AIL at the given LSN.
  */
-long
-xfsaild_push(
-	struct xfs_ail	*ailp,
-	xfs_lsn_t	*last_lsn)
+static void
+xfs_ail_splice(
+	struct xfs_ail  *ailp,
+	struct list_head *list,
+	xfs_lsn_t       lsn)
 {
-	long		tout = 0;
-	xfs_lsn_t	last_pushed_lsn = *last_lsn;
+	xfs_log_item_t  *next_lip;
+
+	/* If the list is empty, just insert the item.  */
+	if (list_empty(&ailp->xa_ail)) {
+		list_splice(list, &ailp->xa_ail);
+		return;
+	}
+
+	list_for_each_entry_reverse(next_lip, &ailp->xa_ail, li_ail) {
+		if (XFS_LSN_CMP(next_lip->li_lsn, lsn) <= 0)
+			break;
+	}
+
+	ASSERT(&next_lip->li_ail == &ailp->xa_ail ||
+	       XFS_LSN_CMP(next_lip->li_lsn, lsn) <= 0);
+
+	list_splice_init(list, &next_lip->li_ail);
+}
+
+/*
+ * Delete the given item from the AIL.  Return a pointer to the item.
+ */
+static void
+xfs_ail_delete(
+	struct xfs_ail  *ailp,
+	xfs_log_item_t  *lip)
+{
+	xfs_ail_check(ailp, lip);
+	list_del(&lip->li_ail);
+	xfs_trans_ail_cursor_clear(ailp, lip);
+}
+
+/*
+ * xfs_ail_worker does the work of pushing on the AIL. It will requeue itself
+ * to run at a later time if there is more work to do to complete the push.
+ */
+STATIC void
+xfs_ail_worker(
+	struct work_struct *work)
+{
+	struct xfs_ail	*ailp = container_of(to_delayed_work(work),
+					struct xfs_ail, xa_work);
+	long		tout;
 	xfs_lsn_t	target =  ailp->xa_target;
 	xfs_lsn_t	lsn;
 	xfs_log_item_t	*lip;
@@ -256,15 +361,15 @@
 
 	spin_lock(&ailp->xa_lock);
 	xfs_trans_ail_cursor_init(ailp, cur);
-	lip = xfs_trans_ail_cursor_first(ailp, cur, *last_lsn);
+	lip = xfs_trans_ail_cursor_first(ailp, cur, ailp->xa_last_pushed_lsn);
 	if (!lip || XFS_FORCED_SHUTDOWN(mp)) {
 		/*
 		 * AIL is empty or our push has reached the end.
 		 */
 		xfs_trans_ail_cursor_done(ailp, cur);
 		spin_unlock(&ailp->xa_lock);
-		*last_lsn = 0;
-		return tout;
+		ailp->xa_last_pushed_lsn = 0;
+		return;
 	}
 
 	XFS_STATS_INC(xs_push_ail);
@@ -301,13 +406,13 @@
 		case XFS_ITEM_SUCCESS:
 			XFS_STATS_INC(xs_push_ail_success);
 			IOP_PUSH(lip);
-			last_pushed_lsn = lsn;
+			ailp->xa_last_pushed_lsn = lsn;
 			break;
 
 		case XFS_ITEM_PUSHBUF:
 			XFS_STATS_INC(xs_push_ail_pushbuf);
 			IOP_PUSHBUF(lip);
-			last_pushed_lsn = lsn;
+			ailp->xa_last_pushed_lsn = lsn;
 			push_xfsbufd = 1;
 			break;
 
@@ -319,7 +424,7 @@
 
 		case XFS_ITEM_LOCKED:
 			XFS_STATS_INC(xs_push_ail_locked);
-			last_pushed_lsn = lsn;
+			ailp->xa_last_pushed_lsn = lsn;
 			stuck++;
 			break;
 
@@ -374,9 +479,23 @@
 		wake_up_process(mp->m_ddev_targp->bt_task);
 	}
 
+	/* assume we have more work to do in a short while */
+	tout = 10;
 	if (!count) {
 		/* We're past our target or empty, so idle */
-		last_pushed_lsn = 0;
+		ailp->xa_last_pushed_lsn = 0;
+
+		/*
+		 * Check for an updated push target before clearing the
+		 * XFS_AIL_PUSHING_BIT. If the target changed, we've got more
+		 * work to do. Wait a bit longer before starting that work.
+		 */
+		smp_rmb();
+		if (ailp->xa_target == target) {
+			clear_bit(XFS_AIL_PUSHING_BIT, &ailp->xa_flags);
+			return;
+		}
+		tout = 50;
 	} else if (XFS_LSN_CMP(lsn, target) >= 0) {
 		/*
 		 * We reached the target so wait a bit longer for I/O to
@@ -384,7 +503,7 @@
 		 * start the next scan from the start of the AIL.
 		 */
 		tout = 50;
-		last_pushed_lsn = 0;
+		ailp->xa_last_pushed_lsn = 0;
 	} else if ((stuck * 100) / count > 90) {
 		/*
 		 * Either there is a lot of contention on the AIL or we
@@ -396,14 +515,61 @@
 		 * continuing from where we were.
 		 */
 		tout = 20;
-	} else {
-		/* more to do, but wait a short while before continuing */
-		tout = 10;
 	}
-	*last_lsn = last_pushed_lsn;
-	return tout;
+
+	/* There is more to do, requeue us.  */
+	queue_delayed_work(xfs_syncd_wq, &ailp->xa_work,
+					msecs_to_jiffies(tout));
 }
 
+/*
+ * This routine is called to move the tail of the AIL forward.  It does this by
+ * trying to flush items in the AIL whose lsns are below the given
+ * threshold_lsn.
+ *
+ * The push is run asynchronously in a workqueue, which means the caller needs
+ * to handle waiting on the async flush for space to become available.
+ * We don't want to interrupt any push that is in progress, hence we only queue
+ * work if we set the pushing bit approriately.
+ *
+ * We do this unlocked - we only need to know whether there is anything in the
+ * AIL at the time we are called. We don't need to access the contents of
+ * any of the objects, so the lock is not needed.
+ */
+void
+xfs_ail_push(
+	struct xfs_ail	*ailp,
+	xfs_lsn_t	threshold_lsn)
+{
+	xfs_log_item_t	*lip;
+
+	lip = xfs_ail_min(ailp);
+	if (!lip || XFS_FORCED_SHUTDOWN(ailp->xa_mount) ||
+	    XFS_LSN_CMP(threshold_lsn, ailp->xa_target) <= 0)
+		return;
+
+	/*
+	 * Ensure that the new target is noticed in push code before it clears
+	 * the XFS_AIL_PUSHING_BIT.
+	 */
+	smp_wmb();
+	ailp->xa_target = threshold_lsn;
+	if (!test_and_set_bit(XFS_AIL_PUSHING_BIT, &ailp->xa_flags))
+		queue_delayed_work(xfs_syncd_wq, &ailp->xa_work, 0);
+}
+
+/*
+ * Push out all items in the AIL immediately
+ */
+void
+xfs_ail_push_all(
+	struct xfs_ail  *ailp)
+{
+	xfs_lsn_t       threshold_lsn = xfs_ail_max_lsn(ailp);
+
+	if (threshold_lsn)
+		xfs_ail_push(ailp, threshold_lsn);
+}
 
 /*
  * This is to be called when an item is unlocked that may have
@@ -615,7 +781,6 @@
 	xfs_mount_t	*mp)
 {
 	struct xfs_ail	*ailp;
-	int		error;
 
 	ailp = kmem_zalloc(sizeof(struct xfs_ail), KM_MAYFAIL);
 	if (!ailp)
@@ -624,15 +789,9 @@
 	ailp->xa_mount = mp;
 	INIT_LIST_HEAD(&ailp->xa_ail);
 	spin_lock_init(&ailp->xa_lock);
-	error = xfsaild_start(ailp);
-	if (error)
-		goto out_free_ailp;
+	INIT_DELAYED_WORK(&ailp->xa_work, xfs_ail_worker);
 	mp->m_ail = ailp;
 	return 0;
-
-out_free_ailp:
-	kmem_free(ailp);
-	return error;
 }
 
 void
@@ -641,124 +800,6 @@
 {
 	struct xfs_ail	*ailp = mp->m_ail;
 
-	xfsaild_stop(ailp);
+	cancel_delayed_work_sync(&ailp->xa_work);
 	kmem_free(ailp);
 }
-
-/*
- * splice the log item list into the AIL at the given LSN.
- */
-STATIC void
-xfs_ail_splice(
-	struct xfs_ail	*ailp,
-	struct list_head *list,
-	xfs_lsn_t	lsn)
-{
-	xfs_log_item_t	*next_lip;
-
-	/*
-	 * If the list is empty, just insert the item.
-	 */
-	if (list_empty(&ailp->xa_ail)) {
-		list_splice(list, &ailp->xa_ail);
-		return;
-	}
-
-	list_for_each_entry_reverse(next_lip, &ailp->xa_ail, li_ail) {
-		if (XFS_LSN_CMP(next_lip->li_lsn, lsn) <= 0)
-			break;
-	}
-
-	ASSERT((&next_lip->li_ail == &ailp->xa_ail) ||
-	       (XFS_LSN_CMP(next_lip->li_lsn, lsn) <= 0));
-
-	list_splice_init(list, &next_lip->li_ail);
-	return;
-}
-
-/*
- * Delete the given item from the AIL.  Return a pointer to the item.
- */
-STATIC void
-xfs_ail_delete(
-	struct xfs_ail	*ailp,
-	xfs_log_item_t	*lip)
-{
-	xfs_ail_check(ailp, lip);
-	list_del(&lip->li_ail);
-	xfs_trans_ail_cursor_clear(ailp, lip);
-}
-
-/*
- * Return a pointer to the first item in the AIL.
- * If the AIL is empty, then return NULL.
- */
-STATIC xfs_log_item_t *
-xfs_ail_min(
-	struct xfs_ail	*ailp)
-{
-	if (list_empty(&ailp->xa_ail))
-		return NULL;
-
-	return list_first_entry(&ailp->xa_ail, xfs_log_item_t, li_ail);
-}
-
-/*
- * Return a pointer to the item which follows
- * the given item in the AIL.  If the given item
- * is the last item in the list, then return NULL.
- */
-STATIC xfs_log_item_t *
-xfs_ail_next(
-	struct xfs_ail	*ailp,
-	xfs_log_item_t	*lip)
-{
-	if (lip->li_ail.next == &ailp->xa_ail)
-		return NULL;
-
-	return list_first_entry(&lip->li_ail, xfs_log_item_t, li_ail);
-}
-
-#ifdef DEBUG
-/*
- * Check that the list is sorted as it should be.
- */
-STATIC void
-xfs_ail_check(
-	struct xfs_ail	*ailp,
-	xfs_log_item_t	*lip)
-{
-	xfs_log_item_t	*prev_lip;
-
-	if (list_empty(&ailp->xa_ail))
-		return;
-
-	/*
-	 * Check the next and previous entries are valid.
-	 */
-	ASSERT((lip->li_flags & XFS_LI_IN_AIL) != 0);
-	prev_lip = list_entry(lip->li_ail.prev, xfs_log_item_t, li_ail);
-	if (&prev_lip->li_ail != &ailp->xa_ail)
-		ASSERT(XFS_LSN_CMP(prev_lip->li_lsn, lip->li_lsn) <= 0);
-
-	prev_lip = list_entry(lip->li_ail.next, xfs_log_item_t, li_ail);
-	if (&prev_lip->li_ail != &ailp->xa_ail)
-		ASSERT(XFS_LSN_CMP(prev_lip->li_lsn, lip->li_lsn) >= 0);
-
-
-#ifdef XFS_TRANS_DEBUG
-	/*
-	 * Walk the list checking lsn ordering, and that every entry has the
-	 * XFS_LI_IN_AIL flag set. This is really expensive, so only do it
-	 * when specifically debugging the transaction subsystem.
-	 */
-	prev_lip = list_entry(&ailp->xa_ail, xfs_log_item_t, li_ail);
-	list_for_each_entry(lip, &ailp->xa_ail, li_ail) {
-		if (&prev_lip->li_ail != &ailp->xa_ail)
-			ASSERT(XFS_LSN_CMP(prev_lip->li_lsn, lip->li_lsn) <= 0);
-		ASSERT((lip->li_flags & XFS_LI_IN_AIL) != 0);
-		prev_lip = lip;
-	}
-#endif /* XFS_TRANS_DEBUG */
-}
-#endif /* DEBUG */
diff --git a/fs/xfs/xfs_trans_priv.h b/fs/xfs/xfs_trans_priv.h
index 35162c2..6b164e9e 100644
--- a/fs/xfs/xfs_trans_priv.h
+++ b/fs/xfs/xfs_trans_priv.h
@@ -65,16 +65,22 @@
 struct xfs_ail {
 	struct xfs_mount	*xa_mount;
 	struct list_head	xa_ail;
-	uint			xa_gen;
-	struct task_struct	*xa_task;
 	xfs_lsn_t		xa_target;
 	struct xfs_ail_cursor	xa_cursors;
 	spinlock_t		xa_lock;
+	struct delayed_work	xa_work;
+	xfs_lsn_t		xa_last_pushed_lsn;
+	unsigned long		xa_flags;
 };
 
+#define XFS_AIL_PUSHING_BIT	0
+
 /*
  * From xfs_trans_ail.c
  */
+
+extern struct workqueue_struct	*xfs_ail_wq;	/* AIL workqueue */
+
 void	xfs_trans_ail_update_bulk(struct xfs_ail *ailp,
 				struct xfs_log_item **log_items, int nr_items,
 				xfs_lsn_t lsn) __releases(ailp->xa_lock);
@@ -98,12 +104,13 @@
 	xfs_trans_ail_delete_bulk(ailp, &lip, 1);
 }
 
-void			xfs_trans_ail_push(struct xfs_ail *, xfs_lsn_t);
+void			xfs_ail_push(struct xfs_ail *, xfs_lsn_t);
+void			xfs_ail_push_all(struct xfs_ail *);
+xfs_lsn_t		xfs_ail_min_lsn(struct xfs_ail *ailp);
+
 void			xfs_trans_unlocked_item(struct xfs_ail *,
 					xfs_log_item_t *);
 
-xfs_lsn_t		xfs_trans_ail_tail(struct xfs_ail *ailp);
-
 struct xfs_log_item	*xfs_trans_ail_cursor_first(struct xfs_ail *ailp,
 					struct xfs_ail_cursor *cur,
 					xfs_lsn_t lsn);
@@ -112,11 +119,6 @@
 void			xfs_trans_ail_cursor_done(struct xfs_ail *ailp,
 					struct xfs_ail_cursor *cur);
 
-long	xfsaild_push(struct xfs_ail *, xfs_lsn_t *);
-void	xfsaild_wakeup(struct xfs_ail *, xfs_lsn_t);
-int	xfsaild_start(struct xfs_ail *);
-void	xfsaild_stop(struct xfs_ail *);
-
 #if BITS_PER_LONG != 64
 static inline void
 xfs_trans_ail_copy_lsn(