netfs: Add a struct to group modifications together for flushing in order

Define struct netfs_flush_group for grouping related modifications together
so that they can be flushed in a specific order (for ceph snaps, for
instance).  The netfs can start a new flush group whenever it needs to, and
the new group is appended to the end of the FIFO.  When dirty data needs to
be written out, the flusher may only flush modifications from a group once
all preceding groups have been completely flushed.

Implement object lifecycle functions for the struct.

Display the count of extant netfs_flush_group structs in proc.
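
As a rough illustration (not part of this patch), a filesystem such as ceph
might start a new flush group when it takes a snapshot, so that pre-snapshot
modifications are written back before post-snapshot ones.  The helper names
and the snap_context argument below are hypothetical:

  #include <linux/netfs.h>
  #include <linux/slab.h>

  /* Hypothetical hook called when a snapshot is created. */
  static int example_begin_snapshot(struct inode *inode, void *snap_context)
  {
          struct netfs_flush_group *group;

          /* Append a new group to the tail of the inode's FIFO.  Writes made
           * from now on join this group and can't be flushed until all older
           * groups have been written back.
           */
          group = netfs_new_flush_group(inode, snap_context);
          if (!group)
                  return -ENOMEM;
          return 0;
  }

  /* The private data attached above is released through the new
   * ->free_flush_group() op when the group's last reference is put.
   */
  static void example_free_flush_group(struct netfs_inode *ctx,
                                       struct netfs_flush_group *group)
  {
          kfree(group->netfs_priv);
  }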

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/fs/netfs/buffered_flush.c b/fs/netfs/buffered_flush.c
index 2e40d90..32d58c1 100644
--- a/fs/netfs/buffered_flush.c
+++ b/fs/netfs/buffered_flush.c
@@ -301,6 +301,10 @@ static bool netfs_check_for_conflicting_regions(struct netfs_inode *ctx,
 			break;
 		_debug("confl? [D=%x] %lx-%lx", r->debug_id, r->first, r->last);
 
+		if (r->group != list_first_entry_or_null(&ctx->flush_groups,
+							 struct netfs_flush_group,
+							 group_link))
+			goto conflict;
 		if (ctx->ops->is_write_compatible &&
 		    !ctx->ops->is_write_compatible(ctx, file, r))
 			goto conflict;
@@ -328,7 +332,8 @@ int netfs_flush_conflicting_writes(struct netfs_inode *ctx,
 	spin_unlock(&ctx->dirty_lock);
 
 	if (check) {
-		folio_unlock(unlock_this);
+		if (unlock_this)
+			folio_unlock(unlock_this);
 		pr_warn("NEED TO FLUSH CONFLICTING REGIONS\n");
 		return -EAGAIN;
 	}
@@ -365,6 +370,8 @@ void netfs_check_dirty_list(char c, const struct list_head *list,
 	const struct list_head *p;
 	int i = 0;
 
+	if (c == 'W')
+		goto failed;
 	return;
 
 	if (list->next == list) {
@@ -423,6 +430,9 @@ static void netfs_split_out_regions(struct netfs_io_request *wreq,
 
 	spin_lock(&ctx->dirty_lock);
 
+	while (region->absorbed_by)
+		region = region->absorbed_by;
+
 	netfs_check_dirty_list('S', &ctx->dirty_regions, region);
 
 	if (wreq->first != region->first) {
@@ -460,6 +470,9 @@ static void netfs_split_out_regions(struct netfs_io_request *wreq,
 		BUG_ON(ctx->dirty_regions.prev != &ctx->dirty_regions);
 	else
 		BUG_ON(ctx->dirty_regions.prev == &ctx->dirty_regions);
+	list_for_each_entry_from(region, &wreq->regions, dirty_link) {
+		list_del_init(&region->flush_link);
+	}
 	spin_unlock(&ctx->dirty_lock);
 
 	list_for_each_entry(p, &wreq->regions, dirty_link) {
@@ -521,8 +534,64 @@ static void netfs_wait_for_writeback(struct netfs_io_request *wreq,
 }
 
 /*
- * Extend the region to be written back to include subsequent contiguously
- * dirty pages if possible, but don't sleep while doing so.
+ * Advance to the next dirty region covering the writeback that we're
+ * extending.
+ */
+static bool netfs_extend_to_next_region(struct netfs_inode *ctx,
+					struct netfs_dirty_region *start_region,
+					struct netfs_dirty_region **_region,
+					pgoff_t index)
+{
+	struct netfs_dirty_region *region = *_region, *old = NULL;
+
+	spin_lock(&ctx->dirty_lock);
+
+	/* The dirty list may have been altered whilst we were working, so
+	 * allow for the region we were focussing on to have been absorbed,
+	 * split and/or superseded.
+	 */
+	while (region->absorbed_by)
+		region = region->absorbed_by;
+
+	if (index <= region->last)
+		goto cont;
+
+	while (index < region->first)
+		region = netfs_next_region(ctx, region);
+	if (index <= region->last)
+		goto cont;
+
+	region = netfs_next_region(ctx, region);
+	if (!region)
+		goto stop;
+
+	if (region->group != (*_region)->group)
+		goto stop;
+
+	// TODO: Allow discontiguity
+	if (region->first > index)
+		goto stop;
+
+cont:
+	if (region != *_region) {
+		netfs_get_dirty_region(ctx, region, netfs_region_trace_get_wback);
+		old = *_region;
+		*_region = region;
+	}
+
+	spin_unlock(&ctx->dirty_lock);
+	if (old && old != start_region)
+		netfs_put_dirty_region(ctx, old, netfs_region_trace_put_wback);
+	return true;
+
+stop:
+	spin_unlock(&ctx->dirty_lock);
+	return false;
+}
+
+/*
+ * Extend the span to be written back to include subsequent contiguously dirty
+ * pages if possible, but don't sleep while doing so.
  *
  * If this page holds new content, then we can include filler zeros in the
  * writeback.
@@ -530,8 +599,9 @@ static void netfs_wait_for_writeback(struct netfs_io_request *wreq,
 static void netfs_extend_writeback(struct netfs_io_request *wreq,
 				   struct writeback_control *wbc,
 				   struct netfs_inode *ctx,
-				   struct netfs_dirty_region *region)
+				   struct netfs_dirty_region *start_region)
 {
+	struct netfs_dirty_region *region = start_region;
 	struct folio_batch fbatch;
 	struct folio *folio;
 	unsigned int i;
@@ -599,9 +669,15 @@ static void netfs_extend_writeback(struct netfs_io_request *wreq,
 		 * there if any of those folios are mapped.
 		 */
 		folio_batch_init(&fbatch);
-		_debug("extend %lx %lx", index, xas.xa_index);
-		rcu_read_lock();
 
+		if (index > region->last &&
+		    !netfs_extend_to_next_region(ctx, start_region, &region, index)) {
+			kdebug("stop!");
+			goto stop;
+		}
+
+		kdebug("extend D=%x %lx %lx", region->debug_id, index, xas.xa_index);
+		rcu_read_lock();
 		xas_for_each(&xas, folio, ULONG_MAX) {
 			stop = true;
 			if (xas_retry(&xas, folio))
@@ -641,6 +717,8 @@ static void netfs_extend_writeback(struct netfs_io_request *wreq,
 				break;
 			if (stop)
 				break;
+			if (index > region->last)
+				break;
 		}
 
 		if (!stop)
@@ -681,6 +759,9 @@ static void netfs_extend_writeback(struct netfs_io_request *wreq,
 	} while (!stop);
 
 	_leave(" ok [%zx]", wreq->last);
+stop:
+	if (region != start_region)
+		netfs_put_dirty_region(ctx, region, netfs_region_trace_put_wback);
 	return;
 
 nomem_cancel_wb:
@@ -693,6 +774,7 @@ static void netfs_extend_writeback(struct netfs_io_request *wreq,
 		folio_put(folio);
 	}
 	_leave(" cancel [%zx]", wreq->last);
+	goto stop;
 }
 
 /*
@@ -824,6 +906,61 @@ static int netfs_find_writeback_start(struct netfs_io_request *wreq,
 }
 
 /*
+ * Make sure there's a flush group.
+ */
+int netfs_require_flush_group(struct inode *inode, bool force)
+{
+	struct netfs_flush_group *group;
+	struct netfs_inode *ctx = netfs_inode(inode);
+
+	if (list_empty(&ctx->flush_groups) || force) {
+		kdebug("new flush group");
+		group = netfs_new_flush_group(inode, NULL);
+		if (!group)
+			return -ENOMEM;
+	}
+	return 0;
+}
+
+/*
+ * Select a region from an old flush group to write back instead of a region
+ * from the currently live flush group.
+ */
+static struct netfs_dirty_region *netfs_select_from_flush_group(
+	struct writeback_control *wbc,
+	struct netfs_inode *ctx,
+	struct netfs_flush_group *group)
+{
+	struct netfs_dirty_region *region;
+
+	region = list_first_entry_or_null(&group->region_list,
+					  struct netfs_dirty_region, flush_link);
+	if (region) {
+		kleave(" = D=%x", region->debug_id);
+		return region;
+	}
+
+	if (atomic_read(&group->nr_regions) == 0) {
+		list_del_init(&group->group_link);
+		spin_unlock(&ctx->dirty_lock);
+		goto again;
+	}
+
+	netfs_get_flush_group(group);
+	spin_unlock(&ctx->dirty_lock);
+
+	mutex_unlock(&ctx->wb_mutex);
+	kdebug("wait for flush");
+	wait_var_event(&group->nr_regions, atomic_read(&group->nr_regions) == 0);
+	kdebug("waited for flush");
+	mutex_lock(&ctx->wb_mutex);
+
+again:
+	netfs_put_flush_group(ctx, group);
+	return ERR_PTR(-EAGAIN);
+}
+
+/*
  * Flush some of the dirty queue, transforming a part of a sequence of dirty
  * regions into a block we can flush.
  *
@@ -846,8 +983,10 @@ static int netfs_select_dirty(struct netfs_io_request *wreq,
 			      pgoff_t *_first, pgoff_t last)
 {
 	struct netfs_dirty_region *region;
+	struct netfs_flush_group *group;
 	pgoff_t first = *_first;
 	pgoff_t csize = 1UL << ctx->cache_order;
+	bool advance = true;
 	int ret;
 
 	/* Round out the range we're looking through to accommodate whole cache
@@ -870,11 +1009,31 @@ static int netfs_select_dirty(struct netfs_io_request *wreq,
 
 	/* Find the first dirty region that overlaps the requested range */
 	spin_lock(&ctx->dirty_lock);
+
 	region = netfs_scan_for_region(ctx, first, last);
-	if (region) {
-		_debug("scan got R=%08x", region->debug_id);
-		//netfs_get_dirty_region(ctx, region, netfs_region_trace_get_wback);
+	if (region)
+		kdebug("scan got D=%08x", region->debug_id);
+
+	/* If the region selected is not in the bottommost flush group, we need
+	 * to flush prerequisites first.
+	 */
+	if (region && region->group) {
+		group = list_first_entry(&ctx->flush_groups,
+					 struct netfs_flush_group, group_link);
+		if (region->group != group) {
+			kdebug("flush prereq");
+			region = netfs_select_from_flush_group(wbc, ctx, group);
+			if (IS_ERR(region)) {
+				ret = PTR_ERR(region);
+				goto unlock;
+			}
+			advance = false;
+		}
 	}
+
+	if (region)
+		netfs_get_dirty_region(ctx, region, netfs_region_trace_get_wback);
+
 	spin_unlock(&ctx->dirty_lock);
 	if (!region) {
 		_debug("scan failed");
@@ -888,12 +1047,14 @@ static int netfs_select_dirty(struct netfs_io_request *wreq,
 	 */
 	if (*_first < region->first)
 		*_first = region->first;
+
 	ret = netfs_find_writeback_start(wreq, wbc, region, _first, last);
 	if (ret <= 0)
-		goto unlock;
+		goto put_region;
 
 	netfs_extend_writeback(wreq, wbc, ctx, region);
-	*_first = wreq->last + 1;
+	if (advance)
+		*_first = wreq->last + 1;
 
 	netfs_split_out_regions(wreq, ctx, region);
 
@@ -903,6 +1064,8 @@ static int netfs_select_dirty(struct netfs_io_request *wreq,
 	netfs_add_wback_to_list(ctx, wreq);
 	ret = 1;
 
+put_region:
+	netfs_put_dirty_region(ctx, region, netfs_region_trace_put_wback);
 unlock:
 	mutex_unlock(&ctx->wb_mutex);
 	_leave(" = %d [%lx]", ret, *_first);
@@ -946,6 +1109,7 @@ static int netfs_flush_range(struct address_space *mapping,
 	ret = netfs_select_dirty(wreq, wbc, ctx, _first, last);
 	switch (ret) {
 	case -EAGAIN:
+		kdebug("retry");
 		goto retry;
 	default:
 		goto out_unlocked;
diff --git a/fs/netfs/buffered_write.c b/fs/netfs/buffered_write.c
index e624934..01882af 100644
--- a/fs/netfs/buffered_write.c
+++ b/fs/netfs/buffered_write.c
@@ -100,6 +100,10 @@ bool netfs_are_regions_mergeable(struct netfs_inode *ctx,
 	if (b->from > a->to &&
 	    b->from < ctx->zero_point)
 		return false;
+	if (b->group != a->group) {
+		kdebug("different groups %px %px", b->group, a->group);
+		return false;
+	}
 	if (ctx->ops->are_regions_mergeable)
 		return ctx->ops->are_regions_mergeable(ctx, a, b);
 	return true;
@@ -113,6 +117,19 @@ static bool netfs_can_merge(struct netfs_inode *ctx,
 	return netfs_are_regions_mergeable(ctx, onto, x);
 }
 
+static void netfs_region_absorbed(struct netfs_inode *ctx,
+				  struct netfs_dirty_region *into,
+				  struct netfs_dirty_region *absorbed,
+				  struct list_head *discards,
+				  enum netfs_dirty_trace why)
+{
+	absorbed->absorbed_by =
+		netfs_get_dirty_region(ctx, into, netfs_region_trace_get_absorbed_by);
+	list_del_init(&absorbed->flush_link);
+	list_move(&absorbed->dirty_link, discards);
+	trace_netfs_dirty(ctx, into, absorbed, why);
+}
+
 /*
  * See if the extended target region bridges to the next region.  Returns true.
  */
@@ -134,8 +151,8 @@ static bool netfs_try_bridge_next(struct netfs_inode *ctx,
 	if (netfs_are_regions_mergeable(ctx, target, next)) {
 		target->to = next->to;
 		target->last = next->last;
-		list_move(&next->dirty_link, discards);
-		trace_netfs_dirty(ctx, target, next, netfs_dirty_trace_bridged);
+		netfs_region_absorbed(ctx, target, next, discards,
+				      netfs_dirty_trace_bridged);
 		goto out;
 	}
 
@@ -148,9 +165,8 @@ static bool netfs_try_bridge_next(struct netfs_inode *ctx,
 
 	if (target->last >= next->last) {
 		/* Next entry is superseded in its entirety. */
-		list_move(&next->dirty_link, discards);
-		trace_netfs_dirty(ctx, target, next,
-				  netfs_dirty_trace_supersede_all);
+		netfs_region_absorbed(ctx, target, next, discards,
+				      netfs_dirty_trace_supersede_all);
 		if (target->last > next->last)
 			goto again;
 		goto out;
@@ -173,7 +189,8 @@ static bool netfs_continue_modification(struct netfs_inode *ctx,
 					struct list_head *discards)
 {
 	if (proposal->from != target->to ||
-	    proposal->type != target->type)
+	    proposal->type != target->type ||
+	    proposal->group != target->group)
 		return false;
 	if (proposal->type != NETFS_COPY_TO_CACHE &&
 	    ctx->ops->are_regions_mergeable &&
@@ -227,6 +244,71 @@ static bool netfs_merge_with_next(struct netfs_inode *ctx,
 }
 
 /*
+ * Set the flush group on a dirty region.
+ */
+static void netfs_set_flush_group(struct netfs_inode *ctx,
+				  struct netfs_dirty_region *insertion,
+				  struct netfs_dirty_region *insert_point,
+				  enum netfs_dirty_trace how)
+{
+	struct netfs_dirty_region *r;
+	struct netfs_flush_group *group;
+	struct list_head *p;
+
+	if (list_empty(&ctx->flush_groups)) {
+		insertion->group = NULL;
+		return;
+	}
+
+	group = list_last_entry(&ctx->flush_groups,
+				struct netfs_flush_group, group_link);
+
+	insertion->group = netfs_get_flush_group(group);
+	atomic_inc(&group->nr_regions);
+
+	switch (how) {
+	case netfs_dirty_trace_insert_only:
+		smp_mb();
+		list_add_tail(&insertion->flush_link, &group->region_list);
+		return;
+
+	case netfs_dirty_trace_insert_before:
+	case netfs_dirty_trace_supersede_front:
+		smp_mb();
+		if (group == insert_point->group) {
+			list_add_tail(&insertion->flush_link,
+				      &insert_point->flush_link);
+			return;
+		}
+		break;
+
+	case netfs_dirty_trace_insert_after:
+	case netfs_dirty_trace_supersede_back:
+		smp_mb();
+		if (group == insert_point->group) {
+			list_add(&insertion->flush_link,
+				 &insert_point->flush_link);
+			return;
+		}
+		break;
+
+	default:
+		BUG_ON(1);
+	}
+
+	/* We need to search through the flush group's region list and
+	 * insert into the right place.
+	 */
+	list_for_each(p, &group->region_list) {
+		r = list_entry(p, struct netfs_dirty_region, flush_link);
+		if (r->from > insertion->from)
+			break;
+	}
+
+	list_add_tail(&insertion->flush_link, p);
+}
+
+/*
  * Insert a new region at the specified point, initialising it from the
  * proposed region.
  */
@@ -243,6 +325,8 @@ static void netfs_insert_new(struct netfs_inode *ctx,
 	insertion->to    = proposal->to;
 	insertion->type  = proposal->type;
 	netfs_init_dirty_region(ctx, insertion, file);
+	netfs_set_flush_group(ctx, insertion, insert_point, how);
+
 	switch (how) {
 	case netfs_dirty_trace_insert_only:
 		list_add_tail(&insertion->dirty_link, &ctx->dirty_regions);
@@ -280,6 +364,7 @@ void netfs_split_off_front(struct netfs_inode *ctx,
 
 	front->debug_id = atomic_inc_return(&netfs_region_debug_ids);
 	front->type	= back->type;
+	front->group	= netfs_get_flush_group(back->group);
 	front->first	= back->first;
 	front->last	= front_last;
 	back->first	= front->last + 1;
@@ -293,6 +378,10 @@ void netfs_split_off_front(struct netfs_inode *ctx,
 
 	list_move_tail(&front->dirty_link, &back->dirty_link);
 	list_add(&front->proc_link,  &back->proc_link);
+	if (front->group) {
+		atomic_inc(&front->group->nr_regions);
+		list_add_tail(&front->flush_link, &back->flush_link);
+	}
 
 	trace_netfs_dirty(ctx, front, back, why);
 }
@@ -323,20 +412,20 @@ static void netfs_supersede_cache_copy(struct netfs_inode *ctx,
 		if (merge_prev && !merge_next) {
 			prev->to   = proposal->from;
 			prev->last = proposal->last;
-			list_move_tail(&target->dirty_link, discards);
-			trace_netfs_dirty(ctx, prev, target, netfs_dirty_trace_merged_prev_super);
+			netfs_region_absorbed(ctx, prev, target, discards,
+					      netfs_dirty_trace_merged_prev_super);
 		} else if (merge_next && !merge_prev) {
 			next->from  = proposal->from;
 			next->first = proposal->first;
-			list_move_tail(&target->dirty_link, discards);
-			trace_netfs_dirty(ctx, prev, target, netfs_dirty_trace_merged_next_super);
+			netfs_region_absorbed(ctx, next, target, discards,
+					      netfs_dirty_trace_merged_next_super);
 		} else if (merge_next && merge_prev) {
 			prev->to   = next->to;
 			prev->last = next->last;
-			list_move_tail(&target->dirty_link, discards);
-			trace_netfs_dirty(ctx, prev, target, netfs_dirty_trace_merged_next_super);
-			list_move_tail(&next->dirty_link, discards);
-			trace_netfs_dirty(ctx, prev, next, netfs_dirty_trace_merged_next);
+			netfs_region_absorbed(ctx, prev, target, discards,
+					      netfs_dirty_trace_merged_next_super);
+			netfs_region_absorbed(ctx, prev, next, discards,
+					      netfs_dirty_trace_merged_next);
 		} else if (!merge_prev && !merge_next) {
 			target->from = proposal->from;
 			target->to   = proposal->to;
@@ -453,6 +542,9 @@ static void netfs_commit_region(struct netfs_inode *ctx, struct file *file,
 
 	spin_lock(&ctx->dirty_lock);
 
+	if (!list_empty(&ctx->flush_groups))
+		proposal->group = list_last_entry(&ctx->flush_groups,
+						  struct netfs_flush_group, group_link);
 	target = netfs_find_region(ctx, proposal->first, proposal->last);
 
 	/* If there aren't any other regions, just insert and be done. */
@@ -604,6 +696,7 @@ void netfs_discard_regions(struct netfs_inode *ctx,
 	while ((p = list_first_entry_or_null(discards,
 					     struct netfs_dirty_region, dirty_link))) {
 		list_del(&p->dirty_link);
+		BUG_ON(!list_empty(&p->flush_link));
 		netfs_put_dirty_region(ctx, p, why);
 	}
 }
@@ -840,6 +933,14 @@ ssize_t netfs_file_write_iter_locked(struct kiocb *iocb, struct iov_iter *from)
 	if (ret)
 		goto error;
 
+	{
+#warning TRIGGER NEW FLUSH GROUP FOR TESTING
+		static atomic_t jump;
+		ret = netfs_require_flush_group(inode, (atomic_inc_return(&jump) & 3) == 3);
+		if (ret < 0)
+			goto error;
+	}
+
 	ret = netfs_flush_conflicting_writes(ctx, file, iocb->ki_pos,
 					     iov_iter_count(from), NULL);
 	if (ret < 0 && ret != -EAGAIN)
diff --git a/fs/netfs/internal.h b/fs/netfs/internal.h
index 9dd949c..b84f949 100644
--- a/fs/netfs/internal.h
+++ b/fs/netfs/internal.h
@@ -18,6 +18,7 @@
 /*
  * buffered_flush.c
  */
+int netfs_require_flush_group(struct inode *inode, bool force);
 void netfs_check_dirty_list(char c, const struct list_head *list,
 			    const struct netfs_dirty_region *star);
 int netfs_flush_conflicting_writes(struct netfs_inode *ctx, struct file *file,
@@ -153,10 +154,11 @@ struct netfs_dirty_region *netfs_alloc_dirty_region(gfp_t gfp);
 struct netfs_dirty_region *netfs_get_dirty_region(struct netfs_inode *ctx,
 						  struct netfs_dirty_region *region,
 						  enum netfs_region_trace what);
-void netfs_free_dirty_region(struct netfs_inode *ctx, struct netfs_dirty_region *region);
 void netfs_put_dirty_region(struct netfs_inode *ctx,
 			    struct netfs_dirty_region *region,
 			    enum netfs_region_trace what);
+struct netfs_flush_group *netfs_get_flush_group(struct netfs_flush_group *group);
+void netfs_put_flush_group(struct netfs_inode *ctx, struct netfs_flush_group *group);
 
 static inline void netfs_see_request(struct netfs_io_request *rreq,
 				     enum netfs_rreq_ref_trace what)
@@ -196,6 +198,7 @@ extern atomic_t netfs_n_wh_upload_failed;
 extern atomic_t netfs_n_wh_write;
 extern atomic_t netfs_n_wh_write_done;
 extern atomic_t netfs_n_wh_write_failed;
+extern atomic_t netfs_n_wh_flush_group;
 
 
 static inline void netfs_stat(atomic_t *stat)
diff --git a/fs/netfs/misc.c b/fs/netfs/misc.c
index cf25fe9..0d2af94 100644
--- a/fs/netfs/misc.c
+++ b/fs/netfs/misc.c
@@ -244,6 +244,7 @@ int netfs_wait_for_credit(struct writeback_control *wbc)
 void netfs_clear_inode(struct netfs_inode *ctx)
 {
 	struct netfs_dirty_region *region;
+	struct netfs_flush_group *group;
 
 	trace_netfs_clear_inode(ctx);
 
@@ -254,6 +255,13 @@ void netfs_clear_inode(struct netfs_inode *ctx)
 		netfs_put_dirty_region(ctx, region, netfs_region_trace_put_clear);
 	}
 
+	while ((group = list_first_entry_or_null(&ctx->flush_groups,
+						 struct netfs_flush_group,
+						 group_link))) {
+		list_del_init(&group->group_link);
+		netfs_put_flush_group(ctx, group);
+	}
+
 	clear_inode(&ctx->inode);
 }
 EXPORT_SYMBOL(netfs_clear_inode);
diff --git a/fs/netfs/objects.c b/fs/netfs/objects.c
index f2a48a5..e253488 100644
--- a/fs/netfs/objects.c
+++ b/fs/netfs/objects.c
@@ -5,6 +5,7 @@
  * Written by David Howells (dhowells@redhat.com)
  */
 
+#include <linux/export.h>
 #include <linux/slab.h>
 #include "internal.h"
 
@@ -262,17 +263,31 @@ struct netfs_dirty_region *netfs_get_dirty_region(struct netfs_inode *ctx,
 	return region;
 }
 
-void netfs_free_dirty_region(struct netfs_inode *ctx,
-			     struct netfs_dirty_region *region)
+static void netfs_free_dirty_region(struct netfs_inode *ctx,
+				    struct netfs_dirty_region *region,
+				    enum netfs_region_trace what)
 {
+	struct netfs_dirty_region *absorbed_by;
+
 	if (region) {
 		trace_netfs_ref_region(region->debug_id, 0, netfs_region_trace_free);
 		if (!list_empty(&region->proc_link))
 			netfs_proc_del_region(region);
 		if (ctx->ops->free_dirty_region)
 			ctx->ops->free_dirty_region(region);
+		BUG_ON(!list_empty(&region->flush_link));
+		if (region->group) {
+			int nr = atomic_dec_return(&region->group->nr_regions);
+
+			if (nr == 0)
+				wake_up_var(&region->group->nr_regions);
+			netfs_put_flush_group(ctx, region->group);
+		}
 		netfs_stat_d(&netfs_n_wh_region);
+		absorbed_by = region->absorbed_by;
 		kfree(region);
+		netfs_put_dirty_region(ctx, absorbed_by,
+				       netfs_region_trace_put_absorbed_by);
 	}
 }
 
@@ -289,6 +304,68 @@ void netfs_put_dirty_region(struct netfs_inode *ctx,
 	trace_netfs_ref_region(region->debug_id, ref - 1, what);
 	if (dead) {
 		netfs_return_write_credit(region);
-		netfs_free_dirty_region(ctx, region);
+		netfs_free_dirty_region(ctx, region, what);
+	}
+}
+
+/**
+ * netfs_new_flush_group - Create a new write flush group
+ * @inode: The inode for which this is a flush group.
+ * @netfs_priv: Netfs private data to include in the new group
+ *
+ * Create a new flush group and add it to the top of the inode's group list.
+ * Flush groups are used to control the order in which dirty data is written
+ * back to the server.
+ */
+struct netfs_flush_group *netfs_new_flush_group(struct inode *inode, void *netfs_priv)
+{
+	struct netfs_flush_group *group, *prev;
+	struct netfs_inode *ctx = netfs_inode(inode);
+
+	group = kzalloc(sizeof(*group), GFP_KERNEL);
+	if (group) {
+		group->netfs_priv = netfs_priv;
+		INIT_LIST_HEAD(&group->region_list);
+		refcount_set(&group->ref, 1);
+		netfs_stat(&netfs_n_wh_flush_group);
+
+		spin_lock(&ctx->dirty_lock);
+		group->flush_id = ++ctx->flush_counter;
+
+		/* We drop the region count on the old top group so that
+		 * writeback can get rid of it.
+		 */
+		if (!list_empty(&ctx->flush_groups)) {
+			prev = list_last_entry(&ctx->flush_groups,
+					       struct netfs_flush_group, group_link);
+			if (atomic_dec_and_test(&prev->nr_regions))
+				wake_up_var(&prev->nr_regions);
+		}
+
+		/* We keep the region count elevated on the new group to
+		 * prevent wakeups whilst this is the top group.
+		 */
+		atomic_set(&group->nr_regions, 1);
+		list_add_tail(&group->group_link, &ctx->flush_groups);
+
+		spin_unlock(&ctx->dirty_lock);
+	}
+	return group;
+}
+EXPORT_SYMBOL(netfs_new_flush_group);
+
+struct netfs_flush_group *netfs_get_flush_group(struct netfs_flush_group *group)
+{
+	refcount_inc(&group->ref);
+	return group;
+}
+
+void netfs_put_flush_group(struct netfs_inode *ctx, struct netfs_flush_group *group)
+{
+	if (group && refcount_dec_and_test(&group->ref)) {
+		netfs_stat_d(&netfs_n_wh_flush_group);
+		if (ctx->ops->free_flush_group)
+			ctx->ops->free_flush_group(ctx, group);
+		kfree(group);
 	}
 }
diff --git a/fs/netfs/stats.c b/fs/netfs/stats.c
index 7c0a98a..75d6eb6 100644
--- a/fs/netfs/stats.c
+++ b/fs/netfs/stats.c
@@ -32,6 +32,7 @@ atomic_t netfs_n_wh_upload_failed;
 atomic_t netfs_n_wh_write;
 atomic_t netfs_n_wh_write_done;
 atomic_t netfs_n_wh_write_failed;
+atomic_t netfs_n_wh_flush_group;
 
 void netfs_stats_show(struct seq_file *m)
 {
@@ -56,8 +57,9 @@ void netfs_stats_show(struct seq_file *m)
 		   atomic_read(&netfs_n_rh_read),
 		   atomic_read(&netfs_n_rh_read_done),
 		   atomic_read(&netfs_n_rh_read_failed));
-	seq_printf(m, "WrHelp : R=%u\n",
-		   atomic_read(&netfs_n_wh_region));
+	seq_printf(m, "WrHelp : R=%u F=%u\n",
+		   atomic_read(&netfs_n_wh_region),
+		   atomic_read(&netfs_n_wh_flush_group));
 	seq_printf(m, "WrHelp : UL=%u us=%u uf=%u\n",
 		   atomic_read(&netfs_n_wh_upload),
 		   atomic_read(&netfs_n_wh_upload_done),
diff --git a/include/linux/netfs.h b/include/linux/netfs.h
index c7b511d..272fdfa 100644
--- a/include/linux/netfs.h
+++ b/include/linux/netfs.h
@@ -136,6 +136,7 @@ struct netfs_inode {
 	struct inode		inode;		/* The VFS inode */
 	const struct netfs_request_ops *ops;
 	struct mutex		wb_mutex;	/* Mutex controlling writeback setup */
+	struct list_head	flush_groups;	/* FIFO of flushable groups */
 	struct list_head	writebacks;	/* List of writebacks in progress */
 	struct list_head	dirty_regions;	/* List of dirty regions in the pagecache */
 	spinlock_t		dirty_lock;	/* Lock for dirty_regions & writebacks */
@@ -149,6 +150,7 @@ struct netfs_inode {
 #define NETFS_ICTX_ENCRYPTED	0		/* The file contents are encrypted */
 #define NETFS_ICTX_DO_RMW	1		/* Set if RMW required (no write streaming) */
 #define NETFS_ICTX_ODIRECT	2		/* Set if inode in direct I/O mode */
+	unsigned int		flush_counter;	/* Flush group ID counter */
 	unsigned char		min_bshift;	/* log2 min block size for bounding box or 0 */
 	unsigned char		obj_bshift;	/* log2 storage object shift (ceph/pnfs) or 0 */
 	unsigned char		crypto_bshift;	/* log2 of crypto block size */
@@ -327,6 +329,9 @@ enum netfs_region_type {
 struct netfs_dirty_region {
 	struct list_head	dirty_link;	/* Link in netfs_inode::dirty_regions */
 	struct list_head	proc_link;	/* Link in /proc/fs/netfs/regions */
+	struct list_head	flush_link;	/* Link in netfs_io_request::regions */
+	struct netfs_flush_group *group;	/* Flush group this region is part of */
+	struct netfs_dirty_region *absorbed_by;	/* Region that superseded/absorbed this one */
 	void			*netfs_priv;	/* Private data for the netfs */
 	size_t			credit;		/* Amount of credit used */
 	pgoff_t			first;		/* First page index in region */
@@ -339,6 +344,26 @@ struct netfs_dirty_region {
 };
 
 /*
+ * Descriptor for a set of writes that will need to be flushed together.
+ *
+ * These are maintained as a FIFO.  The frontmost group in the FIFO is the only
+ * one that can be written from; the rearmost group in the FIFO is the only one
+ * that can be modified.
+ *
+ * When a prospective write collides with a dirty region in an earlier group,
+ * that group and all those in front of it have to be written out, in order,
+ * before the modification can take place.
+ */
+struct netfs_flush_group {
+	struct list_head	group_link;	/* Link in netfs_inode::flush_groups */
+	struct list_head	region_list;	/* List of regions in this group */
+	void			*netfs_priv;
+	refcount_t		ref;
+	atomic_t		nr_regions;	/* Number of regions in the group */
+	unsigned int		flush_id;
+};
+
+/*
  * Operations the network filesystem can/must provide to the helpers.
  */
 struct netfs_request_ops {
@@ -385,6 +410,10 @@ struct netfs_request_ops {
 	bool (*is_write_compatible)(struct netfs_inode *ctx,
 				    struct file *file,
 				    const struct netfs_dirty_region *front);
+
+	/* Flush group handling */
+	void (*free_flush_group)(struct netfs_inode *ctx,
+				 struct netfs_flush_group *group);
 };
 
 /*
@@ -476,6 +505,7 @@ extern struct netfs_io_request *netfs_prepare_to_truncate(struct dentry *dentry,
 							  struct iattr *attr);
 extern void netfs_truncate(struct netfs_io_request *treq);
 extern void netfs_clear_inode(struct netfs_inode *ctx);
+extern struct netfs_flush_group *netfs_new_flush_group(struct inode *, void *);
 
 /**
  * netfs_inode - Get the netfs inode context from the inode
@@ -503,6 +533,7 @@ static inline void netfs_inode_init(struct netfs_inode *ctx,
 	ctx->ops = ops;
 	ctx->remote_i_size = i_size_read(&ctx->inode);
 	ctx->zero_point = ctx->remote_i_size;
+	INIT_LIST_HEAD(&ctx->flush_groups);
 	INIT_LIST_HEAD(&ctx->writebacks);
 	INIT_LIST_HEAD(&ctx->dirty_regions);
 	spin_lock_init(&ctx->dirty_lock);
diff --git a/include/trace/events/netfs.h b/include/trace/events/netfs.h
index 04281ef..5e3396d 100644
--- a/include/trace/events/netfs.h
+++ b/include/trace/events/netfs.h
@@ -113,11 +113,14 @@
 
 #define netfs_region_traces					\
 	EM(netfs_region_trace_free,		"FREE       ")	\
+	EM(netfs_region_trace_get_absorbed_by,	"GET ABSB-BY")	\
 	EM(netfs_region_trace_get_wback,	"GET WBACK  ")	\
 	EM(netfs_region_trace_new,		"NEW        ")	\
+	EM(netfs_region_trace_put_absorbed_by,	"PUT ABSB-BY")	\
 	EM(netfs_region_trace_put_clear,	"PUT CLEAR  ")	\
 	EM(netfs_region_trace_put_merged,	"PUT MERGED ")	\
 	EM(netfs_region_trace_put_truncated,	"PUT TRUNC  ")	\
+	EM(netfs_region_trace_put_wback,	"PUT WBACK  ")	\
 	E_(netfs_region_trace_put_written,	"PUT WRITTEN")
 
 #define netfs_dirty_traces					\
@@ -521,6 +524,7 @@ TRACE_EVENT(netfs_dirty,
 		    __field(unsigned int,		debug_id	)
 		    __field(unsigned int,		debug_id2	)
 		    __field(unsigned int,		ref		)
+		    __field(unsigned int,		flush_id	)
 		    __field(enum netfs_dirty_trace,	why		)
 			     ),
 
@@ -531,17 +535,19 @@ TRACE_EVENT(netfs_dirty,
 		    __entry->last	= region->last;
 		    __entry->from	= region->from;
 		    __entry->to		= region->to;
+		    __entry->flush_id	= region->group ? region->group->flush_id : 0;
 		    __entry->debug_id	= region->debug_id;
 		    __entry->debug_id2	= region2 ? region2->debug_id : 0;
 			   ),
 
-	    TP_printk("i=%lx D=%x %s pg=%04lx-%04lx dt=%llx-%llx XD=%x",
+	    TP_printk("i=%lx D=%x %s pg=%04lx-%04lx dt=%llx-%llx F=%x XD=%x",
 		      __entry->ino, __entry->debug_id,
 		      __print_symbolic(__entry->why, netfs_dirty_traces),
 		      __entry->first,
 		      __entry->last,
 		      __entry->from,
 		      __entry->to - 1,
+		      __entry->flush_id,
 		      __entry->debug_id2
 		      )
 	    );