Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "There is quite a bit here, including some overdue refactoring and
  cleanup on the mon_client and osd_client code from Ilya, scattered
  writeback support for CephFS and a pile of bug fixes from Zheng, and a
  few random cleanups and fixes from others"

[ I already decided not to pull this because of it having been rebased
  recently, but ended up changing my mind after all.  Next time I'll
  really hold people to it.  Oh well.   - Linus ]

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (34 commits)
  libceph: use KMEM_CACHE macro
  ceph: use kmem_cache_zalloc
  rbd: use KMEM_CACHE macro
  ceph: use lookup request to revalidate dentry
  ceph: kill ceph_get_dentry_parent_inode()
  ceph: fix security xattr deadlock
  ceph: don't request vxattrs from MDS
  ceph: fix mounting same fs multiple times
  ceph: remove unnecessary NULL check
  ceph: avoid updating directory inode's i_size accidentally
  ceph: fix race during filling readdir cache
  libceph: use sizeof_footer() more
  ceph: kill ceph_empty_snapc
  ceph: fix a wrong comparison
  ceph: replace CURRENT_TIME by current_fs_time()
  ceph: scattered page writeback
  libceph: add helper that duplicates last extent operation
  libceph: enable large, variable-sized OSD requests
  libceph: osdc->req_mempool should be backed by a slab pool
  libceph: make r_request msg_size calculation clearer
  ...
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4a87678..9c62344 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1847,14 +1847,12 @@
 	if (osd_req->r_result < 0)
 		obj_request->result = osd_req->r_result;
 
-	rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
-
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
 	 * passed to the block layer, which just supports a 32-bit
 	 * length field.
 	 */
-	obj_request->xferred = osd_req->r_reply_op_len[0];
+	obj_request->xferred = osd_req->r_ops[0].outdata_len;
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
 
 	opcode = osd_req->r_ops[0].op;
@@ -5643,18 +5641,12 @@
 static int rbd_slab_init(void)
 {
 	rbd_assert(!rbd_img_request_cache);
-	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
-					sizeof (struct rbd_img_request),
-					__alignof__(struct rbd_img_request),
-					0, NULL);
+	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
 	if (!rbd_img_request_cache)
 		return -ENOMEM;
 
 	rbd_assert(!rbd_obj_request_cache);
-	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
-					sizeof (struct rbd_obj_request),
-					__alignof__(struct rbd_obj_request),
-					0, NULL);
+	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
 	if (!rbd_obj_request_cache)
 		goto out_err;
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 19adeb0..fc5cae2 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,8 +175,8 @@
 
 static int ceph_releasepage(struct page *page, gfp_t g)
 {
-	struct inode *inode = page->mapping ? page->mapping->host : NULL;
-	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
+	dout("%p releasepage %p idx %lu\n", page->mapping->host,
+	     page, page->index);
 	WARN_ON(PageDirty(page));
 
 	/* Can we release the page from the cache? */
@@ -276,7 +276,7 @@
 	for (i = 0; i < num_pages; i++) {
 		struct page *page = osd_data->pages[i];
 
-		if (rc < 0 && rc != ENOENT)
+		if (rc < 0 && rc != -ENOENT)
 			goto unlock;
 		if (bytes < (int)PAGE_CACHE_SIZE) {
 			/* zero (remainder of) page */
@@ -606,71 +606,71 @@
 	struct inode *inode = req->r_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_data *osd_data;
-	unsigned wrote;
 	struct page *page;
-	int num_pages;
-	int i;
+	int num_pages, total_pages = 0;
+	int i, j;
+	int rc = req->r_result;
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
-	int rc = req->r_result;
-	u64 bytes = req->r_ops[0].extent.length;
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	long writeback_stat;
-	unsigned issued = ceph_caps_issued(ci);
+	bool remove_page;
 
-	osd_data = osd_req_op_extent_osd_data(req, 0);
-	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-	num_pages = calc_pages_for((u64)osd_data->alignment,
-					(u64)osd_data->length);
-	if (rc >= 0) {
-		/*
-		 * Assume we wrote the pages we originally sent.  The
-		 * osd might reply with fewer pages if our writeback
-		 * raced with a truncation and was adjusted at the osd,
-		 * so don't believe the reply.
-		 */
-		wrote = num_pages;
-	} else {
-		wrote = 0;
+
+	dout("writepages_finish %p rc %d\n", inode, rc);
+	if (rc < 0)
 		mapping_set_error(mapping, rc);
-	}
-	dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
-	     inode, rc, bytes, wrote);
+
+	/*
+	 * We lost the cache cap, need to truncate the page before
+	 * it is unlocked, otherwise we'd truncate it later in the
+	 * page truncation thread, possibly losing some data that
+	 * raced its way in
+	 */
+	remove_page = !(ceph_caps_issued(ci) &
+			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
 
 	/* clean all pages */
-	for (i = 0; i < num_pages; i++) {
-		page = osd_data->pages[i];
-		BUG_ON(!page);
-		WARN_ON(!PageUptodate(page));
+	for (i = 0; i < req->r_num_ops; i++) {
+		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
+			break;
 
-		writeback_stat =
-			atomic_long_dec_return(&fsc->writeback_count);
-		if (writeback_stat <
-		    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
-			clear_bdi_congested(&fsc->backing_dev_info,
-					    BLK_RW_ASYNC);
+		osd_data = osd_req_op_extent_osd_data(req, i);
+		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+		num_pages = calc_pages_for((u64)osd_data->alignment,
+					   (u64)osd_data->length);
+		total_pages += num_pages;
+		for (j = 0; j < num_pages; j++) {
+			page = osd_data->pages[j];
+			BUG_ON(!page);
+			WARN_ON(!PageUptodate(page));
 
-		ceph_put_snap_context(page_snap_context(page));
-		page->private = 0;
-		ClearPagePrivate(page);
-		dout("unlocking %d %p\n", i, page);
-		end_page_writeback(page);
+			if (atomic_long_dec_return(&fsc->writeback_count) <
+			     CONGESTION_OFF_THRESH(
+					fsc->mount_options->congestion_kb))
+				clear_bdi_congested(&fsc->backing_dev_info,
+						    BLK_RW_ASYNC);
 
-		/*
-		 * We lost the cache cap, need to truncate the page before
-		 * it is unlocked, otherwise we'd truncate it later in the
-		 * page truncation thread, possibly losing some data that
-		 * raced its way in
-		 */
-		if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
-			generic_error_remove_page(inode->i_mapping, page);
+			ceph_put_snap_context(page_snap_context(page));
+			page->private = 0;
+			ClearPagePrivate(page);
+			dout("unlocking %p\n", page);
+			end_page_writeback(page);
 
-		unlock_page(page);
+			if (remove_page)
+				generic_error_remove_page(inode->i_mapping,
+							  page);
+
+			unlock_page(page);
+		}
+		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
+		     inode, osd_data->length, rc >= 0 ? num_pages : 0);
+
+		ceph_release_pages(osd_data->pages, num_pages);
 	}
-	dout("%p wrote+cleaned %d pages\n", inode, wrote);
-	ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
 
-	ceph_release_pages(osd_data->pages, num_pages);
+	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
+
+	osd_data = osd_req_op_extent_osd_data(req, 0);
 	if (osd_data->pages_from_pool)
 		mempool_free(osd_data->pages,
 			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
@@ -778,17 +778,15 @@
 	while (!done && index <= end) {
 		unsigned i;
 		int first;
-		pgoff_t next;
-		int pvec_pages, locked_pages;
-		struct page **pages = NULL;
+		pgoff_t strip_unit_end = 0;
+		int num_ops = 0, op_idx;
+		int pvec_pages, locked_pages = 0;
+		struct page **pages = NULL, **data_pages;
 		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
 		struct page *page;
 		int want;
-		u64 offset, len;
-		long writeback_stat;
+		u64 offset = 0, len = 0;
 
-		next = 0;
-		locked_pages = 0;
 		max_pages = max_pages_ever;
 
 get_more_pages:
@@ -824,8 +822,8 @@
 				unlock_page(page);
 				break;
 			}
-			if (next && (page->index != next)) {
-				dout("not consecutive %p\n", page);
+			if (strip_unit_end && (page->index > strip_unit_end)) {
+				dout("end of strip unit %p\n", page);
 				unlock_page(page);
 				break;
 			}
@@ -867,36 +865,31 @@
 			/*
 			 * We have something to write.  If this is
 			 * the first locked page this time through,
-			 * allocate an osd request and a page array
-			 * that it will use.
+			 * calculate max possinle write size and
+			 * allocate a page array
 			 */
 			if (locked_pages == 0) {
-				BUG_ON(pages);
+				u64 objnum;
+				u64 objoff;
+
 				/* prepare async write request */
 				offset = (u64)page_offset(page);
 				len = wsize;
-				req = ceph_osdc_new_request(&fsc->client->osdc,
-							&ci->i_layout, vino,
-							offset, &len, 0,
-							do_sync ? 2 : 1,
-							CEPH_OSD_OP_WRITE,
-							CEPH_OSD_FLAG_WRITE |
-							CEPH_OSD_FLAG_ONDISK,
-							snapc, truncate_seq,
-							truncate_size, true);
-				if (IS_ERR(req)) {
-					rc = PTR_ERR(req);
+
+				rc = ceph_calc_file_object_mapping(&ci->i_layout,
+								offset, len,
+								&objnum, &objoff,
+								&len);
+				if (rc < 0) {
 					unlock_page(page);
 					break;
 				}
 
-				if (do_sync)
-					osd_req_op_init(req, 1,
-							CEPH_OSD_OP_STARTSYNC, 0);
+				num_ops = 1 + do_sync;
+				strip_unit_end = page->index +
+					((len - 1) >> PAGE_CACHE_SHIFT);
 
-				req->r_callback = writepages_finish;
-				req->r_inode = inode;
-
+				BUG_ON(pages);
 				max_pages = calc_pages_for(0, (u64)len);
 				pages = kmalloc(max_pages * sizeof (*pages),
 						GFP_NOFS);
@@ -905,6 +898,20 @@
 					pages = mempool_alloc(pool, GFP_NOFS);
 					BUG_ON(!pages);
 				}
+
+				len = 0;
+			} else if (page->index !=
+				   (offset + len) >> PAGE_CACHE_SHIFT) {
+				if (num_ops >= (pool ?  CEPH_OSD_SLAB_OPS :
+							CEPH_OSD_MAX_OPS)) {
+					redirty_page_for_writepage(wbc, page);
+					unlock_page(page);
+					break;
+				}
+
+				num_ops++;
+				offset = (u64)page_offset(page);
+				len = 0;
 			}
 
 			/* note position of first page in pvec */
@@ -913,18 +920,16 @@
 			dout("%p will write page %p idx %lu\n",
 			     inode, page, page->index);
 
-			writeback_stat =
-			       atomic_long_inc_return(&fsc->writeback_count);
-			if (writeback_stat > CONGESTION_ON_THRESH(
+			if (atomic_long_inc_return(&fsc->writeback_count) >
+			    CONGESTION_ON_THRESH(
 				    fsc->mount_options->congestion_kb)) {
 				set_bdi_congested(&fsc->backing_dev_info,
 						  BLK_RW_ASYNC);
 			}
 
-			set_page_writeback(page);
 			pages[locked_pages] = page;
 			locked_pages++;
-			next = page->index + 1;
+			len += PAGE_CACHE_SIZE;
 		}
 
 		/* did we get anything? */
@@ -944,38 +949,119 @@
 			/* shift unused pages over in the pvec...  we
 			 * will need to release them below. */
 			for (j = i; j < pvec_pages; j++) {
-				dout(" pvec leftover page %p\n",
-				     pvec.pages[j]);
+				dout(" pvec leftover page %p\n", pvec.pages[j]);
 				pvec.pages[j-i+first] = pvec.pages[j];
 			}
 			pvec.nr -= i-first;
 		}
 
-		/* Format the osd request message and submit the write */
+new_request:
 		offset = page_offset(pages[0]);
-		len = (u64)locked_pages << PAGE_CACHE_SHIFT;
-		if (snap_size == -1) {
-			len = min(len, (u64)i_size_read(inode) - offset);
-			 /* writepages_finish() clears writeback pages
-			  * according to the data length, so make sure
-			  * data length covers all locked pages */
-			len = max(len, 1 +
-				((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
-		} else {
-			len = min(len, snap_size - offset);
+		len = wsize;
+
+		req = ceph_osdc_new_request(&fsc->client->osdc,
+					&ci->i_layout, vino,
+					offset, &len, 0, num_ops,
+					CEPH_OSD_OP_WRITE,
+					CEPH_OSD_FLAG_WRITE |
+					CEPH_OSD_FLAG_ONDISK,
+					snapc, truncate_seq,
+					truncate_size, false);
+		if (IS_ERR(req)) {
+			req = ceph_osdc_new_request(&fsc->client->osdc,
+						&ci->i_layout, vino,
+						offset, &len, 0,
+						min(num_ops,
+						    CEPH_OSD_SLAB_OPS),
+						CEPH_OSD_OP_WRITE,
+						CEPH_OSD_FLAG_WRITE |
+						CEPH_OSD_FLAG_ONDISK,
+						snapc, truncate_seq,
+						truncate_size, true);
+			BUG_ON(IS_ERR(req));
 		}
-		dout("writepages got %d pages at %llu~%llu\n",
-		     locked_pages, offset, len);
+		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
+			     PAGE_CACHE_SIZE - offset);
 
-		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+		req->r_callback = writepages_finish;
+		req->r_inode = inode;
+
+		/* Format the osd request message and submit the write */
+		len = 0;
+		data_pages = pages;
+		op_idx = 0;
+		for (i = 0; i < locked_pages; i++) {
+			u64 cur_offset = page_offset(pages[i]);
+			if (offset + len != cur_offset) {
+				if (op_idx + do_sync + 1 == req->r_num_ops)
+					break;
+				osd_req_op_extent_dup_last(req, op_idx,
+							   cur_offset - offset);
+				dout("writepages got pages at %llu~%llu\n",
+				     offset, len);
+				osd_req_op_extent_osd_data_pages(req, op_idx,
+							data_pages, len, 0,
 							!!pool, false);
+				osd_req_op_extent_update(req, op_idx, len);
 
-		pages = NULL;	/* request message now owns the pages array */
+				len = 0;
+				offset = cur_offset; 
+				data_pages = pages + i;
+				op_idx++;
+			}
+
+			set_page_writeback(pages[i]);
+			len += PAGE_CACHE_SIZE;
+		}
+
+		if (snap_size != -1) {
+			len = min(len, snap_size - offset);
+		} else if (i == locked_pages) {
+			/* writepages_finish() clears writeback pages
+			 * according to the data length, so make sure
+			 * data length covers all locked pages */
+			u64 min_len = len + 1 - PAGE_CACHE_SIZE;
+			len = min(len, (u64)i_size_read(inode) - offset);
+			len = max(len, min_len);
+		}
+		dout("writepages got pages at %llu~%llu\n", offset, len);
+
+		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
+						 0, !!pool, false);
+		osd_req_op_extent_update(req, op_idx, len);
+
+		if (do_sync) {
+			op_idx++;
+			osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
+		}
+		BUG_ON(op_idx + 1 != req->r_num_ops);
+
 		pool = NULL;
+		if (i < locked_pages) {
+			BUG_ON(num_ops <= req->r_num_ops);
+			num_ops -= req->r_num_ops;
+			num_ops += do_sync;
+			locked_pages -= i;
 
-		/* Update the write op length in case we changed it */
-
-		osd_req_op_extent_update(req, 0, len);
+			/* allocate new pages array for next request */
+			data_pages = pages;
+			pages = kmalloc(locked_pages * sizeof (*pages),
+					GFP_NOFS);
+			if (!pages) {
+				pool = fsc->wb_pagevec_pool;
+				pages = mempool_alloc(pool, GFP_NOFS);
+				BUG_ON(!pages);
+			}
+			memcpy(pages, data_pages + i,
+			       locked_pages * sizeof(*pages));
+			memset(data_pages + i, 0,
+			       locked_pages * sizeof(*pages));
+		} else {
+			BUG_ON(num_ops != req->r_num_ops);
+			index = pages[i - 1]->index + 1;
+			/* request message now owns the pages array */
+			pages = NULL;
+		}
 
 		vino = ceph_vino(inode);
 		ceph_osdc_build_request(req, offset, snapc, vino.snap,
@@ -985,9 +1071,10 @@
 		BUG_ON(rc);
 		req = NULL;
 
-		/* continue? */
-		index = next;
-		wbc->nr_to_write -= locked_pages;
+		wbc->nr_to_write -= i;
+		if (pages)
+			goto new_request;
+
 		if (wbc->nr_to_write <= 0)
 			done = 1;
 
@@ -1522,7 +1609,7 @@
 				    ceph_vino(inode), 0, &len, 0, 1,
 				    CEPH_OSD_OP_CREATE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ceph_empty_snapc, 0, 0, false);
+				    NULL, 0, 0, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
 		goto out;
@@ -1540,9 +1627,8 @@
 				    ceph_vino(inode), 0, &len, 1, 3,
 				    CEPH_OSD_OP_WRITE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ceph_empty_snapc,
-				    ci->i_truncate_seq, ci->i_truncate_size,
-				    false);
+				    NULL, ci->i_truncate_seq,
+				    ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
 		goto out;
@@ -1663,8 +1749,7 @@
 		goto out;
 	}
 
-	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
-					 ceph_empty_snapc,
+	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
 					 1, false, GFP_NOFS);
 	if (!rd_req) {
 		err = -ENOMEM;
@@ -1678,8 +1763,7 @@
 		 "%llx.00000000", ci->i_vino.ino);
 	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
 
-	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
-					 ceph_empty_snapc,
+	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
 					 1, false, GFP_NOFS);
 	if (!wr_req) {
 		err = -ENOMEM;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6fe0ad2..de17bb2 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -991,7 +991,7 @@
 			u32 seq, u64 flush_tid, u64 oldest_flush_tid,
 			u32 issue_seq, u32 mseq, u64 size, u64 max_size,
 			struct timespec *mtime, struct timespec *atime,
-			u64 time_warp_seq,
+			struct timespec *ctime, u64 time_warp_seq,
 			kuid_t uid, kgid_t gid, umode_t mode,
 			u64 xattr_version,
 			struct ceph_buffer *xattrs_buf,
@@ -1042,6 +1042,8 @@
 		ceph_encode_timespec(&fc->mtime, mtime);
 	if (atime)
 		ceph_encode_timespec(&fc->atime, atime);
+	if (ctime)
+		ceph_encode_timespec(&fc->ctime, ctime);
 	fc->time_warp_seq = cpu_to_le32(time_warp_seq);
 
 	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
@@ -1116,7 +1118,7 @@
 	int held, revoking, dropping, keep;
 	u64 seq, issue_seq, mseq, time_warp_seq, follows;
 	u64 size, max_size;
-	struct timespec mtime, atime;
+	struct timespec mtime, atime, ctime;
 	int wake = 0;
 	umode_t mode;
 	kuid_t uid;
@@ -1180,6 +1182,7 @@
 	ci->i_requested_max_size = max_size;
 	mtime = inode->i_mtime;
 	atime = inode->i_atime;
+	ctime = inode->i_ctime;
 	time_warp_seq = ci->i_time_warp_seq;
 	uid = inode->i_uid;
 	gid = inode->i_gid;
@@ -1198,7 +1201,7 @@
 	ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
 		op, keep, want, flushing, seq,
 		flush_tid, oldest_flush_tid, issue_seq, mseq,
-		size, max_size, &mtime, &atime, time_warp_seq,
+		size, max_size, &mtime, &atime, &ctime, time_warp_seq,
 		uid, gid, mode, xattr_version, xattr_blob,
 		follows, inline_data);
 	if (ret < 0) {
@@ -1320,7 +1323,7 @@
 			     capsnap->dirty, 0, capsnap->flush_tid, 0,
 			     0, mseq, capsnap->size, 0,
 			     &capsnap->mtime, &capsnap->atime,
-			     capsnap->time_warp_seq,
+			     &capsnap->ctime, capsnap->time_warp_seq,
 			     capsnap->uid, capsnap->gid, capsnap->mode,
 			     capsnap->xattr_version, capsnap->xattr_blob,
 			     capsnap->follows, capsnap->inline_data);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index fd11fb2..fadc243 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,7 +38,7 @@
 	if (dentry->d_fsdata)
 		return 0;
 
-	di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO);
+	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
 	if (!di)
 		return -ENOMEM;          /* oh well */
 
@@ -68,23 +68,6 @@
 	return 0;
 }
 
-struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
-{
-	struct inode *inode = NULL;
-
-	if (!dentry)
-		return NULL;
-
-	spin_lock(&dentry->d_lock);
-	if (!IS_ROOT(dentry)) {
-		inode = d_inode(dentry->d_parent);
-		ihold(inode);
-	}
-	spin_unlock(&dentry->d_lock);
-	return inode;
-}
-
-
 /*
  * for readdir, we encode the directory frag and offset within that
  * frag into f_pos.
@@ -624,6 +607,7 @@
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
 	int op;
+	int mask;
 	int err;
 
 	dout("lookup %p dentry %p '%pd'\n",
@@ -666,8 +650,12 @@
 		return ERR_CAST(req);
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
-	/* we only need inode linkage */
-	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
+
+	mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+	if (ceph_security_xattr_wanted(dir))
+		mask |= CEPH_CAP_XATTR_SHARED;
+	req->r_args.getattr.mask = cpu_to_le32(mask);
+
 	req->r_locked_dir = dir;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	err = ceph_handle_snapdir(req, dentry, err);
@@ -1095,6 +1083,7 @@
 static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 {
 	int valid = 0;
+	struct dentry *parent;
 	struct inode *dir;
 
 	if (flags & LOOKUP_RCU)
@@ -1103,7 +1092,8 @@
 	dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
 	     dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
 
-	dir = ceph_get_dentry_parent_inode(dentry);
+	parent = dget_parent(dentry);
+	dir = d_inode(parent);
 
 	/* always trust cached snapped dentries, snapdir dentry */
 	if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -1121,13 +1111,48 @@
 			valid = 1;
 	}
 
+	if (!valid) {
+		struct ceph_mds_client *mdsc =
+			ceph_sb_to_client(dir->i_sb)->mdsc;
+		struct ceph_mds_request *req;
+		int op, mask, err;
+
+		op = ceph_snap(dir) == CEPH_SNAPDIR ?
+			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
+		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
+		if (!IS_ERR(req)) {
+			req->r_dentry = dget(dentry);
+			req->r_num_caps = 2;
+
+			mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+			if (ceph_security_xattr_wanted(dir))
+				mask |= CEPH_CAP_XATTR_SHARED;
+			req->r_args.getattr.mask = mask;
+
+			req->r_locked_dir = dir;
+			err = ceph_mdsc_do_request(mdsc, NULL, req);
+			if (err == 0 || err == -ENOENT) {
+				if (dentry == req->r_dentry) {
+					valid = !d_unhashed(dentry);
+				} else {
+					d_invalidate(req->r_dentry);
+					err = -EAGAIN;
+				}
+			}
+			ceph_mdsc_put_request(req);
+			dout("d_revalidate %p lookup result=%d\n",
+			     dentry, err);
+		}
+	}
+
 	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
 	if (valid) {
 		ceph_dentry_lru_touch(dentry);
 	} else {
 		ceph_dir_clear_complete(dir);
 	}
-	iput(dir);
+
+	dput(parent);
 	return valid;
 }
 
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 3b31723..6e72c98 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -71,12 +71,18 @@
 	inode = ceph_find_inode(sb, vino);
 	if (!inode) {
 		struct ceph_mds_request *req;
+		int mask;
 
 		req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
 					       USE_ANY_MDS);
 		if (IS_ERR(req))
 			return ERR_CAST(req);
 
+		mask = CEPH_STAT_CAP_INODE;
+		if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+			mask |= CEPH_CAP_XATTR_SHARED;
+		req->r_args.getattr.mask = cpu_to_le32(mask);
+
 		req->r_ino1 = vino;
 		req->r_num_caps = 1;
 		err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -128,6 +134,7 @@
 	struct ceph_mds_request *req;
 	struct inode *inode;
 	struct dentry *dentry;
+	int mask;
 	int err;
 
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
@@ -144,6 +151,12 @@
 			.snap = CEPH_NOSNAP,
 		};
 	}
+
+	mask = CEPH_STAT_CAP_INODE;
+	if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+		mask |= CEPH_CAP_XATTR_SHARED;
+	req->r_args.getattr.mask = cpu_to_le32(mask);
+
 	req->r_num_caps = 1;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	inode = req->r_target_inode;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index eb9028e8..ef38f01 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -157,7 +157,7 @@
 	case S_IFDIR:
 		dout("init_file %p %p 0%o (regular)\n", inode, file,
 		     inode->i_mode);
-		cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO);
+		cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
 		if (cf == NULL) {
 			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
 			return -ENOMEM;
@@ -300,6 +300,7 @@
 	struct ceph_mds_request *req;
 	struct dentry *dn;
 	struct ceph_acls_info acls = {};
+       int mask;
 	int err;
 
 	dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -335,6 +336,12 @@
 			acls.pagelist = NULL;
 		}
 	}
+
+       mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+       if (ceph_security_xattr_wanted(dir))
+               mask |= CEPH_CAP_XATTR_SHARED;
+       req->r_args.open.mask = cpu_to_le32(mask);
+
 	req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
@@ -725,7 +732,6 @@
 	ret = ceph_osdc_start_request(req->r_osdc, req, false);
 out:
 	if (ret < 0) {
-		BUG_ON(ret == -EOLDSNAPC);
 		req->r_result = ret;
 		ceph_aio_complete_req(req, NULL);
 	}
@@ -783,7 +789,7 @@
 	int num_pages = 0;
 	int flags;
 	int ret;
-	struct timespec mtime = CURRENT_TIME;
+	struct timespec mtime = current_fs_time(inode->i_sb);
 	size_t count = iov_iter_count(iter);
 	loff_t pos = iocb->ki_pos;
 	bool write = iov_iter_rw(iter) == WRITE;
@@ -949,7 +955,6 @@
 				ret = ceph_osdc_start_request(req->r_osdc,
 							      req, false);
 			if (ret < 0) {
-				BUG_ON(ret == -EOLDSNAPC);
 				req->r_result = ret;
 				ceph_aio_complete_req(req, NULL);
 			}
@@ -988,7 +993,7 @@
 	int flags;
 	int check_caps = 0;
 	int ret;
-	struct timespec mtime = CURRENT_TIME;
+	struct timespec mtime = current_fs_time(inode->i_sb);
 	size_t count = iov_iter_count(from);
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e48fd8b..ed58b16 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -549,6 +549,10 @@
 	if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
 	    (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
 		dout("size %lld -> %llu\n", inode->i_size, size);
+		if (size > 0 && S_ISDIR(inode->i_mode)) {
+			pr_err("fill_file_size non-zero size for directory\n");
+			size = 0;
+		}
 		i_size_write(inode, size);
 		inode->i_blocks = (size + (1<<9) - 1) >> 9;
 		ci->i_reported_size = size;
@@ -1261,6 +1265,7 @@
 			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
 			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
 			     ceph_vinop(in));
+			d_invalidate(dn);
 			have_lease = false;
 		}
 
@@ -1349,15 +1354,20 @@
 
 	if (!ctl->page || pgoff != page_index(ctl->page)) {
 		ceph_readdir_cache_release(ctl);
-		ctl->page  = grab_cache_page(&dir->i_data, pgoff);
+		if (idx == 0)
+			ctl->page = grab_cache_page(&dir->i_data, pgoff);
+		else
+			ctl->page = find_lock_page(&dir->i_data, pgoff);
 		if (!ctl->page) {
 			ctl->index = -1;
-			return -ENOMEM;
+			return idx == 0 ? -ENOMEM : 0;
 		}
 		/* reading/filling the cache are serialized by
 		 * i_mutex, no need to use page lock */
 		unlock_page(ctl->page);
 		ctl->dentries = kmap(ctl->page);
+		if (idx == 0)
+			memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
 	}
 
 	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
@@ -1380,7 +1390,7 @@
 	struct qstr dname;
 	struct dentry *dn;
 	struct inode *in;
-	int err = 0, ret, i;
+	int err = 0, skipped = 0, ret, i;
 	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
 	struct ceph_dentry_info *di;
@@ -1492,7 +1502,17 @@
 		}
 
 		if (d_really_is_negative(dn)) {
-			struct dentry *realdn = splice_dentry(dn, in);
+			struct dentry *realdn;
+
+			if (ceph_security_xattr_deadlock(in)) {
+				dout(" skip splicing dn %p to inode %p"
+				     " (security xattr deadlock)\n", dn, in);
+				iput(in);
+				skipped++;
+				goto next_item;
+			}
+
+			realdn = splice_dentry(dn, in);
 			if (IS_ERR(realdn)) {
 				err = PTR_ERR(realdn);
 				d_drop(dn);
@@ -1509,7 +1529,7 @@
 				    req->r_session,
 				    req->r_request_started);
 
-		if (err == 0 && cache_ctl.index >= 0) {
+		if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
 			ret = fill_readdir_cache(d_inode(parent), dn,
 						 &cache_ctl, req);
 			if (ret < 0)
@@ -1520,7 +1540,7 @@
 			dput(dn);
 	}
 out:
-	if (err == 0) {
+	if (err == 0 && skipped == 0) {
 		req->r_did_prepopulate = true;
 		req->r_readdir_cache_idx = cache_ctl.index;
 	}
@@ -1950,7 +1970,7 @@
 	if (dirtied) {
 		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
 							   &prealloc_cf);
-		inode->i_ctime = CURRENT_TIME;
+		inode->i_ctime = current_fs_time(inode->i_sb);
 	}
 
 	release &= issued;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 911d64d..44852c3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1729,7 +1729,7 @@
 	init_completion(&req->r_safe_completion);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 
-	req->r_stamp = CURRENT_TIME;
+	req->r_stamp = current_fs_time(mdsc->fsc->sb);
 
 	req->r_op = op;
 	req->r_direct_mode = mode;
@@ -2540,6 +2540,7 @@
 
 	/* insert trace into our cache */
 	mutex_lock(&req->r_fill_mutex);
+	current->journal_info = req;
 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
 	if (err == 0) {
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
@@ -2547,6 +2548,7 @@
 			ceph_readdir_prepopulate(req, req->r_session);
 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
+	current->journal_info = NULL;
 	mutex_unlock(&req->r_fill_mutex);
 
 	up_read(&mdsc->snap_rwsem);
@@ -3764,7 +3766,6 @@
 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
 
 	/* do we need it? */
-	ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
 	mutex_lock(&mdsc->mutex);
 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
 		dout("handle_map epoch %u <= our %u\n",
@@ -3791,6 +3792,8 @@
 	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
 
 	__wake_requests(mdsc, &mdsc->waiting_for_map);
+	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
+			  mdsc->mdsmap->m_epoch);
 
 	mutex_unlock(&mdsc->mutex);
 	schedule_delayed(mdsc);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4aa7122..9caaa7f 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,6 @@
 }
 
 
-struct ceph_snap_context *ceph_empty_snapc;
-
 /*
  * build the snap context for a given realm.
  */
@@ -987,17 +985,3 @@
 		up_write(&mdsc->snap_rwsem);
 	return;
 }
-
-int __init ceph_snap_init(void)
-{
-	ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
-	if (!ceph_empty_snapc)
-		return -ENOMEM;
-	ceph_empty_snapc->seq = 1;
-	return 0;
-}
-
-void ceph_snap_exit(void)
-{
-	ceph_put_snap_context(ceph_empty_snapc);
-}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ca4d5e8..c973043d 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -439,8 +439,8 @@
 
 	if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
 		seq_puts(m, ",dirstat");
-	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES) == 0)
-		seq_puts(m, ",norbytes");
+	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES))
+		seq_puts(m, ",rbytes");
 	if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
 		seq_puts(m, ",noasyncreaddir");
 	if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
@@ -530,7 +530,7 @@
 		goto fail;
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-	fsc->client->monc.want_mdsmap = 1;
+	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
 
 	fsc->mount_options = fsopt;
 
@@ -793,22 +793,20 @@
 	struct dentry *root;
 	int first = 0;   /* first vfsmount for this super_block */
 
-	dout("mount start\n");
+	dout("mount start %p\n", fsc);
 	mutex_lock(&fsc->client->mount_mutex);
 
-	err = __ceph_open_session(fsc->client, started);
-	if (err < 0)
-		goto out;
+	if (!fsc->sb->s_root) {
+		err = __ceph_open_session(fsc->client, started);
+		if (err < 0)
+			goto out;
 
-	dout("mount opening root\n");
-	root = open_root_dentry(fsc, "", started);
-	if (IS_ERR(root)) {
-		err = PTR_ERR(root);
-		goto out;
-	}
-	if (fsc->sb->s_root) {
-		dput(root);
-	} else {
+		dout("mount opening root\n");
+		root = open_root_dentry(fsc, "", started);
+		if (IS_ERR(root)) {
+			err = PTR_ERR(root);
+			goto out;
+		}
 		fsc->sb->s_root = root;
 		first = 1;
 
@@ -818,6 +816,7 @@
 	}
 
 	if (path[0] == 0) {
+		root = fsc->sb->s_root;
 		dget(root);
 	} else {
 		dout("mount opening base mountpoint\n");
@@ -833,16 +832,14 @@
 	mutex_unlock(&fsc->client->mount_mutex);
 	return root;
 
-out:
-	mutex_unlock(&fsc->client->mount_mutex);
-	return ERR_PTR(err);
-
 fail:
 	if (first) {
 		dput(fsc->sb->s_root);
 		fsc->sb->s_root = NULL;
 	}
-	goto out;
+out:
+	mutex_unlock(&fsc->client->mount_mutex);
+	return ERR_PTR(err);
 }
 
 static int ceph_set_super(struct super_block *s, void *data)
@@ -1042,19 +1039,14 @@
 
 	ceph_flock_init();
 	ceph_xattr_init();
-	ret = ceph_snap_init();
-	if (ret)
-		goto out_xattr;
 	ret = register_filesystem(&ceph_fs_type);
 	if (ret)
-		goto out_snap;
+		goto out_xattr;
 
 	pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
 
 	return 0;
 
-out_snap:
-	ceph_snap_exit();
 out_xattr:
 	ceph_xattr_exit();
 	destroy_caches();
@@ -1066,7 +1058,6 @@
 {
 	dout("exit_ceph\n");
 	unregister_filesystem(&ceph_fs_type);
-	ceph_snap_exit();
 	ceph_xattr_exit();
 	destroy_caches();
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 9c458eb..e705c4d6 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -37,8 +37,7 @@
 #define CEPH_MOUNT_OPT_FSCACHE         (1<<10) /* use fscache */
 #define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 
-#define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES | \
-				   CEPH_MOUNT_OPT_DCACHE)
+#define CEPH_MOUNT_OPT_DEFAULT    CEPH_MOUNT_OPT_DCACHE
 
 #define ceph_set_mount_opt(fsc, opt) \
 	(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
@@ -469,7 +468,7 @@
 #define CEPH_I_POOL_PERM	(1 << 4)  /* pool rd/wr bits are valid */
 #define CEPH_I_POOL_RD		(1 << 5)  /* can read from pool */
 #define CEPH_I_POOL_WR		(1 << 6)  /* can write to pool */
-
+#define CEPH_I_SEC_INITED	(1 << 7)  /* security initialized */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
 					   long long release_count,
@@ -721,7 +720,6 @@
 
 
 /* snap.c */
-extern struct ceph_snap_context *ceph_empty_snapc;
 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
 					       u64 ino);
 extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
@@ -738,8 +736,6 @@
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 				  struct ceph_cap_snap *capsnap);
 extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
-extern int ceph_snap_init(void);
-extern void ceph_snap_exit(void);
 
 /*
  * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -808,6 +804,20 @@
 extern void ceph_xattr_exit(void);
 extern const struct xattr_handler *ceph_xattr_handlers[];
 
+#ifdef CONFIG_SECURITY
+extern bool ceph_security_xattr_deadlock(struct inode *in);
+extern bool ceph_security_xattr_wanted(struct inode *in);
+#else
+static inline bool ceph_security_xattr_deadlock(struct inode *in)
+{
+	return false;
+}
+static inline bool ceph_security_xattr_wanted(struct inode *in)
+{
+	return false;
+}
+#endif
+
 /* acl.c */
 struct ceph_acls_info {
 	void *default_acl;
@@ -947,7 +957,6 @@
 extern void ceph_dentry_lru_del(struct dentry *dn);
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
 extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
-extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
 extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
 
 /*
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 819163d..9410abd 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -714,31 +714,62 @@
 	}
 }
 
+static inline int __get_request_mask(struct inode *in) {
+	struct ceph_mds_request *req = current->journal_info;
+	int mask = 0;
+	if (req && req->r_target_inode == in) {
+		if (req->r_op == CEPH_MDS_OP_LOOKUP ||
+		    req->r_op == CEPH_MDS_OP_LOOKUPINO ||
+		    req->r_op == CEPH_MDS_OP_LOOKUPPARENT ||
+		    req->r_op == CEPH_MDS_OP_GETATTR) {
+			mask = le32_to_cpu(req->r_args.getattr.mask);
+		} else if (req->r_op == CEPH_MDS_OP_OPEN ||
+			   req->r_op == CEPH_MDS_OP_CREATE) {
+			mask = le32_to_cpu(req->r_args.open.mask);
+		}
+	}
+	return mask;
+}
+
 ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 		      size_t size)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int err;
 	struct ceph_inode_xattr *xattr;
 	struct ceph_vxattr *vxattr = NULL;
+	int req_mask;
+	int err;
 
 	if (!ceph_is_valid_xattr(name))
 		return -ENODATA;
 
 	/* let's see if a virtual xattr was requested */
 	vxattr = ceph_match_vxattr(inode, name);
-	if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
-		err = vxattr->getxattr_cb(ci, value, size);
+	if (vxattr) {
+		err = -ENODATA;
+		if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
+			err = vxattr->getxattr_cb(ci, value, size);
 		return err;
 	}
 
+	req_mask = __get_request_mask(inode);
+
 	spin_lock(&ci->i_ceph_lock);
 	dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
 	if (ci->i_xattrs.version == 0 ||
-	    !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+	    !((req_mask & CEPH_CAP_XATTR_SHARED) ||
+	      __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
 		spin_unlock(&ci->i_ceph_lock);
+
+		/* security module gets xattr while filling trace */
+		if (current->journal_info != NULL) {
+			pr_warn_ratelimited("sync getxattr %p "
+					    "during filling trace\n", inode);
+			return -EBUSY;
+		}
+
 		/* get xattrs from mds (if we don't already have them) */
 		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
 		if (err)
@@ -765,6 +796,9 @@
 
 	memcpy(value, xattr->val, xattr->val_len);
 
+	if (current->journal_info != NULL &&
+	    !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
+		ci->i_ceph_flags |= CEPH_I_SEC_INITED;
 out:
 	spin_unlock(&ci->i_ceph_lock);
 	return err;
@@ -999,7 +1033,7 @@
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
 					       &prealloc_cf);
 		ci->i_xattrs.dirty = true;
-		inode->i_ctime = CURRENT_TIME;
+		inode->i_ctime = current_fs_time(inode->i_sb);
 	}
 
 	spin_unlock(&ci->i_ceph_lock);
@@ -1015,7 +1049,15 @@
 do_sync_unlocked:
 	if (lock_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
-	err = ceph_sync_setxattr(dentry, name, value, size, flags);
+
+	/* security module set xattr while filling trace */
+	if (current->journal_info != NULL) {
+		pr_warn_ratelimited("sync setxattr %p "
+				    "during filling trace\n", inode);
+		err = -EBUSY;
+	} else {
+		err = ceph_sync_setxattr(dentry, name, value, size, flags);
+	}
 out:
 	ceph_free_cap_flush(prealloc_cf);
 	kfree(newname);
@@ -1136,7 +1178,7 @@
 	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
 				       &prealloc_cf);
 	ci->i_xattrs.dirty = true;
-	inode->i_ctime = CURRENT_TIME;
+	inode->i_ctime = current_fs_time(inode->i_sb);
 	spin_unlock(&ci->i_ceph_lock);
 	if (lock_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
@@ -1164,3 +1206,25 @@
 
 	return __ceph_removexattr(dentry, name);
 }
+
+#ifdef CONFIG_SECURITY
+bool ceph_security_xattr_wanted(struct inode *in)
+{
+	return in->i_security != NULL;
+}
+
+bool ceph_security_xattr_deadlock(struct inode *in)
+{
+	struct ceph_inode_info *ci;
+	bool ret;
+	if (in->i_security == NULL)
+		return false;
+	ci = ceph_inode(in);
+	spin_lock(&ci->i_ceph_lock);
+	ret = !(ci->i_ceph_flags & CEPH_I_SEC_INITED) &&
+	      !(ci->i_xattrs.version > 0 &&
+		__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0));
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
+#endif
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 15151f3..ae2f668 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -105,6 +105,7 @@
  */
 #define CEPH_FEATURES_SUPPORTED_DEFAULT		\
 	(CEPH_FEATURE_NOSRCADDR |		\
+	 CEPH_FEATURE_SUBSCRIBE2 |		\
 	 CEPH_FEATURE_RECONNECT_SEQ |		\
 	 CEPH_FEATURE_PGID64 |			\
 	 CEPH_FEATURE_PGPOOL3 |			\
@@ -127,6 +128,7 @@
 
 #define CEPH_FEATURES_REQUIRED_DEFAULT   \
 	(CEPH_FEATURE_NOSRCADDR |	 \
+	 CEPH_FEATURE_SUBSCRIBE2 |	 \
 	 CEPH_FEATURE_RECONNECT_SEQ |	 \
 	 CEPH_FEATURE_PGID64 |		 \
 	 CEPH_FEATURE_PGPOOL3 |		 \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index d7d072a..37f28bf 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -198,8 +198,8 @@
 #define CEPH_SUBSCRIBE_ONETIME    1  /* i want only 1 update after have */
 
 struct ceph_mon_subscribe_item {
-	__le64 have_version;    __le64 have;
-	__u8 onetime;
+	__le64 start;
+	__u8 flags;
 } __attribute__ ((packed));
 
 struct ceph_mon_subscribe_ack {
@@ -376,7 +376,8 @@
 		__le32 stripe_count;         /* ... */
 		__le32 object_size;
 		__le32 file_replication;
-		__le32 unused;               /* used to be preferred osd */
+               __le32 mask;                 /* CEPH_CAP_* */
+               __le32 old_size;
 	} __attribute__ ((packed)) open;
 	struct {
 		__le32 flags;
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 3e3799c..e7975e4 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -47,7 +47,6 @@
 	unsigned long mount_timeout;		/* jiffies */
 	unsigned long osd_idle_ttl;		/* jiffies */
 	unsigned long osd_keepalive_timeout;	/* jiffies */
-	unsigned long monc_ping_timeout;	/* jiffies */
 
 	/*
 	 * any type that can't be simply compared or doesn't need need
@@ -68,7 +67,12 @@
 #define CEPH_MOUNT_TIMEOUT_DEFAULT	msecs_to_jiffies(60 * 1000)
 #define CEPH_OSD_KEEPALIVE_DEFAULT	msecs_to_jiffies(5 * 1000)
 #define CEPH_OSD_IDLE_TTL_DEFAULT	msecs_to_jiffies(60 * 1000)
-#define CEPH_MONC_PING_TIMEOUT_DEFAULT	msecs_to_jiffies(30 * 1000)
+
+#define CEPH_MONC_HUNT_INTERVAL		msecs_to_jiffies(3 * 1000)
+#define CEPH_MONC_PING_INTERVAL		msecs_to_jiffies(10 * 1000)
+#define CEPH_MONC_PING_TIMEOUT		msecs_to_jiffies(30 * 1000)
+#define CEPH_MONC_HUNT_BACKOFF		2
+#define CEPH_MONC_HUNT_MAX_MULT		10
 
 #define CEPH_MSG_MAX_FRONT_LEN	(16*1024*1024)
 #define CEPH_MSG_MAX_MIDDLE_LEN	(16*1024*1024)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 81810dc..e230e7e 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -68,18 +68,24 @@
 
 	bool hunting;
 	int cur_mon;                       /* last monitor i contacted */
-	unsigned long sub_sent, sub_renew_after;
+	unsigned long sub_renew_after;
+	unsigned long sub_renew_sent;
 	struct ceph_connection con;
 
+	bool had_a_connection;
+	int hunt_mult; /* [1..CEPH_MONC_HUNT_MAX_MULT] */
+
 	/* pending generic requests */
 	struct rb_root generic_request_tree;
 	int num_generic_requests;
 	u64 last_tid;
 
-	/* mds/osd map */
-	int want_mdsmap;
-	int want_next_osdmap; /* 1 = want, 2 = want+asked */
-	u32 have_osdmap, have_mdsmap;
+	/* subs, indexed with CEPH_SUB_* */
+	struct {
+		struct ceph_mon_subscribe_item item;
+		bool want;
+		u32 have; /* epoch */
+	} subs[3];
 
 #ifdef CONFIG_DEBUG_FS
 	struct dentry *debugfs_file;
@@ -93,14 +99,23 @@
 extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
 extern void ceph_monc_stop(struct ceph_mon_client *monc);
 
+enum {
+	CEPH_SUB_MDSMAP = 0,
+	CEPH_SUB_MONMAP,
+	CEPH_SUB_OSDMAP,
+};
+
+extern const char *ceph_sub_str[];
+
 /*
  * The model here is to indicate that we need a new map of at least
- * epoch @want, and also call in when we receive a map.  We will
+ * epoch @epoch, and also call in when we receive a map.  We will
  * periodically rerequest the map from the monitor cluster until we
  * get what we want.
  */
-extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
-extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+			bool continuous);
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
 
 extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
 extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b48..4343df8 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -43,7 +43,8 @@
 };
 
 
-#define CEPH_OSD_MAX_OP	3
+#define CEPH_OSD_SLAB_OPS	2
+#define CEPH_OSD_MAX_OPS	16
 
 enum ceph_osd_data_type {
 	CEPH_OSD_DATA_TYPE_NONE = 0,
@@ -77,7 +78,10 @@
 struct ceph_osd_req_op {
 	u16 op;           /* CEPH_OSD_OP_* */
 	u32 flags;        /* CEPH_OSD_OP_FLAG_* */
-	u32 payload_len;
+	u32 indata_len;   /* request */
+	u32 outdata_len;  /* reply */
+	s32 rval;
+
 	union {
 		struct ceph_osd_data raw_data_in;
 		struct {
@@ -136,7 +140,6 @@
 
 	/* request osd ops array  */
 	unsigned int		r_num_ops;
-	struct ceph_osd_req_op	r_ops[CEPH_OSD_MAX_OP];
 
 	/* these are updated on each send */
 	__le32           *r_request_osdmap_epoch;
@@ -148,8 +151,6 @@
 	struct ceph_eversion *r_request_reassert_version;
 
 	int               r_result;
-	int               r_reply_op_len[CEPH_OSD_MAX_OP];
-	s32               r_reply_op_result[CEPH_OSD_MAX_OP];
 	int               r_got_reply;
 	int		  r_linger;
 
@@ -174,6 +175,8 @@
 	unsigned long     r_stamp;            /* send OR check time */
 
 	struct ceph_snap_context *r_snapc;    /* snap context for writes */
+
+	struct ceph_osd_req_op r_ops[];
 };
 
 struct ceph_request_redirect {
@@ -263,6 +266,8 @@
 					u64 truncate_size, u32 truncate_seq);
 extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 					unsigned int which, u64 length);
+extern void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
+				       unsigned int which, u64 offset_inc);
 
 extern struct ceph_osd_data *osd_req_op_extent_osd_data(
 					struct ceph_osd_request *osd_req,
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bcbec33..dcc18c6 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -361,7 +361,6 @@
 	opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
 	opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
 	opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
-	opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
 
 	/* get mon ip(s) */
 	/* ip1[:port1][,ip2[:port2]...] */
@@ -686,6 +685,9 @@
 			return client->auth_err;
 	}
 
+	pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
+	ceph_debugfs_client_init(client);
+
 	return 0;
 }
 EXPORT_SYMBOL(__ceph_open_session);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 593dc2e..b902fbc 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -112,15 +112,20 @@
 	struct ceph_mon_generic_request *req;
 	struct ceph_mon_client *monc = &client->monc;
 	struct rb_node *rp;
+	int i;
 
 	mutex_lock(&monc->mutex);
 
-	if (monc->have_mdsmap)
-		seq_printf(s, "have mdsmap %u\n", (unsigned int)monc->have_mdsmap);
-	if (monc->have_osdmap)
-		seq_printf(s, "have osdmap %u\n", (unsigned int)monc->have_osdmap);
-	if (monc->want_next_osdmap)
-		seq_printf(s, "want next osdmap\n");
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		seq_printf(s, "have %s %u", ceph_sub_str[i],
+			   monc->subs[i].have);
+		if (monc->subs[i].want)
+			seq_printf(s, " want %llu%s",
+				   le64_to_cpu(monc->subs[i].item.start),
+				   (monc->subs[i].item.flags &
+					CEPH_SUBSCRIBE_ONETIME ?  "" : "+"));
+		seq_putc(s, '\n');
+	}
 
 	for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
 		__u16 op;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9382619..1831f63 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -235,18 +235,12 @@
 static int ceph_msgr_slab_init(void)
 {
 	BUG_ON(ceph_msg_cache);
-	ceph_msg_cache = kmem_cache_create("ceph_msg",
-					sizeof (struct ceph_msg),
-					__alignof__(struct ceph_msg), 0, NULL);
-
+	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
 	if (!ceph_msg_cache)
 		return -ENOMEM;
 
 	BUG_ON(ceph_msg_data_cache);
-	ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
-					sizeof (struct ceph_msg_data),
-					__alignof__(struct ceph_msg_data),
-					0, NULL);
+	ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
 	if (ceph_msg_data_cache)
 		return 0;
 
@@ -1221,25 +1215,19 @@
 static void prepare_write_message_footer(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->out_msg;
-	int v = con->out_kvec_left;
 
 	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 
 	dout("prepare_write_message_footer %p\n", con);
-	con->out_kvec[v].iov_base = &m->footer;
+	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
 	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
 		if (con->ops->sign_message)
 			con->ops->sign_message(m);
 		else
 			m->footer.sig = 0;
-		con->out_kvec[v].iov_len = sizeof(m->footer);
-		con->out_kvec_bytes += sizeof(m->footer);
 	} else {
 		m->old_footer.flags = m->footer.flags;
-		con->out_kvec[v].iov_len = sizeof(m->old_footer);
-		con->out_kvec_bytes += sizeof(m->old_footer);
 	}
-	con->out_kvec_left++;
 	con->out_more = m->more_to_follow;
 	con->out_msg_done = true;
 }
@@ -2409,11 +2397,7 @@
 	}
 
 	/* footer */
-	if (need_sign)
-		size = sizeof(m->footer);
-	else
-		size = sizeof(m->old_footer);
-
+	size = sizeof_footer(con);
 	end += size;
 	ret = read_partial(con, end, size, &m->footer);
 	if (ret <= 0)
@@ -3089,10 +3073,7 @@
 			con->out_skip += con_out_kvec_skip(con);
 		} else {
 			BUG_ON(!msg->data_length);
-			if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
-				con->out_skip += sizeof(msg->footer);
-			else
-				con->out_skip += sizeof(msg->old_footer);
+			con->out_skip += sizeof_footer(con);
 		}
 		/* data, middle, front */
 		if (msg->data_length)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index de85ddd..cf638c0 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -122,51 +122,91 @@
 	ceph_msg_revoke(monc->m_subscribe);
 	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
 	ceph_con_close(&monc->con);
-	monc->cur_mon = -1;
+
 	monc->pending_auth = 0;
 	ceph_auth_reset(monc->auth);
 }
 
 /*
- * Open a session with a (new) monitor.
+ * Pick a new monitor at random and set cur_mon.  If we are repicking
+ * (i.e. cur_mon is already set), be sure to pick a different one.
  */
-static int __open_session(struct ceph_mon_client *monc)
+static void pick_new_mon(struct ceph_mon_client *monc)
 {
-	char r;
-	int ret;
+	int old_mon = monc->cur_mon;
 
-	if (monc->cur_mon < 0) {
-		get_random_bytes(&r, 1);
-		monc->cur_mon = r % monc->monmap->num_mon;
-		dout("open_session num=%d r=%d -> mon%d\n",
-		     monc->monmap->num_mon, r, monc->cur_mon);
-		monc->sub_sent = 0;
-		monc->sub_renew_after = jiffies;  /* i.e., expired */
-		monc->want_next_osdmap = !!monc->want_next_osdmap;
+	BUG_ON(monc->monmap->num_mon < 1);
 
-		dout("open_session mon%d opening\n", monc->cur_mon);
-		ceph_con_open(&monc->con,
-			      CEPH_ENTITY_TYPE_MON, monc->cur_mon,
-			      &monc->monmap->mon_inst[monc->cur_mon].addr);
-
-		/* send an initial keepalive to ensure our timestamp is
-		 * valid by the time we are in an OPENED state */
-		ceph_con_keepalive(&monc->con);
-
-		/* initiatiate authentication handshake */
-		ret = ceph_auth_build_hello(monc->auth,
-					    monc->m_auth->front.iov_base,
-					    monc->m_auth->front_alloc_len);
-		__send_prepared_auth_request(monc, ret);
+	if (monc->monmap->num_mon == 1) {
+		monc->cur_mon = 0;
 	} else {
-		dout("open_session mon%d already open\n", monc->cur_mon);
+		int max = monc->monmap->num_mon;
+		int o = -1;
+		int n;
+
+		if (monc->cur_mon >= 0) {
+			if (monc->cur_mon < monc->monmap->num_mon)
+				o = monc->cur_mon;
+			if (o >= 0)
+				max--;
+		}
+
+		n = prandom_u32() % max;
+		if (o >= 0 && n >= o)
+			n++;
+
+		monc->cur_mon = n;
 	}
-	return 0;
+
+	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
+	     monc->cur_mon, monc->monmap->num_mon);
 }
 
-static bool __sub_expired(struct ceph_mon_client *monc)
+/*
+ * Open a session with a new monitor.
+ */
+static void __open_session(struct ceph_mon_client *monc)
 {
-	return time_after_eq(jiffies, monc->sub_renew_after);
+	int ret;
+
+	pick_new_mon(monc);
+
+	monc->hunting = true;
+	if (monc->had_a_connection) {
+		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
+		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
+			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
+	}
+
+	monc->sub_renew_after = jiffies; /* i.e., expired */
+	monc->sub_renew_sent = 0;
+
+	dout("%s opening mon%d\n", __func__, monc->cur_mon);
+	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
+		      &monc->monmap->mon_inst[monc->cur_mon].addr);
+
+	/*
+	 * send an initial keepalive to ensure our timestamp is valid
+	 * by the time we are in an OPENED state
+	 */
+	ceph_con_keepalive(&monc->con);
+
+	/* initiate authentication handshake */
+	ret = ceph_auth_build_hello(monc->auth,
+				    monc->m_auth->front.iov_base,
+				    monc->m_auth->front_alloc_len);
+	BUG_ON(ret <= 0);
+	__send_prepared_auth_request(monc, ret);
+}
+
+static void reopen_session(struct ceph_mon_client *monc)
+{
+	if (!monc->hunting)
+		pr_info("mon%d %s session lost, hunting for new mon\n",
+		    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
+
+	__close_session(monc);
+	__open_session(monc);
 }
 
 /*
@@ -174,74 +214,70 @@
  */
 static void __schedule_delayed(struct ceph_mon_client *monc)
 {
-	struct ceph_options *opt = monc->client->options;
 	unsigned long delay;
 
-	if (monc->cur_mon < 0 || __sub_expired(monc)) {
-		delay = 10 * HZ;
-	} else {
-		delay = 20 * HZ;
-		if (opt->monc_ping_timeout > 0)
-			delay = min(delay, opt->monc_ping_timeout / 3);
-	}
+	if (monc->hunting)
+		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
+	else
+		delay = CEPH_MONC_PING_INTERVAL;
+
 	dout("__schedule_delayed after %lu\n", delay);
-	schedule_delayed_work(&monc->delayed_work,
-			      round_jiffies_relative(delay));
+	mod_delayed_work(system_wq, &monc->delayed_work,
+			 round_jiffies_relative(delay));
 }
 
+const char *ceph_sub_str[] = {
+	[CEPH_SUB_MDSMAP] = "mdsmap",
+	[CEPH_SUB_MONMAP] = "monmap",
+	[CEPH_SUB_OSDMAP] = "osdmap",
+};
+
 /*
- * Send subscribe request for mdsmap and/or osdmap.
+ * Send subscribe request for one or more maps, according to
+ * monc->subs.
  */
 static void __send_subscribe(struct ceph_mon_client *monc)
 {
-	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
-	     (unsigned int)monc->sub_sent, __sub_expired(monc),
-	     monc->want_next_osdmap);
-	if ((__sub_expired(monc) && !monc->sub_sent) ||
-	    monc->want_next_osdmap == 1) {
-		struct ceph_msg *msg = monc->m_subscribe;
-		struct ceph_mon_subscribe_item *i;
-		void *p, *end;
-		int num;
+	struct ceph_msg *msg = monc->m_subscribe;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front_alloc_len;
+	int num = 0;
+	int i;
 
-		p = msg->front.iov_base;
-		end = p + msg->front_alloc_len;
+	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
 
-		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
-		ceph_encode_32(&p, num);
+	BUG_ON(monc->cur_mon < 0);
 
-		if (monc->want_next_osdmap) {
-			dout("__send_subscribe to 'osdmap' %u\n",
-			     (unsigned int)monc->have_osdmap);
-			ceph_encode_string(&p, end, "osdmap", 6);
-			i = p;
-			i->have = cpu_to_le64(monc->have_osdmap);
-			i->onetime = 1;
-			p += sizeof(*i);
-			monc->want_next_osdmap = 2;  /* requested */
-		}
-		if (monc->want_mdsmap) {
-			dout("__send_subscribe to 'mdsmap' %u+\n",
-			     (unsigned int)monc->have_mdsmap);
-			ceph_encode_string(&p, end, "mdsmap", 6);
-			i = p;
-			i->have = cpu_to_le64(monc->have_mdsmap);
-			i->onetime = 0;
-			p += sizeof(*i);
-		}
-		ceph_encode_string(&p, end, "monmap", 6);
-		i = p;
-		i->have = 0;
-		i->onetime = 0;
-		p += sizeof(*i);
+	if (!monc->sub_renew_sent)
+		monc->sub_renew_sent = jiffies | 1; /* never 0 */
 
-		msg->front.iov_len = p - msg->front.iov_base;
-		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-		ceph_msg_revoke(msg);
-		ceph_con_send(&monc->con, ceph_msg_get(msg));
+	msg->hdr.version = cpu_to_le16(2);
 
-		monc->sub_sent = jiffies | 1;  /* never 0 */
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		if (monc->subs[i].want)
+			num++;
 	}
+	BUG_ON(num < 1); /* monmap sub is always there */
+	ceph_encode_32(&p, num);
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		const char *s = ceph_sub_str[i];
+
+		if (!monc->subs[i].want)
+			continue;
+
+		dout("%s %s start %llu flags 0x%x\n", __func__, s,
+		     le64_to_cpu(monc->subs[i].item.start),
+		     monc->subs[i].item.flags);
+		ceph_encode_string(&p, end, s, strlen(s));
+		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
+		p += sizeof(monc->subs[i].item);
+	}
+
+	BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
+	msg->front.iov_len = p - msg->front.iov_base;
+	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+	ceph_msg_revoke(msg);
+	ceph_con_send(&monc->con, ceph_msg_get(msg));
 }
 
 static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -255,15 +291,16 @@
 	seconds = le32_to_cpu(h->duration);
 
 	mutex_lock(&monc->mutex);
-	if (monc->hunting) {
-		pr_info("mon%d %s session established\n",
-			monc->cur_mon,
-			ceph_pr_addr(&monc->con.peer_addr.in_addr));
-		monc->hunting = false;
+	if (monc->sub_renew_sent) {
+		monc->sub_renew_after = monc->sub_renew_sent +
+					    (seconds >> 1) * HZ - 1;
+		dout("%s sent %lu duration %d renew after %lu\n", __func__,
+		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
+		monc->sub_renew_sent = 0;
+	} else {
+		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
+		     monc->sub_renew_sent, monc->sub_renew_after);
 	}
-	dout("handle_subscribe_ack after %d seconds\n", seconds);
-	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
-	monc->sub_sent = 0;
 	mutex_unlock(&monc->mutex);
 	return;
 bad:
@@ -272,36 +309,82 @@
 }
 
 /*
- * Keep track of which maps we have
+ * Register interest in a map
+ *
+ * @sub: one of CEPH_SUB_*
+ * @epoch: X for "every map since X", or 0 for "just the latest"
  */
-int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
+static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
+				 u32 epoch, bool continuous)
 {
-	mutex_lock(&monc->mutex);
-	monc->have_mdsmap = got;
-	mutex_unlock(&monc->mutex);
-	return 0;
-}
-EXPORT_SYMBOL(ceph_monc_got_mdsmap);
+	__le64 start = cpu_to_le64(epoch);
+	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
 
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
+	     epoch, continuous);
+
+	if (monc->subs[sub].want &&
+	    monc->subs[sub].item.start == start &&
+	    monc->subs[sub].item.flags == flags)
+		return false;
+
+	monc->subs[sub].item.start = start;
+	monc->subs[sub].item.flags = flags;
+	monc->subs[sub].want = true;
+
+	return true;
+}
+
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+			bool continuous)
+{
+	bool need_request;
+
+	mutex_lock(&monc->mutex);
+	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
+	mutex_unlock(&monc->mutex);
+
+	return need_request;
+}
+EXPORT_SYMBOL(ceph_monc_want_map);
+
+/*
+ * Keep track of which maps we have
+ *
+ * @sub: one of CEPH_SUB_*
+ */
+static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
+				u32 epoch)
+{
+	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
+
+	if (monc->subs[sub].want) {
+		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
+			monc->subs[sub].want = false;
+		else
+			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
+	}
+
+	monc->subs[sub].have = epoch;
+}
+
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
 {
 	mutex_lock(&monc->mutex);
-	monc->have_osdmap = got;
-	monc->want_next_osdmap = 0;
+	__ceph_monc_got_map(monc, sub, epoch);
 	mutex_unlock(&monc->mutex);
-	return 0;
 }
+EXPORT_SYMBOL(ceph_monc_got_map);
 
 /*
  * Register interest in the next osdmap
  */
 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
 {
-	dout("request_next_osdmap have %u\n", monc->have_osdmap);
+	dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
 	mutex_lock(&monc->mutex);
-	if (!monc->want_next_osdmap)
-		monc->want_next_osdmap = 1;
-	if (monc->want_next_osdmap < 2)
+	if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
+				 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
 		__send_subscribe(monc);
 	mutex_unlock(&monc->mutex);
 }
@@ -320,15 +403,15 @@
 	long ret;
 
 	mutex_lock(&monc->mutex);
-	while (monc->have_osdmap < epoch) {
+	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
 		mutex_unlock(&monc->mutex);
 
 		if (timeout && time_after_eq(jiffies, started + timeout))
 			return -ETIMEDOUT;
 
 		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
-						monc->have_osdmap >= epoch,
-						ceph_timeout_jiffies(timeout));
+				     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
+				     ceph_timeout_jiffies(timeout));
 		if (ret < 0)
 			return ret;
 
@@ -341,11 +424,14 @@
 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 
 /*
- *
+ * Open a session with a random monitor.  Request monmap and osdmap,
+ * which are waited upon in __ceph_open_session().
  */
 int ceph_monc_open_session(struct ceph_mon_client *monc)
 {
 	mutex_lock(&monc->mutex);
+	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
+	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
 	__open_session(monc);
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
@@ -353,29 +439,15 @@
 }
 EXPORT_SYMBOL(ceph_monc_open_session);
 
-/*
- * We require the fsid and global_id in order to initialize our
- * debugfs dir.
- */
-static bool have_debugfs_info(struct ceph_mon_client *monc)
-{
-	dout("have_debugfs_info fsid %d globalid %lld\n",
-	     (int)monc->client->have_fsid, monc->auth->global_id);
-	return monc->client->have_fsid && monc->auth->global_id > 0;
-}
-
 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 				 struct ceph_msg *msg)
 {
 	struct ceph_client *client = monc->client;
 	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
 	void *p, *end;
-	int had_debugfs_info, init_debugfs = 0;
 
 	mutex_lock(&monc->mutex);
 
-	had_debugfs_info = have_debugfs_info(monc);
-
 	dout("handle_monmap\n");
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -395,29 +467,11 @@
 	client->monc.monmap = monmap;
 	kfree(old);
 
-	if (!client->have_fsid) {
-		client->have_fsid = true;
-		if (!had_debugfs_info && have_debugfs_info(monc)) {
-			pr_info("client%lld fsid %pU\n",
-				ceph_client_id(monc->client),
-				&monc->client->fsid);
-			init_debugfs = 1;
-		}
-		mutex_unlock(&monc->mutex);
+	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
+	client->have_fsid = true;
 
-		if (init_debugfs) {
-			/*
-			 * do debugfs initialization without mutex to avoid
-			 * creating a locking dependency
-			 */
-			ceph_debugfs_client_init(monc->client);
-		}
-
-		goto out_unlocked;
-	}
 out:
 	mutex_unlock(&monc->mutex);
-out_unlocked:
 	wake_up_all(&client->auth_wq);
 }
 
@@ -745,18 +799,15 @@
 	dout("monc delayed_work\n");
 	mutex_lock(&monc->mutex);
 	if (monc->hunting) {
-		__close_session(monc);
-		__open_session(monc);  /* continue hunting */
+		dout("%s continuing hunt\n", __func__);
+		reopen_session(monc);
 	} else {
-		struct ceph_options *opt = monc->client->options;
 		int is_auth = ceph_auth_is_authenticated(monc->auth);
 		if (ceph_con_keepalive_expired(&monc->con,
-					       opt->monc_ping_timeout)) {
+					       CEPH_MONC_PING_TIMEOUT)) {
 			dout("monc keepalive timeout\n");
 			is_auth = 0;
-			__close_session(monc);
-			monc->hunting = true;
-			__open_session(monc);
+			reopen_session(monc);
 		}
 
 		if (!monc->hunting) {
@@ -764,8 +815,14 @@
 			__validate_auth(monc);
 		}
 
-		if (is_auth)
-			__send_subscribe(monc);
+		if (is_auth) {
+			unsigned long now = jiffies;
+
+			dout("%s renew subs? now %lu renew after %lu\n",
+			     __func__, now, monc->sub_renew_after);
+			if (time_after_eq(now, monc->sub_renew_after))
+				__send_subscribe(monc);
+		}
 	}
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
@@ -852,18 +909,14 @@
 		      &monc->client->msgr);
 
 	monc->cur_mon = -1;
-	monc->hunting = true;
-	monc->sub_renew_after = jiffies;
-	monc->sub_sent = 0;
+	monc->had_a_connection = false;
+	monc->hunt_mult = 1;
 
 	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
 	monc->generic_request_tree = RB_ROOT;
 	monc->num_generic_requests = 0;
 	monc->last_tid = 0;
 
-	monc->have_mdsmap = 0;
-	monc->have_osdmap = 0;
-	monc->want_next_osdmap = 1;
 	return 0;
 
 out_auth_reply:
@@ -888,7 +941,7 @@
 
 	mutex_lock(&monc->mutex);
 	__close_session(monc);
-
+	monc->cur_mon = -1;
 	mutex_unlock(&monc->mutex);
 
 	/*
@@ -910,26 +963,40 @@
 }
 EXPORT_SYMBOL(ceph_monc_stop);
 
+static void finish_hunting(struct ceph_mon_client *monc)
+{
+	if (monc->hunting) {
+		dout("%s found mon%d\n", __func__, monc->cur_mon);
+		monc->hunting = false;
+		monc->had_a_connection = true;
+		monc->hunt_mult /= 2; /* reduce by 50% */
+		if (monc->hunt_mult < 1)
+			monc->hunt_mult = 1;
+	}
+}
+
 static void handle_auth_reply(struct ceph_mon_client *monc,
 			      struct ceph_msg *msg)
 {
 	int ret;
 	int was_auth = 0;
-	int had_debugfs_info, init_debugfs = 0;
 
 	mutex_lock(&monc->mutex);
-	had_debugfs_info = have_debugfs_info(monc);
 	was_auth = ceph_auth_is_authenticated(monc->auth);
 	monc->pending_auth = 0;
 	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
 				     msg->front.iov_len,
 				     monc->m_auth->front.iov_base,
 				     monc->m_auth->front_alloc_len);
+	if (ret > 0) {
+		__send_prepared_auth_request(monc, ret);
+		goto out;
+	}
+
+	finish_hunting(monc);
+
 	if (ret < 0) {
 		monc->client->auth_err = ret;
-		wake_up_all(&monc->client->auth_wq);
-	} else if (ret > 0) {
-		__send_prepared_auth_request(monc, ret);
 	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
 		dout("authenticated, starting session\n");
 
@@ -939,23 +1006,15 @@
 
 		__send_subscribe(monc);
 		__resend_generic_request(monc);
+
+		pr_info("mon%d %s session established\n", monc->cur_mon,
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 	}
 
-	if (!had_debugfs_info && have_debugfs_info(monc)) {
-		pr_info("client%lld fsid %pU\n",
-			ceph_client_id(monc->client),
-			&monc->client->fsid);
-		init_debugfs = 1;
-	}
+out:
 	mutex_unlock(&monc->mutex);
-
-	if (init_debugfs) {
-		/*
-		 * do debugfs initialization without mutex to avoid
-		 * creating a locking dependency
-		 */
-		ceph_debugfs_client_init(monc->client);
-	}
+	if (monc->client->auth_err < 0)
+		wake_up_all(&monc->client->auth_wq);
 }
 
 static int __validate_auth(struct ceph_mon_client *monc)
@@ -1096,29 +1155,17 @@
 {
 	struct ceph_mon_client *monc = con->private;
 
-	if (!monc)
-		return;
-
-	dout("mon_fault\n");
 	mutex_lock(&monc->mutex);
-	if (!con->private)
-		goto out;
-
-	if (!monc->hunting)
-		pr_info("mon%d %s session lost, "
-			"hunting for new mon\n", monc->cur_mon,
-			ceph_pr_addr(&monc->con.peer_addr.in_addr));
-
-	__close_session(monc);
-	if (!monc->hunting) {
-		/* start hunting */
-		monc->hunting = true;
-		__open_session(monc);
-	} else {
-		/* already hunting, let's wait a bit */
-		__schedule_delayed(monc);
+	dout("%s mon%d\n", __func__, monc->cur_mon);
+	if (monc->cur_mon >= 0) {
+		if (!monc->hunting) {
+			dout("%s hunting for new mon\n", __func__);
+			reopen_session(monc);
+			__schedule_delayed(monc);
+		} else {
+			dout("%s already hunting\n", __func__);
+		}
 	}
-out:
 	mutex_unlock(&monc->mutex);
 }
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5bc0537..32355d9d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,9 +338,10 @@
 	ceph_put_snap_context(req->r_snapc);
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
-	else
+	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
 		kmem_cache_free(ceph_osd_request_cache, req);
-
+	else
+		kfree(req);
 }
 
 void ceph_osdc_get_request(struct ceph_osd_request *req)
@@ -369,28 +370,22 @@
 	struct ceph_msg *msg;
 	size_t msg_size;
 
-	BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
-	BUG_ON(num_ops > CEPH_OSD_MAX_OP);
-
-	msg_size = 4 + 4 + 8 + 8 + 4+8;
-	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
-	msg_size += 1 + 8 + 4 + 4;     /* pg_t */
-	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
-	msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
-	msg_size += 8;  /* snapid */
-	msg_size += 8;  /* snap_seq */
-	msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
-	msg_size += 4;
-
 	if (use_mempool) {
+		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
-		memset(req, 0, sizeof(*req));
+	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
+		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
 	} else {
-		req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
+		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
+		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
+			      gfp_flags);
 	}
-	if (req == NULL)
+	if (unlikely(!req))
 		return NULL;
 
+	/* req only, each op is zeroed in _osd_req_op_init() */
+	memset(req, 0, sizeof(*req));
+
 	req->r_osdc = osdc;
 	req->r_mempool = use_mempool;
 	req->r_num_ops = num_ops;
@@ -408,18 +403,36 @@
 	req->r_base_oloc.pool = -1;
 	req->r_target_oloc.pool = -1;
 
+	msg_size = OSD_OPREPLY_FRONT_LEN;
+	if (num_ops > CEPH_OSD_SLAB_OPS) {
+		/* ceph_osd_op and rval */
+		msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
+			    (sizeof(struct ceph_osd_op) + 4);
+	}
+
 	/* create reply message */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
+		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
+				   gfp_flags, true);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
 	}
 	req->r_reply = msg;
 
+	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
+	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
+	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+	msg_size += 1 + 8 + 4 + 4; /* pgid */
+	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
+	msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
+	msg_size += 8; /* snapid */
+	msg_size += 8; /* snap_seq */
+	msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
+	msg_size += 4; /* retry_attempt */
+
 	/* create request message; allow space for oid */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -498,7 +511,7 @@
 	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
 		payload_len += length;
 
-	op->payload_len = payload_len;
+	op->indata_len = payload_len;
 }
 EXPORT_SYMBOL(osd_req_op_extent_init);
 
@@ -517,10 +530,32 @@
 	BUG_ON(length > previous);
 
 	op->extent.length = length;
-	op->payload_len -= previous - length;
+	op->indata_len -= previous - length;
 }
 EXPORT_SYMBOL(osd_req_op_extent_update);
 
+void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
+				unsigned int which, u64 offset_inc)
+{
+	struct ceph_osd_req_op *op, *prev_op;
+
+	BUG_ON(which + 1 >= osd_req->r_num_ops);
+
+	prev_op = &osd_req->r_ops[which];
+	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
+	/* dup previous one */
+	op->indata_len = prev_op->indata_len;
+	op->outdata_len = prev_op->outdata_len;
+	op->extent = prev_op->extent;
+	/* adjust offset */
+	op->extent.offset += offset_inc;
+	op->extent.length -= offset_inc;
+
+	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+		op->indata_len -= offset_inc;
+}
+EXPORT_SYMBOL(osd_req_op_extent_dup_last);
+
 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 			u16 opcode, const char *class, const char *method)
 {
@@ -554,7 +589,7 @@
 
 	op->cls.argc = 0;	/* currently unused */
 
-	op->payload_len = payload_len;
+	op->indata_len = payload_len;
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
 
@@ -587,7 +622,7 @@
 	op->xattr.cmp_mode = cmp_mode;
 
 	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
-	op->payload_len = payload_len;
+	op->indata_len = payload_len;
 	return 0;
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
@@ -707,7 +742,7 @@
 			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
 			dst->cls.indata_len = cpu_to_le32(data_length);
 			ceph_osdc_msg_data_add(req->r_request, osd_data);
-			src->payload_len += data_length;
+			src->indata_len += data_length;
 			request_data_len += data_length;
 		}
 		osd_data = &src->cls.response_data;
@@ -750,7 +785,7 @@
 
 	dst->op = cpu_to_le16(src->op);
 	dst->flags = cpu_to_le32(src->flags);
-	dst->payload_len = cpu_to_le32(src->payload_len);
+	dst->payload_len = cpu_to_le32(src->indata_len);
 
 	return request_data_len;
 }
@@ -1810,7 +1845,7 @@
 
 	ceph_decode_need(&p, end, 4, bad_put);
 	numops = ceph_decode_32(&p);
-	if (numops > CEPH_OSD_MAX_OP)
+	if (numops > CEPH_OSD_MAX_OPS)
 		goto bad_put;
 	if (numops != req->r_num_ops)
 		goto bad_put;
@@ -1821,7 +1856,7 @@
 		int len;
 
 		len = le32_to_cpu(op->payload_len);
-		req->r_reply_op_len[i] = len;
+		req->r_ops[i].outdata_len = len;
 		dout(" op %d has %d bytes\n", i, len);
 		payload_len += len;
 		p += sizeof(*op);
@@ -1836,7 +1871,7 @@
 	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
 	retry_attempt = ceph_decode_32(&p);
 	for (i = 0; i < numops; i++)
-		req->r_reply_op_result[i] = ceph_decode_32(&p);
+		req->r_ops[i].rval = ceph_decode_32(&p);
 
 	if (le16_to_cpu(msg->hdr.version) >= 6) {
 		p += 8 + 4; /* skip replay_version */
@@ -2187,7 +2222,8 @@
 		goto bad;
 done:
 	downgrade_write(&osdc->map_sem);
-	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
+	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+			  osdc->osdmap->epoch);
 
 	/*
 	 * subscribe to subsequent osdmap updates if full to ensure
@@ -2646,8 +2682,8 @@
 	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));
 
 	err = -ENOMEM;
-	osdc->req_mempool = mempool_create_kmalloc_pool(10,
-					sizeof(struct ceph_osd_request));
+	osdc->req_mempool = mempool_create_slab_pool(10,
+						     ceph_osd_request_cache);
 	if (!osdc->req_mempool)
 		goto out;
 
@@ -2782,11 +2818,12 @@
 
 int ceph_osdc_setup(void)
 {
+	size_t size = sizeof(struct ceph_osd_request) +
+	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
+
 	BUG_ON(ceph_osd_request_cache);
-	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
-					sizeof (struct ceph_osd_request),
-					__alignof__(struct ceph_osd_request),
-					0, NULL);
+	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
+						   0, 0, NULL);
 
 	return ceph_osd_request_cache ? 0 : -ENOMEM;
 }