The unified diff between revisions [750f4172..] and [027bf4ad..] is displayed below. It can also be downloaded as a raw diff.
#
#
# add_file "doc/README.coss"
# content [f3d1795611ceb5a15d21983cce6a8a79b2679b8a]
#
# add_file "src/fs/coss/coss_dump.c"
# content [5f1e07193408a5e9d857e9a6ae6e03a3ea52363f]
#
# patch "src/enums.h"
# from [955437e924a040325c5940579961596c655549ca]
# to [cdbb572d3e510c8b6c856c755c8ce78577f6b838]
#
# patch "src/fs/aufs/aiops.c"
# from [001958a88018bc0aa584d1ba827f1555de05f798]
# to [f4333dc404ce40eb59edcca5eea7fe1a51ab2a5b]
#
# patch "src/fs/coss/async_io.c"
# from [e99230ad47e7444f12a3a76b42e2550418f90934]
# to [c6cb2ad9fdc22b1d16b3bff55b7567630b7e44a6]
#
# patch "src/fs/coss/coss-notes.txt"
# from [e939839c0002eb394f332071abe36b959a9574fe]
# to [d9484ace977b7771e294485657f99c87812b1ec8]
#
# patch "src/fs/coss/store_coss.h"
# from [f7b8c89f19545d3a75c073a25b44f22612e3e41d]
# to [8d5088bd9045dad182e38790fa7421d23d5a03d2]
#
# patch "src/fs/coss/store_dir_coss.c"
# from [371e6c070f6001f5e22de7286ec60b880529ff4b]
# to [c73efc86785c4f33374493b8db62904ec58b7df3]
#
# patch "src/fs/coss/store_io_coss.c"
# from [f8c1c7290dd460e33dd473ce2eb350a8f542e14a]
# to [6c6acd007a53f403d82cbe9e5a92dc4e64c0f97c]
#
# patch "src/store_client.c"
# from [d961bf8e2ac9cf641164b0dd9c6b5d0cef3688f0]
# to [35cf0d9660b7ea121d8da18a53b6d958cf75dc3e]
#
# patch "src/store_swapmeta.c"
# from [7d767872403f2adbc507a0fc81661dd161046f53]
# to [369d3779b78fe2673c4b4bbc83c4b85ba758eeac]
#
============================================================
--- doc/README.coss f3d1795611ceb5a15d21983cce6a8a79b2679b8a
+++ doc/README.coss f3d1795611ceb5a15d21983cce6a8a79b2679b8a
@@ -0,0 +1,43 @@
+
+COSS needs some more work done before it can be considered ready
+for prime-time.
+
+The current list:
+
+* COSS does have some bugs which result in swapin errors. These errors don't
+ affect the client - they just trigger a cache miss - but they should be tracked
+ down and squashed.
+ + I think it's to do with a race condition during open:
+ * object gets created, and written to the stripe
+ * stripe gets written to disk
+ * object is opened again; this triggers a reallocation to a fresh place
+ in the new stripe and a read is scheduled
+ * object is opened AGAIN before the object is reallocated - this time the
+ read simply memcpy()s the stripe data, assuming said stripe data is valid
+ * .. and gets 0's..
+
+* Rebuilding from a dirty cache isn't supported. Ideally the 'swap log' should
+ be inline to the COSS storage system: the object metadata and swap log should
+ be written out as part of the stripe data.
+
+* Make sure that a rebuild, clean or dirty, doesn't end up pointing to 'bad'
+ object data.
+
+* Better documentation and examples.
+
+The future list, time permitting:
+
+* The store swapout/swapin API isn't that great for COSS. The biggest optimisation:
+ allowing the store dir side to specify how much data it's able to fill in, rather
+ than copying it all a few times in fixed size pages.
+
+* COSS can only be selected at present if the object size is known up-front (and we
+ don't try writing out more data than we said we would! This should also be checked!)
+ This puts quite a restriction on what COSS can do. The swapout logic should be
+ changed somewhat; the final swapout decision should be left until we know no storedir
+ can satisfy the swapout. The easiest way is to take the MAX() of the specified max_file_size
+ parameters for each storedir and delay making the swapout decision until we've read more
+ than that. COSS can then get another chance at being selected. This has the added advantage
+ of having the whole object already in memory, allowing it to be copied over in one hit
+ rather than in piecemeal chunks.
+
============================================================
--- src/fs/coss/coss_dump.c 5f1e07193408a5e9d857e9a6ae6e03a3ea52363f
+++ src/fs/coss/coss_dump.c 5f1e07193408a5e9d857e9a6ae6e03a3ea52363f
@@ -0,0 +1,312 @@
+/*
+ * coss_dump - standalone debugging aid for COSS store files.
+ *
+ * Reads the file named on the command line one stripe at a time and
+ * prints the swap metadata (URL and object size) of each object whose
+ * header can be decoded.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <assert.h>
+#include <sys/types.h>
+#include <fcntl.h>
+
+#include "defines.h"
+#include "enums.h"
+
+struct _tlv;
+typedef struct _tlv tlv;
+
+/* One type/length/value record of a swap metadata header */
+struct _tlv {
+    char type;
+    int length;
+    void *value;
+    tlv *next;
+};
+
+/* Send the debug(section, level) (fmt, ...) idiom straight to printf */
+#undef debug
+#define debug(a, b) printf
+
+/* Allocate 'sz' zeroed bytes (at least one) or exit; never returns NULL */
+static void *
+dumpAlloc(size_t sz)
+{
+    void *p = calloc(1, sz ? sz : 1);
+    if (p == NULL) {
+        perror("calloc");
+        exit(1);
+    }
+    return p;
+}
+
+#define MEM_TLV sizeof(tlv)
+#define memAllocate(a) dumpAlloc(a)
+#define memFree(a, b) free(a)
+#define xmemcpy(a, b, c) memcpy(a, b, c)
+#define xmalloc(a) dumpAlloc(a)
+#define xfree(a) free(a)
+
+#define squid_off_t off_t
+
+/*
+ * Append a copy of the 'len' bytes at 'ptr' as a new record of 'type'
+ * at '*tail'. Returns the address of the new record's next pointer,
+ * i.e. the tail to pass to the following call.
+ */
+static tlv **
+storeSwapTLVAdd(int type, const void *ptr, size_t len, tlv ** tail)
+{
+    tlv *t = memAllocate(MEM_TLV);
+    t->type = (char) type;
+    t->length = (int) len;
+    t->value = xmalloc(len);
+    xmemcpy(t->value, ptr, len);
+    t->next = NULL;             /* the list stays terminated after every add */
+    *tail = t;
+    return &t->next;            /* return new tail pointer */
+}
+
+/* Free every record from 'n' onwards, together with its value */
+void
+storeSwapTLVFree(tlv * n)
+{
+    tlv *t;
+    while ((t = n) != NULL) {
+        n = t->next;
+        xfree(t->value);
+        memFree(t, MEM_TLV);
+    }
+}
+
+/*
+ * Serialise 'tlv_list' into a newly allocated buffer laid out as
+ * STORE_META_OK, total length (int), the records as type/length/value,
+ * STORE_META_END. Stores the buffer size in '*length' and returns the
+ * buffer, which the caller frees.
+ */
+char *
+storeSwapMetaPack(tlv * tlv_list, int *length)
+{
+    int buflen = 0;
+    tlv *t;
+    int j = 0;
+    char *buf;
+    assert(length != NULL);
+    buflen++;                   /* STORE_META_OK */
+    buflen += sizeof(int);      /* size of header to follow */
+    for (t = tlv_list; t; t = t->next)
+        buflen += sizeof(char) + sizeof(int) + t->length;
+    buflen++;                   /* STORE_META_END */
+    buf = xmalloc(buflen);
+    buf[j++] = (char) STORE_META_OK;
+    xmemcpy(&buf[j], &buflen, sizeof(int));
+    j += sizeof(int);
+    for (t = tlv_list; t; t = t->next) {
+        buf[j++] = (char) t->type;
+        xmemcpy(&buf[j], &t->length, sizeof(int));
+        j += sizeof(int);
+        xmemcpy(&buf[j], t->value, t->length);
+        j += t->length;
+    }
+    buf[j++] = (char) STORE_META_END;
+    assert((int) j == buflen);
+    *length = buflen;
+    return buf;
+}
+
+/*
+ * Decode the swap metadata header at 'buf' into a TLV list.
+ *
+ * Returns NULL, leaving '*hdr_len' untouched, if 'buf' does not start
+ * with STORE_META_OK or the recorded header length cannot hold even one
+ * record. Otherwise stores the recorded length in '*hdr_len' and
+ * returns the records decoded before the end of the header or the
+ * first malformed record (NULL if there were none). Up to the recorded
+ * length is read from 'buf', so the caller must know that much is valid.
+ */
+tlv *
+storeSwapMetaUnpack(const char *buf, int *hdr_len)
+{
+    const int min_record = (int) (sizeof(char) + sizeof(int));
+    tlv *TLV = NULL;            /* we'll return this */
+    tlv **T = &TLV;
+    char type;
+    int length;
+    int buflen;
+    int j = 0;
+    assert(buf != NULL);
+    assert(hdr_len != NULL);
+    if (buf[j++] != (char) STORE_META_OK)
+        return NULL;
+    xmemcpy(&buflen, &buf[j], sizeof(int));
+    j += sizeof(int);
+    /*
+     * sanity check on 'buflen' value. It should be at least big
+     * enough to hold one type and one length. The comparison is done
+     * on signed ints so that a negative length is rejected as well.
+     */
+    if (buflen <= min_record)
+        return NULL;
+    while (buflen - j > min_record) {
+        type = buf[j++];
+        /* VOID is reserved, but allow some slack for new types.. */
+        if (type <= STORE_META_VOID || type > STORE_META_END + 10) {
+            debug(20, 0) ("storeSwapMetaUnpack: bad type (%d)!\n", type);
+            break;
+        }
+        xmemcpy(&length, &buf[j], sizeof(int));
+        if (length < 0 || length > (1 << 16)) {
+            debug(20, 0) ("storeSwapMetaUnpack: insane length (%d)!\n", length);
+            break;
+        }
+        j += sizeof(int);
+        if (j + length > buflen) {
+            debug(20, 0) ("storeSwapMetaUnpack: overflow!\n");
+            debug(20, 0) ("\ttype=%d, length=%d, buflen=%d, offset=%d\n",
+                type, length, buflen, (int) j);
+            break;
+        }
+        T = storeSwapTLVAdd(type, &buf[j], (size_t) length, T);
+        j += length;
+    }
+    *hdr_len = buflen;
+    return TLV;
+}
+
+
+#define STRIPESIZE 1048576
+#define BLOCKSIZE 1024
+#define BLKBITS 10
+
+/*
+ * Print the URL and size of each object found in the 'len' bytes of
+ * stripe number 'stripeid' held in 'buf'. The walk ends at the first
+ * offset that does not decode as an object header, at an object with
+ * no usable size record, or when the next object would start outside
+ * the stripe.
+ */
+void
+parse_stripe(int stripeid, char *buf, int len)
+{
+    const int prefix = 1 + (int) sizeof(int);   /* STORE_META_OK + length */
+    int j = 0;
+    int bl;
+    int have_size;
+    int64_t objsize;
+    int64_t next;
+    tlv *t, *tlv_list;
+
+    while (j < len) {
+        have_size = 0;
+        objsize = 0;
+        bl = 0;
+        tlv_list = NULL;
+        /*
+         * storeSwapMetaUnpack() reads as far as the length recorded in
+         * the header, so only call it when that fits in the stripe.
+         */
+        if (len - j >= prefix) {
+            xmemcpy(&bl, &buf[j + 1], sizeof(int));
+            if (bl <= len - j)
+                tlv_list = storeSwapMetaUnpack(&buf[j], &bl);
+        }
+        if (tlv_list == NULL) {
+            printf(" Object: NULL\n");
+            return;
+        }
+        /* blocks-per-stripe first: stripeid * STRIPESIZE can overflow int */
+        printf(" Object: (filen %d) hdr size %d\n",
+            j / BLOCKSIZE + stripeid * (STRIPESIZE / BLOCKSIZE), bl);
+        for (t = tlv_list; t; t = t->next) {
+            switch (t->type) {
+            case STORE_META_URL:
+                /* bounded by the record length in case the NUL is missing */
+                printf(" URL: %.*s\n", t->length, (const char *) t->value);
+                break;
+            case STORE_META_OBJSIZE:
+                if (t->length < (int) sizeof(objsize))
+                    break;      /* too short to hold a 64-bit size */
+                xmemcpy(&objsize, t->value, sizeof(objsize));
+                have_size = 1;
+                printf("Size: %lld (len %d)\n", (long long) objsize, t->length);
+                break;
+            default:
+                break;
+            }
+        }
+        storeSwapTLVFree(tlv_list);
+        if (!have_size) {
+            printf(" STRIPE: Completed, got an object with no size\n");
+            return;
+        }
+        /*
+         * The next object starts at the end of this one rounded up to a
+         * block boundary, as storeCossAllocate() rounds its offset. Sum
+         * in 64 bits and stop on a size that would leave the stripe.
+         */
+        if (objsize < 0 || objsize > len)
+            return;
+        next = (int64_t) j + bl + objsize;
+        next = (next + BLOCKSIZE - 1) / BLOCKSIZE * BLOCKSIZE;
+        if (next > len)
+            return;
+        j = (int) next;
+    }
+}
+
+/*
+ * Read up to 'want' bytes from 'fd' into 'buf', continuing after short
+ * reads. Returns the byte count (0 at end of file) or -1 on error.
+ */
+static int
+read_stripe(int fd, char *buf, int want)
+{
+    int got = 0;
+    ssize_t n;
+
+    while (got < want) {
+        n = read(fd, buf + got, (size_t) (want - got));
+        if (n < 0)
+            return -1;
+        if (n == 0)
+            break;
+        got += (int) n;
+    }
+    return got;
+}
+
+/* Usage: coss_dump <coss-file>. Exits non-zero on usage or I/O errors. */
+int
+main(int argc, char *argv[])
+{
+    static char buf[STRIPESIZE];        /* 1 MB: kept off the stack */
+    int fd;
+    int i = 0, len;
+
+    if (argc != 2) {
+        fprintf(stderr, "usage: %s <coss-file>\n", argc > 0 ? argv[0] : "coss_dump");
+        exit(1);
+    }
+    fd = open(argv[1], O_RDONLY);
+    if (fd < 0) {
+        perror("open");
+        exit(1);
+    }
+    while ((len = read_stripe(fd, buf, STRIPESIZE)) > 0) {
+        printf("STRIPE: %d (len %d)\n", i, len);
+        parse_stripe(i, buf, len);
+        i++;
+    }
+    if (len < 0) {
+        perror("read");
+        close(fd);
+        exit(1);
+    }
+    close(fd);
+    return 0;
+}
============================================================
--- src/enums.h 955437e924a040325c5940579961596c655549ca
+++ src/enums.h cdbb572d3e510c8b6c856c755c8ce78577f6b838
@@ -629,6 +629,7 @@ enum {
STORE_META_VALID,
STORE_META_VARY_HEADERS, /* Stores Vary request headers */
STORE_META_STD_LFS, /* standard metadata in lfs format */
+ STORE_META_OBJSIZE, /* object size, if its known */
STORE_META_END
};
============================================================
--- src/fs/aufs/aiops.c 001958a88018bc0aa584d1ba827f1555de05f798
+++ src/fs/aufs/aiops.c f4333dc404ce40eb59edcca5eea7fe1a51ab2a5b
@@ -51,6 +51,16 @@
#include <sched.h>
#endif
+/* For pread()/pwrite() */
+#define __USE_UNIX98
+#include <unistd.h>
+
+ssize_t pread64(int fd, void *buf, size_t count, off_t offset);
+ssize_t pwrite64(int fd, const void *buf, size_t count, off_t offset);
+#define pread pread64
+#define pwrite pwrite64
+
+
#define RIDICULOUS_LENGTH 4096
#ifdef AUFS_IO_THREADS
@@ -139,17 +149,17 @@ static int squidaio_initialised = 0;
static squidaio_thread_t *threads = NULL;
static int squidaio_initialised = 0;
-#define AIO_LARGE_BUFS 16384
-#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
-#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
-#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
+#define AIO_LARGE_BUFS 65536
+#define AIO_MEDIUM_BUFS 8192
+#define AIO_SMALL_BUFS 4096
+#define AIO_TINY_BUFS 2048
#define AIO_MICRO_BUFS 128
static MemPool *squidaio_large_bufs = NULL; /* 16K */
static MemPool *squidaio_medium_bufs = NULL; /* 8K */
static MemPool *squidaio_small_bufs = NULL; /* 4K */
static MemPool *squidaio_tiny_bufs = NULL; /* 2K */
-static MemPool *squidaio_micro_bufs = NULL; /* 128K */
+static MemPool *squidaio_micro_bufs = NULL; /* 128 */
static int request_queue_len = 0;
static MemPool *squidaio_request_pool = NULL;
@@ -666,8 +676,11 @@ squidaio_do_read(squidaio_request_t * re
static void
squidaio_do_read(squidaio_request_t * requestp)
{
+#if 0
lseek(requestp->fd, requestp->offset, requestp->whence);
requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
+#endif
+ requestp->ret = pread(requestp->fd, requestp->bufferp, requestp->buflen, requestp->offset);
requestp->err = errno;
}
@@ -695,7 +708,12 @@ squidaio_do_write(squidaio_request_t * r
static void
squidaio_do_write(squidaio_request_t * requestp)
{
- requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
+#if 0
+ if (requestp->offset != -1) {
+ lseek(requestp->fd, requestp->offset, requestp->whence);
+ }
+#endif
+ requestp->ret = pwrite(requestp->fd, requestp->bufferp, requestp->buflen, requestp->offset);
requestp->err = errno;
}
============================================================
--- src/fs/coss/async_io.c e99230ad47e7444f12a3a76b42e2550418f90934
+++ src/fs/coss/async_io.c c6cb2ad9fdc22b1d16b3bff55b7567630b7e44a6
@@ -61,6 +61,7 @@ a_file_read(async_queue_t * q, int fd, v
async_queue_entry_t *qe;
assert(q->aq_state == AQ_STATE_SETUP);
+ assert(offset >= 0);
/* Find a free slot */
slot = a_file_findslot(q);
@@ -109,6 +110,7 @@ a_file_write(async_queue_t * q, int fd,
async_queue_entry_t *qe;
assert(q->aq_state == AQ_STATE_SETUP);
+ assert(offset >= 0);
/* Find a free slot */
slot = a_file_findslot(q);
@@ -196,6 +198,9 @@ a_file_callback(async_queue_t * q)
fd = aqe->aq_e_fd;
type = aqe->aq_e_type;
+ /* debugging assert */
+ assert(reterr == 0);
+
/* Free slot */
memset(aqe, 0, sizeof(async_queue_entry_t));
aqe->aq_e_state = AQ_ENTRY_FREE;
============================================================
--- src/fs/coss/coss-notes.txt e939839c0002eb394f332071abe36b959a9574fe
+++ src/fs/coss/coss-notes.txt d9484ace977b7771e294485657f99c87812b1ec8
@@ -121,3 +121,11 @@
However, COSS_ALLOC_NOTIFY was still present in the store_dir_coss.c
rebuild routines. To avoid assertions during rebuild, I commented
out the storeCossAllocate(SD, e, COSS_ALLOC_NOTIFY) call.
+
+-- Notes: Adrian Chadd, 9/May/2006
+
+* The types used by COSS have been modified to support large files,
+ at least under Linux. One can compile with --with-large-files to make
+ sure the right options have been enabled. No compile-time or run-time
+ checks are currently made to ensure the code has been compiled to
+ support large filesystems - at least not yet.
============================================================
--- src/fs/coss/store_coss.h f7b8c89f19545d3a75c073a25b44f22612e3e41d
+++ src/fs/coss/store_coss.h 8d5088bd9045dad182e38790fa7421d23d5a03d2
@@ -7,13 +7,41 @@
/* Note that swap_filen in sio/e are actually disk offsets too! */
+typedef struct _cossmembuf CossMemBuf;
+typedef struct _cossinfo CossInfo;
+typedef struct _cossstate CossState;
+typedef struct _cossindex CossIndexNode;
+typedef struct _coss_pending_reloc CossPendingReloc;
+typedef struct _coss_read_op CossReadOp;
+
/* What we're doing in storeCossAllocate() */
#define COSS_ALLOC_NOTIFY 0
#define COSS_ALLOC_ALLOCATE 1
#define COSS_ALLOC_REALLOC 2
+/*
+ * Define this if you would like to use the aufs IO method for
+ * disk IO instead of the POSIX AIO method.
+ */
+#define USE_AUFSOPS 1
+
+#if USE_AUFSOPS
+/* XXX a hack; the async ops should be broken out! */
+typedef void AIOCB(int fd, void *cbdata, const char *buf,
+ int aio_return, int aio_errno);
+void aioWrite(int, off_t offset, char *, int size, AIOCB *, void *, FREE *);
+void aioRead(int, off_t offset, int size, AIOCB *, void *);
+void aioInit(void);
+int aioCheckCallbacks(SwapDir *);
+void aioSync(SwapDir *);
+void squidaio_init(void);
+void squidaio_shutdown(void);
+#endif
+
+
struct _coss_stats {
int stripes;
+ int dead_stripes;
struct {
int alloc;
int realloc;
@@ -33,32 +61,82 @@ struct _cossmembuf {
struct _cossmembuf {
dlink_node node;
- size_t diskstart; /* in bytes */
- size_t diskend; /* in bytes */
+ off_t diskstart; /* in bytes */
+ off_t diskend; /* in bytes */
+ int stripe;
SwapDir *SD;
int lockcount;
char buffer[COSS_MEMBUF_SZ];
struct _cossmembuf_flags {
unsigned int full:1;
unsigned int writing:1;
+ unsigned int written:1;
+ unsigned int dead:1;
} flags;
+ int numobjs;
};
+typedef enum {
+ COSS_OP_NONE,
+ COSS_OP_READ,
+} coss_op_t;
+struct _coss_read_op {
+ /*
+ * callback/callback data are part of the sio, and only one
+ * read op will be scheduled at any time
+ */
+ coss_op_t type;
+ dlink_node node; /* per-storedir list */
+ dlink_node pending_op_node; /* children of the parent op we're blocking on */
+ storeIOState *sio;
+ size_t requestlen;
+ size_t requestoffset; /* in blocks */
+ off_t reqdiskoffset; /* in blocks */
+ char *requestbuf;
+ char completed;
+ CossPendingReloc *pr; /* NULL if we're not on a pending op list yet */
+};
+
+struct _cossstripe {
+ int id;
+ int numdiskobjs;
+ int pending_relocs;
+ struct _cossmembuf *membuf;
+};
+
+struct _coss_pending_reloc {
+ CossInfo *cs;
+ dlink_node node;
+ size_t len;
+ sfileno original_filen, new_filen; /* in blocks, not in bytes */
+ dlink_list ops;
+ char *p;
+};
+
+
/* Per-storedir info */
struct _cossinfo {
dlink_list membufs;
+ dlink_list dead_membufs;
struct _cossmembuf *current_membuf;
- size_t current_offset; /* in bytes */
+ off_t current_offset; /* in bytes */
int fd;
int swaplog_fd;
int numcollisions;
dlink_list index;
+ dlink_list pending_relocs;
+ dlink_list pending_ops;
+ int pending_reloc_count;
int count;
async_queue_t aq;
dlink_node *walk_current;
unsigned int blksz_bits;
unsigned int blksz_mask; /* just 1<<blksz_bits - 1 */
+
+ int numstripes;
+ struct _cossstripe *stripes;
+ int curstripe;
};
struct _cossindex {
@@ -71,28 +149,26 @@ struct _cossstate {
/* Per-storeiostate info */
struct _cossstate {
- char *readbuffer;
char *requestbuf;
size_t requestlen;
size_t requestoffset; /* in blocks */
- sfileno reqdiskoffset; /* in blocks */
+ off_t reqdiskoffset; /* in blocks */
struct {
unsigned int reading:1;
unsigned int writing:1;
+ unsigned int reloc:1;
} flags;
struct _cossmembuf *locked_membuf;
};
-typedef struct _cossmembuf CossMemBuf;
-typedef struct _cossinfo CossInfo;
-typedef struct _cossstate CossState;
-typedef struct _cossindex CossIndexNode;
/* Whether the coss system has been setup or not */
extern int coss_initialised;
extern MemPool *coss_membuf_pool;
extern MemPool *coss_state_pool;
extern MemPool *coss_index_pool;
+extern MemPool *coss_realloc_pool;
+extern MemPool *coss_op_pool;
/*
* Store IO stuff
@@ -108,6 +184,8 @@ extern void storeCossStartMembuf(SwapDir
extern void storeCossAdd(SwapDir *, StoreEntry *);
extern void storeCossRemove(SwapDir *, StoreEntry *);
extern void storeCossStartMembuf(SwapDir * SD);
+extern void membufsDump(CossInfo *cs, StoreEntry *e);
+extern void storeCossFreeDeadMemBufs(CossInfo *cs);
extern struct _coss_stats coss_stats;
============================================================
--- src/fs/coss/store_dir_coss.c 371e6c070f6001f5e22de7286ec60b880529ff4b
+++ src/fs/coss/store_dir_coss.c c73efc86785c4f33374493b8db62904ec58b7df3
@@ -46,6 +46,8 @@ MemPool *coss_index_pool = NULL;
int coss_initialised = 0;
MemPool *coss_state_pool = NULL;
MemPool *coss_index_pool = NULL;
+MemPool *coss_realloc_pool = NULL;
+MemPool *coss_op_pool = NULL;
typedef struct _RebuildState RebuildState;
struct _RebuildState {
@@ -166,7 +168,12 @@ storeCossDirInit(SwapDir * sd)
storeCossDirInit(SwapDir * sd)
{
CossInfo *cs = (CossInfo *) sd->fsdata;
+#if USE_AUFSOPS
+ aioInit();
+ squidaio_init();
+#else
a_file_setupqueue(&cs->aq);
+#endif
storeCossDirOpenSwapLog(sd);
storeCossDirRebuild(sd);
cs->fd = file_open(sd->path, O_RDWR | O_CREAT);
@@ -672,7 +679,11 @@ storeCossDirShutdown(SwapDir * SD)
CossInfo *cs = (CossInfo *) SD->fsdata;
storeCossSync(SD); /* This'll call a_file_syncqueue() */
+#if USE_AUFSOPS
+ aioSync(SD);
+#else
a_file_closequeue(&cs->aq);
+#endif
file_close(cs->fd);
cs->fd = -1;
@@ -719,8 +730,13 @@ storeCossDirCallback(SwapDir * SD)
storeCossDirCallback(SwapDir * SD)
{
CossInfo *cs = (CossInfo *) SD->fsdata;
-
+ storeCossFreeDeadMemBufs(cs);
+#if USE_AUFSOPS
+ /* I believe this call, at the present, checks all callbacks for all SDs, not just ours */
+ return aioCheckCallbacks(SD);
+#else
return a_file_callback(&cs->aq);
+#endif
}
/* ========== LOCAL FUNCTIONS ABOVE, GLOBAL FUNCTIONS BELOW ========== */
@@ -749,6 +765,8 @@ storeCossDirStats(SwapDir * SD, StoreEnt
if (SD->flags.read_only)
storeAppendPrintf(sentry, " READ-ONLY");
storeAppendPrintf(sentry, "\n");
+ storeAppendPrintf(sentry, "Pending Relocations: %d\n", cs->pending_reloc_count);
+ membufsDump(cs, sentry);
}
static void
@@ -834,9 +852,16 @@ storeCossDirParse(SwapDir * sd, int inde
debug(47, 0) ("COSS cache_dir size = %d KB\n", sd->max_size);
fatal("COSS cache_dir size exceeds largest offset\n");
}
+ cs->numstripes = (off_t)(sd->max_size << 10) / COSS_MEMBUF_SZ;
+ debug(47, 1) ("COSS: number of stripes: %d of %d bytes each\n", cs->numstripes, COSS_MEMBUF_SZ);
+ cs->stripes = xcalloc(cs->numstripes, sizeof(struct _cossstripe));
+ for (i = 0; i < cs->numstripes; i++) {
+ cs->stripes[i].id = i;
+ cs->stripes[i].membuf = NULL;
+ cs->stripes[i].numdiskobjs = -1;
+ }
}
-
static void
storeCossDirReconfigure(SwapDir * sd, int index, char *path)
{
@@ -974,9 +999,9 @@ storeCossStats(StoreEntry * sentry)
"write", coss_stats.write.ops, coss_stats.write.success, coss_stats.write.fail);
storeAppendPrintf(sentry, tbl_fmt,
"s_write", coss_stats.stripe_write.ops, coss_stats.stripe_write.success, coss_stats.stripe_write.fail);
-
storeAppendPrintf(sentry, "\n");
storeAppendPrintf(sentry, "stripes: %d\n", coss_stats.stripes);
+ storeAppendPrintf(sentry, "dead_stripes: %d\n", coss_stats.dead_stripes);
storeAppendPrintf(sentry, "alloc.alloc: %d\n", coss_stats.alloc.alloc);
storeAppendPrintf(sentry, "alloc.realloc: %d\n", coss_stats.alloc.realloc);
storeAppendPrintf(sentry, "alloc.collisions: %d\n", coss_stats.alloc.collisions);
@@ -996,6 +1021,8 @@ storeFsSetup_coss(storefs_entry_t * stor
storefs->donefunc = storeCossDirDone;
coss_state_pool = memPoolCreate("COSS IO State data", sizeof(CossState));
coss_index_pool = memPoolCreate("COSS index data", sizeof(CossIndexNode));
+ coss_realloc_pool = memPoolCreate("COSS pending realloc", sizeof(CossPendingReloc));
+ coss_op_pool = memPoolCreate("COSS pending operation", sizeof(CossReadOp));
cachemgrRegister("coss", "COSS Stats", storeCossStats, 0, 1);
coss_initialised = 1;
}
============================================================
--- src/fs/coss/store_io_coss.c f8c1c7290dd460e33dd473ce2eb350a8f542e14a
+++ src/fs/coss/store_io_coss.c 6c6acd007a53f403d82cbe9e5a92dc4e64c0f97c
@@ -38,25 +38,43 @@
#include "async_io.h"
#include "store_coss.h"
+#if USE_AUFSOPS
+static AIOCB storeCossWriteMemBufDone;
+#else
static DWCB storeCossWriteMemBufDone;
-static DRCB storeCossReadDone;
+#endif
static void storeCossIOCallback(storeIOState * sio, int errflag);
-static char *storeCossMemPointerFromDiskOffset(SwapDir * SD, size_t offset, CossMemBuf ** mb);
+static char *storeCossMemPointerFromDiskOffset(CossInfo *cs, off_t offset, CossMemBuf ** mb);
static void storeCossMemBufLock(SwapDir * SD, storeIOState * e);
static void storeCossMemBufUnlock(SwapDir * SD, storeIOState * e);
static void storeCossWriteMemBuf(SwapDir * SD, CossMemBuf * t);
-static void storeCossWriteMemBufDone(int fd, int errflag, size_t len, void *my_data);
-static CossMemBuf *storeCossCreateMemBuf(SwapDir * SD, size_t start,
- sfileno curfn, int *collision);
+static CossMemBuf *storeCossCreateMemBuf(SwapDir * SD, int stripe, sfileno curfn, int *collision);
static CBDUNL storeCossIOFreeEntry;
static off_t storeCossFilenoToDiskOffset(sfileno f, CossInfo *);
static sfileno storeCossDiskOffsetToFileno(off_t o, CossInfo *);
static void storeCossMaybeWriteMemBuf(SwapDir * SD, CossMemBuf * t);
+static void storeCossMaybeFreeBuf(CossInfo *cs, CossMemBuf *mb);
+static int storeCossFilenoToStripe(CossInfo *cs, sfileno filen);
static void membuf_describe(CossMemBuf * t, int level, int line);
+/* Handle relocates - temporary routines until readops have been fleshed out */
+void storeCossNewPendingRelocate(CossInfo *cs, storeIOState *sio, sfileno original_filen, sfileno new_filen);
+CossPendingReloc * storeCossGetPendingReloc(CossInfo *cs, sfileno new_filen);
+#if USE_AUFSOPS
+AIOCB storeCossCompletePendingReloc;
+#else
+DRCB storeCossCompletePendingReloc;
+#endif
+
+/* Read operation code */
+CossReadOp * storeCossCreateReadOp(CossInfo *cs, storeIOState *sio);
+void storeCossCompleteReadOp(CossInfo *cs, CossReadOp *op, int error);
+void storeCossKickReadOp(CossInfo *cs, CossReadOp *op);
+
CBDATA_TYPE(storeIOState);
CBDATA_TYPE(CossMemBuf);
+CBDATA_TYPE(CossPendingReloc);
/* === PUBLIC =========================================================== */
@@ -75,6 +93,7 @@ storeCossAllocate(SwapDir * SD, const St
off_t retofs;
size_t allocsize;
int coll = 0;
+ sfileno f;
sfileno checkf;
/* Make sure we chcek collisions if reallocating */
@@ -104,8 +123,9 @@ storeCossAllocate(SwapDir * SD, const St
cs->current_membuf->flags.full = 1;
cs->current_membuf->diskend = cs->current_offset;
storeCossMaybeWriteMemBuf(SD, cs->current_membuf);
+ /* cs->current_membuf may be invalid at this point */
cs->current_offset = 0; /* wrap back to beginning */
- debug(79, 2) ("storeCossAllocate: wrap to 0\n");
+ debug(79, 2) ("storeCossAllocate: %s: wrap to 0\n", SD->path);
newmb = storeCossCreateMemBuf(SD, 0, checkf, &coll);
cs->current_membuf = newmb;
@@ -119,21 +139,27 @@ storeCossAllocate(SwapDir * SD, const St
cs->current_membuf->flags.full = 1;
cs->current_offset = cs->current_membuf->diskend;
storeCossMaybeWriteMemBuf(SD, cs->current_membuf);
- debug(79, 2) ("storeCossAllocate: New offset - %ld\n",
- (long int) cs->current_offset);
- newmb = storeCossCreateMemBuf(SD, cs->current_offset, checkf, &coll);
+ /* cs->current_membuf may be invalid at this point */
+ debug(79, 3) ("storeCossAllocate: %s: New offset - %lld\n", SD->path,
+ (long long int) cs->current_offset);
+ assert(cs->curstripe < (cs->numstripes - 1));
+ newmb = storeCossCreateMemBuf(SD, cs->curstripe + 1, checkf, &coll);
cs->current_membuf = newmb;
}
/* If we didn't get a collision, then update the current offset and return it */
if (coll == 0) {
retofs = cs->current_offset;
cs->current_offset = retofs + allocsize;
+ cs->current_membuf->numobjs++;
/* round up to our blocksize */
cs->current_offset = ((cs->current_offset + cs->blksz_mask) >> cs->blksz_bits) << cs->blksz_bits;
- return storeCossDiskOffsetToFileno(retofs, cs);
+ f = storeCossDiskOffsetToFileno(retofs, cs);
+ assert(f >= 0 && f <= 0xffffff);
+ debug(79, 3) ("storeCossAllocate: offset %lld, filen: %d\n", retofs, f);
+ return f;
} else {
coss_stats.alloc.collisions++;
- debug(79, 3) ("storeCossAllocate: Collision\n");
+ debug(79, 3) ("storeCossAllocate: %s: Collision\n", SD->path);
return -1;
}
}
@@ -141,7 +167,7 @@ storeCossUnlink(SwapDir * SD, StoreEntry
void
storeCossUnlink(SwapDir * SD, StoreEntry * e)
{
- debug(79, 3) ("storeCossUnlink: offset %d\n", e->swap_filen);
+ debug(79, 3) ("storeCossUnlink: %s: offset %d\n", SD->path, e->swap_filen);
coss_stats.unlink.ops++;
coss_stats.unlink.success++;
storeCossRemove(SD, e);
@@ -174,10 +200,7 @@ storeCossCreate(SwapDir * SD, StoreEntry
sio->st_size = objectLen(e) + e->mem_obj->swap_hdr_sz;
sio->swap_dirn = SD->index;
sio->swap_filen = storeCossAllocate(SD, e, COSS_ALLOC_ALLOCATE);
- debug(79, 3) ("storeCossCreate: offset %ld, size %ld, end %ld\n",
- (long int) storeCossFilenoToDiskOffset(sio->swap_filen, SD->fsdata),
- (long int) sio->st_size,
- (long int) (sio->swap_filen + sio->st_size));
+ debug(79, 3) ("storeCossCreate: %p: filen: %d\n", sio, sio->swap_filen);
assert(-1 != sio->swap_filen);
sio->callback = callback;
@@ -188,7 +211,6 @@ storeCossCreate(SwapDir * SD, StoreEntry
cstate->flags.writing = 0;
cstate->flags.reading = 0;
- cstate->readbuffer = NULL;
cstate->reqdiskoffset = -1;
/* Now add it into the index list */
@@ -207,14 +229,15 @@ storeCossOpen(SwapDir * SD, StoreEntry *
char *p;
CossState *cstate;
sfileno f = e->swap_filen;
+ sfileno nf;
CossInfo *cs = (CossInfo *) SD->fsdata;
- debug(79, 3) ("storeCossOpen: offset %d\n", f);
- coss_stats.open.ops++;
-
sio = cbdataAlloc(storeIOState);
cstate = memPoolAlloc(coss_state_pool);
+ debug(79, 3) ("storeCossOpen: %p: offset %d\n", sio, f);
+ coss_stats.open.ops++;
+
sio->fsstate = cstate;
sio->swap_filen = f;
sio->swap_dirn = SD->index;
@@ -229,15 +252,18 @@ storeCossOpen(SwapDir * SD, StoreEntry *
cstate->flags.writing = 0;
cstate->flags.reading = 0;
- cstate->readbuffer = NULL;
cstate->reqdiskoffset = -1;
- p = storeCossMemPointerFromDiskOffset(SD, storeCossFilenoToDiskOffset(f, cs), NULL);
+
/* make local copy so we don't have to lock membuf */
+ p = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(f, cs), NULL);
if (p) {
- cstate->readbuffer = xmalloc(sio->st_size);
- xmemcpy(cstate->readbuffer, p, sio->st_size);
coss_stats.open_mem_hits++;
+ // This seems to cause a crash: either the membuf pointer is set wrong or the membuf
+ // is deallocated from underneath us.
+ storeCossMemBufLock(SD, sio);
+ debug(79,3) ("storeCossOpen: %s: memory hit!\n", SD->path);
} else {
+ debug(79, 3) ("storeCossOpen: %s: memory miss - doing reallocation\n", SD->path);
/* Do the allocation */
/* this is the first time we've been called on a new sio
* read the whole object into memory, then return the
@@ -250,45 +276,50 @@ storeCossOpen(SwapDir * SD, StoreEntry *
* into the cossmembuf for later writing ..
*/
cstate->reqdiskoffset = storeCossFilenoToDiskOffset(sio->swap_filen, cs);
- sio->swap_filen = storeCossAllocate(SD, e, COSS_ALLOC_REALLOC);
- if (sio->swap_filen == -1) {
+ assert(cstate->reqdiskoffset >= 0);
+ nf = storeCossAllocate(SD, e, COSS_ALLOC_REALLOC);
+ if (nf == -1) {
/* We have to clean up neatly .. */
coss_stats.open.fail++;
cbdataFree(sio);
cs->numcollisions++;
- debug(79, 2) ("storeCossOpen: Reallocation of %d/%d failed\n", e->swap_dirn, e->swap_filen);
+ debug(79, 3) ("storeCossOpen: Reallocation of %d/%d failed\n", e->swap_dirn, e->swap_filen);
/* XXX XXX XXX Will squid call storeUnlink for this object? */
return NULL;
}
+ storeCossNewPendingRelocate(cs, sio, sio->swap_filen, nf);
+ sio->swap_filen = nf;
+ cstate->flags.reloc = 1;
/* Notify the upper levels that we've changed file number */
sio->file_callback(sio->callback_data, 0, sio);
-
+ /*
+ * lock the new buffer so it doesn't get swapped out on us
+ * this will get unlocked in storeCossClose
+ */
+ storeCossMemBufLock(SD, sio);
/*
- * lock the buffer so it doesn't get swapped out on us
- * this will get unlocked in storeCossClose
- */
- storeCossMemBufLock(SD, sio);
-
- /*
* Do the index magic to keep the disk and memory LRUs identical
*/
storeCossRemove(SD, e);
storeCossAdd(SD, e);
-
- /*
- * NOTE cstate->readbuffer is NULL. We'll actually read
- * the disk data into the MemBuf in storeCossRead() and
- * return that pointer back to the caller
- */
}
coss_stats.open.success++;
return sio;
}
+/*
+ * Aha! The unlocked membuf.
+ *
+ * If it's storeCossCreate(), then it was locked. Fine.
+ * If it was storeCossOpen() and we found the object in-stripe then cool,
+ * it's locked.
+ * If it was storeCossOpen() and we didn't find the object in-stripe then
+ * we reallocated the object into the current stripe and locked THAT.
+ */
void
storeCossClose(SwapDir * SD, storeIOState * sio)
{
- debug(79, 3) ("storeCossClose: offset %d\n", sio->swap_filen);
+ debug(79, 3) ("storeCossClose: %p: offset %d\n", sio, sio->swap_filen);
coss_stats.close.ops++;
coss_stats.close.success++;
storeCossMemBufUnlock(SD, sio);
@@ -298,16 +329,16 @@ storeCossRead(SwapDir * SD, storeIOState
void
storeCossRead(SwapDir * SD, storeIOState * sio, char *buf, size_t size, squid_off_t offset, STRCB * callback, void *callback_data)
{
- char *p;
CossState *cstate = (CossState *) sio->fsstate;
CossInfo *cs = (CossInfo *) SD->fsdata;
+ CossReadOp *op;
coss_stats.read.ops++;
assert(sio->read.callback == NULL);
assert(sio->read.callback_data == NULL);
sio->read.callback = callback;
sio->read.callback_data = callback_data;
- debug(79, 3) ("storeCossRead: offset %ld\n", (long int) offset);
+ debug(79, 3) ("storeCossRead: %s: offset %ld\n", SD->path, (long int) offset);
sio->offset = offset;
cstate->flags.reading = 1;
if ((offset + size) > sio->st_size)
@@ -315,25 +346,10 @@ storeCossRead(SwapDir * SD, storeIOState
cstate->requestlen = size;
cstate->requestbuf = buf;
cstate->requestoffset = offset;
- if (cstate->readbuffer == NULL) {
- p = storeCossMemPointerFromDiskOffset(SD, storeCossFilenoToDiskOffset(sio->swap_filen, cs), NULL);
- a_file_read(&cs->aq, cs->fd,
- p,
- sio->st_size,
- cstate->reqdiskoffset,
- storeCossReadDone,
- sio);
- cstate->reqdiskoffset = 0; /* XXX */
- } else {
- /*
- * It was copied from memory in storeCossOpen()
- */
- storeCossReadDone(cs->fd,
- cstate->readbuffer,
- sio->st_size,
- 0,
- sio);
- }
+ /* All of these reads should be treated as pending ones */
+ /* Ie, we create a read op; then we 'kick' the read op to see if it can be completed now */
+ op = storeCossCreateReadOp(cs, sio);
+ storeCossKickReadOp(cs, op);
}
void
@@ -350,9 +366,10 @@ storeCossWrite(SwapDir * SD, storeIOStat
assert(sio->e->mem_obj->object_sz != -1);
coss_stats.write.ops++;
- debug(79, 3) ("storeCossWrite: offset %ld, len %lu\n", (long int) sio->offset, (unsigned long int) size);
+ debug(79, 3) ("storeCossWrite: %s: offset %ld, len %lu\n", SD->path,
+ (long int) sio->offset, (unsigned long int) size);
diskoffset = storeCossFilenoToDiskOffset(sio->swap_filen, SD->fsdata) + sio->offset;
- dest = storeCossMemPointerFromDiskOffset(SD, diskoffset, &membuf);
+ dest = storeCossMemPointerFromDiskOffset(SD->fsdata, diskoffset, &membuf);
assert(dest != NULL);
xmemcpy(dest, buf, size);
sio->offset += size;
@@ -365,57 +382,11 @@ static void
/* === STATIC =========================================================== */
static void
-storeCossReadDone(int fd, const char *buf, int len, int errflag, void *my_data)
-{
- storeIOState *sio = my_data;
- char *p;
- STRCB *callback = sio->read.callback;
- void *their_data = sio->read.callback_data;
- SwapDir *SD = INDEXSD(sio->swap_dirn);
- CossState *cstate = (CossState *) sio->fsstate;
- ssize_t rlen;
-
- debug(79, 3) ("storeCossReadDone: fileno %d, FD %d, len %d\n",
- sio->swap_filen, fd, len);
- cstate->flags.reading = 0;
- if (errflag) {
- coss_stats.read.fail++;
- if (errflag > 0) {
- errno = errflag;
- debug(79, 1) ("storeCossReadDone: error: %s\n", xstrerror());
- } else {
- debug(79, 1) ("storeCossReadDone: got failure (%d)\n", errflag);
- }
- rlen = -1;
- } else {
- coss_stats.read.success++;
- if (cstate->readbuffer == NULL) {
- cstate->readbuffer = xmalloc(sio->st_size);
- p = storeCossMemPointerFromDiskOffset(SD,
- storeCossFilenoToDiskOffset(sio->swap_filen, SD->fsdata),
- NULL);
- xmemcpy(cstate->readbuffer, p, sio->st_size);
- }
- sio->offset += len;
- xmemcpy(cstate->requestbuf, &cstate->readbuffer[cstate->requestoffset],
- cstate->requestlen);
- rlen = (size_t) cstate->requestlen;
- }
- assert(callback);
- assert(their_data);
- sio->read.callback = NULL;
- sio->read.callback_data = NULL;
- if (cbdataValid(their_data))
- callback(their_data, cstate->requestbuf, rlen);
-}
-
-static void
storeCossIOCallback(storeIOState * sio, int errflag)
{
CossState *cstate = (CossState *) sio->fsstate;
debug(79, 3) ("storeCossIOCallback: errflag=%d\n", errflag);
assert(NULL == cstate->locked_membuf);
- xfree(cstate->readbuffer);
if (cbdataValid(sio->callback_data))
sio->callback(sio->callback_data, errflag, sio);
cbdataUnlock(sio->callback_data);
@@ -424,11 +395,10 @@ static char *
}
static char *
-storeCossMemPointerFromDiskOffset(SwapDir * SD, size_t offset, CossMemBuf ** mb)
+storeCossMemPointerFromDiskOffset(CossInfo *cs, off_t offset, CossMemBuf ** mb)
{
CossMemBuf *t;
dlink_node *m;
- CossInfo *cs = (CossInfo *) SD->fsdata;
for (m = cs->membufs.head; m; m = m->next) {
t = m->data;
@@ -465,6 +435,8 @@ storeCossMemBufLock(SwapDir * SD, storeI
{
CossMemBuf *t = storeCossFilenoToMembuf(SD, sio->swap_filen);
CossState *cstate = (CossState *) sio->fsstate;
+ assert(cstate->locked_membuf == NULL);
+ assert(t->flags.dead == 0);
debug(79, 3) ("storeCossMemBufLock: locking %p, lockcount %d\n",
t, t->lockcount);
cstate->locked_membuf = t;
@@ -475,28 +447,37 @@ storeCossMemBufUnlock(SwapDir * SD, stor
storeCossMemBufUnlock(SwapDir * SD, storeIOState * sio)
{
CossState *cstate = (CossState *) sio->fsstate;
+ CossInfo *cs = SD->fsdata;
CossMemBuf *t = cstate->locked_membuf;
if (NULL == t)
return;
+ assert(t->flags.dead == 0);
debug(79, 3) ("storeCossMemBufUnlock: unlocking %p, lockcount %d\n",
t, t->lockcount);
t->lockcount--;
cstate->locked_membuf = NULL;
storeCossMaybeWriteMemBuf(SD, t);
+ /* cs->current_membuf may be invalid at this point */
+ storeCossMaybeFreeBuf(cs, t);
}
static void
storeCossMaybeWriteMemBuf(SwapDir * SD, CossMemBuf * t)
{
+ /* Hand membuf t to storeCossWriteMemBuf() unless it is not full, is already being written, is still locked, or has already been written. */
membuf_describe(t, 3, __LINE__);
+ assert(t->flags.dead == 0);
if (!t->flags.full)
debug(79, 3) ("membuf %p not full\n", t);
else if (t->flags.writing)
debug(79, 3) ("membuf %p writing\n", t);
else if (t->lockcount)
debug(79, 3) ("membuf %p lockcount=%d\n", t, t->lockcount);
+ else if (t->flags.written)
+ debug(79, 3) ("membuf %p written\n", t);
else
storeCossWriteMemBuf(SD, t);
+ /* t may be invalid at this point (presumably because the write can complete before storeCossWriteMemBuf() returns - confirm), so it must not be touched after the call above */
}
void
@@ -504,10 +485,14 @@ storeCossSync(SwapDir * SD)
{
CossInfo *cs = (CossInfo *) SD->fsdata;
dlink_node *m;
- int end;
+ off_t end;
/* First, flush pending IO ops */
+#if USE_AUFSOPS
+ aioSync(SD);
+#else
a_file_syncqueue(&cs->aq);
+#endif
/* Then, flush any in-memory partial membufs */
if (!cs->membufs.head)
@@ -529,58 +514,167 @@ storeCossWriteMemBuf(SwapDir * SD, CossM
{
CossInfo *cs = (CossInfo *) SD->fsdata;
coss_stats.stripe_write.ops++;
- debug(79, 3) ("storeCossWriteMemBuf: offset %ld, len %ld\n",
+ assert(t->flags.dead == 0);
+ debug(79, 3) ("storeCossWriteMemBuf: %p: offset %ld, len %ld\n", t,
(long int) t->diskstart, (long int) (t->diskend - t->diskstart));
t->flags.writing = 1;
+ /* Check to see whether anything has a pending relocate (ie, a disk read)
+ * scheduled from the disk data we're about to overwrite.
+ * According to the specification this should never, ever happen - all the
+ * objects underneath this stripe were deallocated before we started
+ * using them - but there is a possibility that an object was opened
+ * before the objects underneath the membufs stripe were purged and there
+ * is still a pending relocate for it. It's a slim chance, but it might happen.
+ */
+ assert(t->stripe < cs->numstripes);
+ if (cs->stripes[t->stripe].pending_relocs > 0) {
+ debug(79, 1) ("WARNING: %s: One or more pending relocate (reads) from stripe %d are queued - and I'm now writing over that part of the disk. This may result in object data corruption!\n", SD->path, t->stripe);
+ }
+ /*
+ * normally nothing should have this node locked here - but between the time
+ * we call a_file_write and the IO completes someone might have snuck in and
+ * attached itself somehow. This is why there's a distinction between "written"
+ * and "writing". Read the rest of the code for more details.
+ */
+#if USE_AUFSOPS
+ /* XXX The last stripe, for now, ain't the coss stripe size for some reason */
+ /* XXX This may cause problems later on; worry about figuring it out later on */
+ //assert(t->diskend - t->diskstart == COSS_MEMBUF_SZ);
+ debug(79, 3) ("aioWrite: FD %d: disk start: %llu, size %llu\n", cs->fd, t->diskstart, t->diskend - t->diskstart);
+ aioWrite(cs->fd, t->diskstart, &(t->buffer[0]), t->diskend - t->diskstart, storeCossWriteMemBufDone, t, NULL);
+#else
a_file_write(&cs->aq, cs->fd, t->diskstart, &t->buffer,
t->diskend - t->diskstart, storeCossWriteMemBufDone, t, NULL);
+#endif
}
+/*
+ * Retire a memory buffer once nothing needs it any more.
+ *
+ * A buffer qualifies when its lock count is zero and it has been written.
+ * Qualifying buffers are not released here: they are flagged dead and moved
+ * from cs->membufs to cs->dead_membufs.  A buffer that is already dead is
+ * only logged.
+ */
+static void
+storeCossMaybeFreeBuf(CossInfo *cs, CossMemBuf *mb)
+{
+    assert(mb->lockcount >= 0);
+    /* It would be nice to walk the pending sio's here to check none still holds this membuf. */
+    if (mb->flags.dead == 1) {
+        debug(79, 1) ("storeCossMaybeFreeBuf: %p: dead; it'll be freed soon enough\n", mb);
+        return;
+    }
+    /* Still locked, or not yet written: leave it on the live list. */
+    if (mb->lockcount != 0 || mb->flags.written != 1)
+        return;
+    /*
+     * Park the buffer on the dead list instead of freeing it.  The asyncio
+     * code can fall back to a synchronous path, in which case we may be
+     * running deep inside a call stack that still refers to this membuf;
+     * dead membufs are deallocated early in the stack frame instead (ie,
+     * before the asyncio disk completion handler is called).
+     */
+    debug(79, 3) ("storeCossMaybeFreeBuf: %p: lockcount = 0, written = 1: marking dead\n", mb);
+    mb->flags.dead = 1;
+    dlinkDelete(&mb->node, &cs->membufs);
+    dlinkAddTail(mb, &mb->node, &cs->dead_membufs);
+    coss_stats.stripes--;
+    coss_stats.dead_stripes++;
+}
+/*
+ * Release every membuf currently parked on cs->dead_membufs.
+ * Each entry is unlinked from the list, freed through cbdata, and the
+ * dead-stripe statistic is decremented.
+ */
+void
+storeCossFreeDeadMemBufs(CossInfo *cs)
+{
+    dlink_node *n;
+    CossMemBuf *victim;
+
+    /* Always take the current head: the list shrinks on every iteration. */
+    for (n = cs->dead_membufs.head; n != NULL; n = cs->dead_membufs.head) {
+        victim = n->data;
+        assert(victim->flags.dead == 1);
+        debug(79, 3) ("storeCossFreeDeadMemBufs: %p: freeing\n", victim);
+        dlinkDelete(&victim->node, &cs->dead_membufs);
+        cbdataFree(victim);
+        coss_stats.dead_stripes--;
+    }
+}
+
+/*
+ * Writing a membuf has completed. Set the written flag to 1; membufs might have been
+ * locked for read between the initial membuf write and the completion of the disk
+ * write.
+ *
+ * The callback signature depends on the I/O backend: with USE_AUFSOPS the result
+ * arrives as aio_return/aio_errno, otherwise as r_errflag/r_len. Both forms are
+ * normalised into errflag/len before the common handling. The membuf is detached
+ * from its stripe slot and then offered to storeCossMaybeFreeBuf().
+ */
+#if USE_AUFSOPS
static void
-storeCossWriteMemBufDone(int fd, int errflag, size_t len, void *my_data)
+storeCossWriteMemBufDone(int fd, void *my_data, const char *buf, int aio_return, int aio_errno)
+#else
+static void
+storeCossWriteMemBufDone(int fd, int r_errflag, size_t r_len, void *my_data)
+#endif
{
CossMemBuf *t = my_data;
CossInfo *cs = (CossInfo *) t->SD->fsdata;
+ int errflag;
+ int len;
+#if USE_AUFSOPS
+ len = aio_return;
+ if (aio_errno) /* map the aio errno onto the DISK_* codes used below */
+ errflag = aio_errno == ENOSPC ? DISK_NO_SPACE_LEFT : DISK_ERROR;
+ else
+ errflag = DISK_OK;
+#else
+ len = r_len;
+ errflag = r_errflag;
+#endif
- debug(79, 3) ("storeCossWriteMemBufDone: buf %p, len %ld\n", t, (long int) len);
+ debug(79, 3) ("storeCossWriteMemBufDone: stripe %d, buf %p, len %ld\n", t->stripe, t, (long int) len);
if (errflag) {
coss_stats.stripe_write.fail++;
debug(79, 1) ("storeCossWriteMemBufDone: got failure (%d)\n", errflag);
- debug(79, 1) ("FD %d, size=%x\n", fd, (int) (t->diskend - t->diskstart));
+ debug(79, 1) ("FD %d, size=%d\n", fd, (int) (t->diskend - t->diskstart));
} else {
coss_stats.stripe_write.success++;
}
-
- dlinkDelete(&t->node, &cs->membufs);
- cbdataFree(t);
- coss_stats.stripes--;
+ assert(cs->stripes[t->stripe].membuf == t);
+ debug(79, 2) ("storeCossWriteMemBufDone: %s: stripe %d: numobjs written: %d, lockcount %d\n", t->SD->path, t->stripe, t->numobjs, t->lockcount);
+ cs->stripes[t->stripe].numdiskobjs = t->numobjs; /* record how many objects the membuf held for this stripe */
+ cs->stripes[t->stripe].membuf = NULL; /* the stripe slot no longer refers to this membuf */
+ t->flags.written = 1; /* NOTE(review): set even when errflag reports a failed write - confirm that is intended */
+ t->flags.writing = 0;
+ storeCossMaybeFreeBuf(cs, t); /* marks t dead if nothing holds a lock on it */
}
+/*
+ * This creates a memory buffer but assumes it is going to be at the end
+ * of the "LRU" and will therefore expire any objects which lie under
+ * it.
+ */
static CossMemBuf *
-storeCossCreateMemBuf(SwapDir * SD, size_t start,
- sfileno curfn, int *collision)
+storeCossCreateMemBuf(SwapDir * SD, int stripe, sfileno curfn, int *collision)
{
CossMemBuf *newmb, *t;
StoreEntry *e;
dlink_node *m, *prev;
int numreleased = 0;
CossInfo *cs = (CossInfo *) SD->fsdata;
+ off_t start = (off_t) stripe * COSS_MEMBUF_SZ;
+ assert(start >= 0);
+ /* No, we shouldn't ever try to create a membuf if we haven't freed the one on
+ * this stripe. Grr */
+ assert(cs->stripes[stripe].membuf == NULL);
+ cs->curstripe = stripe;
+
newmb = cbdataAlloc(CossMemBuf);
+ cs->stripes[stripe].membuf = newmb;
newmb->diskstart = start;
- debug(79, 3) ("storeCossCreateMemBuf: creating new membuf at %ld\n", (long int) newmb->diskstart);
- debug(79, 3) ("storeCossCreateMemBuf: at %p\n", newmb);
+ newmb->stripe = stripe;
+ debug(79, 2) ("storeCossCreateMemBuf: %s: creating new membuf at stripe %d, %lld (%p)\n", SD->path, stripe, (long long int) newmb->diskstart, newmb);
newmb->diskend = newmb->diskstart + COSS_MEMBUF_SZ;
newmb->flags.full = 0;
newmb->flags.writing = 0;
newmb->lockcount = 0;
+ newmb->numobjs = 0;
newmb->SD = SD;
/* XXX This should be reversed, with the new buffer last in the chain */
dlinkAdd(newmb, &newmb->node, &cs->membufs);
+ assert(newmb->diskstart >= 0);
+ assert(newmb->diskend >= 0);
/* Print out the list of membufs */
- debug(79, 3) ("storeCossCreateMemBuf: membuflist:\n");
+ debug(79, 3) ("storeCossCreateMemBuf: %s: membuflist:\n", SD->path);
for (m = cs->membufs.head; m; m = m->next) {
t = m->data;
membuf_describe(t, 3, __LINE__);
@@ -619,7 +713,13 @@ storeCossStartMembuf(SwapDir * sd)
CBDATA_INIT_TYPE_FREECB(storeIOState, storeCossIOFreeEntry);
CBDATA_INIT_TYPE_FREECB(CossMemBuf, NULL);
CBDATA_INIT_TYPE_FREECB(storeIOState, storeCossIOFreeEntry);
- newmb = storeCossCreateMemBuf(sd, cs->current_offset, -1, NULL);
+ CBDATA_INIT_TYPE_FREECB(CossPendingReloc, NULL);
+ /*
+ * XXX for now we start at the beginning of the disk;
+ * The rebuild logic doesn't 'know' to pad out the current
+ * offset to make it a multiple of COSS_MEMBUF_SZ.
+ */
+ newmb = storeCossCreateMemBuf(sd, 0, -1, NULL);
assert(!cs->current_membuf);
cs->current_membuf = newmb;
}
@@ -636,7 +736,12 @@ storeCossFilenoToDiskOffset(sfileno f, C
static off_t
storeCossFilenoToDiskOffset(sfileno f, CossInfo * cs)
{
- return (off_t) f << cs->blksz_bits;
+ off_t doff; /* disk byte offset: file number f shifted left by cs->blksz_bits */
+
+ doff = (off_t) f; /* widen to off_t before shifting */
+ doff <<= cs->blksz_bits;
+ assert(doff >= 0); /* a negative disk offset is never valid */
+ return doff;
}
static sfileno
@@ -649,10 +754,294 @@ membuf_describe(CossMemBuf * t, int leve
static void
membuf_describe(CossMemBuf * t, int level, int line)
{
- debug(79, level) ("membuf %p, LC:%02d, ST:%010lu, FL:%c%c\n",
+ assert(t->lockcount >= 0); /* the 'line' parameter is accepted but not used in this body */
+ debug(79, level) ("membuf id:%d (%p), LC:%02d, ST:%010lu, FL:%c%c%c\n", /* one log line per membuf at the caller-chosen debug level */
+ t->stripe,
t,
t->lockcount,
(unsigned long) t->diskstart,
t->flags.full ? 'F' : '.',
- t->flags.writing ? 'W' : '.');
+ t->flags.writing ? 'W' : '.',
+ t->flags.written ? 'T' : '.'); /* flag columns: F = full, W = writing, T = written, '.' = flag clear */
}
+
+/*
+ * Map a swap file number to the index of the stripe that contains it.
+ */
+static int
+storeCossFilenoToStripe(CossInfo *cs, sfileno filen)
+{
+    /* Byte offset on disk: the file number shifted left by the block-size bit count. */
+    const off_t disk_off = ((off_t) filen) << cs->blksz_bits;
+
+    /* Stripes are COSS_MEMBUF_SZ bytes each, so the quotient is the stripe index. */
+    return (int) (disk_off / (off_t) COSS_MEMBUF_SZ);
+}
+
+/*
+ * Start relocating an object from its old on-disk position into the
+ * in-memory buffer that backs its new file number.
+ *
+ * cs             - COSS instance owning the object
+ * sio            - I/O state of the object; its swap_file_sz is the read length
+ * original_filen - file number the object is currently stored under on disk
+ * new_filen      - file number the object has been given inside a membuf
+ *
+ * A CossPendingReloc is allocated, locked and appended to cs->pending_relocs,
+ * the global and per-stripe pending counters are incremented, and an
+ * asynchronous disk read is issued.  storeCossCompletePendingReloc() is the
+ * completion callback for that read.
+ */
+void
+storeCossNewPendingRelocate(CossInfo *cs, storeIOState *sio, sfileno original_filen, sfileno new_filen)
+{
+    CossPendingReloc *pr;
+    char *p;
+    off_t disk_offset;
+    int stripe;
+
+    pr = cbdataAlloc(CossPendingReloc);
+    cbdataLock(pr);
+    pr->cs = cs;
+    pr->original_filen = original_filen;
+    pr->new_filen = new_filen;
+    pr->len = sio->e->swap_file_sz;
+    debug(79, 3) ("COSS Pending Relocate: %d -> %d: beginning\n", pr->original_filen, pr->new_filen);
+    cs->pending_reloc_count++;
+    dlinkAddTail(pr, &pr->node, &cs->pending_relocs);
+
+    /* Update the stripe count */
+    stripe = storeCossFilenoToStripe(cs, original_filen);
+    assert(stripe >= 0);
+    assert(stripe < cs->numstripes);
+    assert(cs->stripes[stripe].pending_relocs >= 0);
+    cs->stripes[stripe].pending_relocs++;
+
+    /* And now; we begin the IO */
+    p = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(new_filen, cs), NULL);
+    /*
+     * The destination must live inside a membuf.  Without this check a NULL
+     * pointer would be handed to the read (or stored in pr->p) and only
+     * blow up later, far from the cause.
+     */
+    assert(p != NULL);
+    pr->p = p;
+    disk_offset = storeCossFilenoToDiskOffset(original_filen, cs);
+    /* off_t and size values are cast explicitly so they always match the conversion specifiers. */
+    debug(79, 3) ("COSS Pending Relocate: size %ld, disk_offset %lld\n",
+        (long int) sio->e->swap_file_sz, (long long int) disk_offset);
+#if USE_AUFSOPS
+    /* NOTE: aioRead() is not given our buffer; the completion callback copies the data into place. */
+    debug(79, 3) ("COSS: aioRead: FD %d, from %d -> %d, offset %lld, len: %ld\n",
+        cs->fd, pr->original_filen, pr->new_filen, (long long int) disk_offset, (long int) pr->len);
+    aioRead(cs->fd, (off_t) disk_offset, pr->len, storeCossCompletePendingReloc, pr);
+#else
+    a_file_read(&cs->aq, cs->fd,
+        p,
+        pr->len,
+        disk_offset,
+        storeCossCompletePendingReloc,
+        pr);
+#endif
+}
+
+/*
+ * Find the pending relocation whose destination file number is new_filen.
+ * Returns the first matching entry on cs->pending_relocs, or NULL when no
+ * entry matches.
+ */
+CossPendingReloc *
+storeCossGetPendingReloc(CossInfo *cs, sfileno new_filen)
+{
+    dlink_node *cur;
+
+    for (cur = cs->pending_relocs.head; cur != NULL; cur = cur->next) {
+        CossPendingReloc *cand = cur->data;
+        if (cand->new_filen == new_filen)
+            return cand;
+    }
+    return NULL;
+}
+/*
+ * Completion callback for the disk read issued by storeCossNewPendingRelocate().
+ *
+ * The signature depends on the I/O backend: with USE_AUFSOPS the result
+ * arrives as aio_return/aio_errno together with the backend's own buffer,
+ * otherwise as r_len/r_errflag.  Both are normalised into len/errflag.
+ *
+ * The relocation is removed from cs->pending_relocs, the per-stripe and
+ * global pending counters are decremented, every read op queued on the
+ * relocation is completed with the resulting error status, and the
+ * relocation record is released.
+ */
+#if USE_AUFSOPS
+void
+storeCossCompletePendingReloc(int fd, void *my_data, const char *buf, int aio_return, int aio_errno)
+#else
+void
+storeCossCompletePendingReloc(int fd, const char *buf, int r_len, int r_errflag, void *my_data)
+#endif
+{
+    CossPendingReloc *pr = my_data;
+    CossReadOp *op;
+    CossInfo *cs = pr->cs;
+    int stripe;
+    int errflag, len;
+#if USE_AUFSOPS
+    char *p;
+#endif
+
+#if USE_AUFSOPS
+    len = aio_return;
+    if (aio_errno)
+        errflag = aio_errno == ENOSPC ? DISK_NO_SPACE_LEFT : DISK_ERROR;
+    else
+        errflag = DISK_OK;
+#else
+    errflag = r_errflag;
+    len = r_len;
+#endif
+
+    debug(79, 3) ("storeCossCompletePendingReloc: %p\n", pr);
+    assert(cbdataValid(pr));
+    if (errflag != 0) {
+        coss_stats.read.fail++;
+        if (errflag > 0) {
+            errno = errflag;
+            debug(79, 1) ("storeCossCompletePendingReloc: error: %s\n", xstrerror());
+        } else {
+            debug(79, 1) ("storeCossCompletePendingReloc: got failure (%d)\n", errflag);
+        }
+    } else {
+        debug(79, 3) ("COSS Pending Relocate: %d -> %d: completed\n", pr->original_filen, pr->new_filen);
+        coss_stats.read.success++;
+    }
+    /* aufs aioRead() doesn't take a buffer, it reads into its own, so the data has to be copied into the membuf here. */
+#if USE_AUFSOPS
+    p = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(pr->new_filen, cs), NULL);
+    assert(p != NULL);
+    assert(p == pr->p);
+    /*
+     * Copy only after a successful read that returned data.  On failure
+     * aio_return may be negative, which would be converted to an enormous
+     * byte count, and buf may not hold anything usable.
+     */
+    if (errflag == 0 && len > 0)
+        xmemcpy(p, buf, len);
+#endif
+
+    /* Nope, we're not a pending relocate anymore! */
+    dlinkDelete(&pr->node, &cs->pending_relocs);
+
+    /* Update the stripe count */
+    stripe = storeCossFilenoToStripe(cs, pr->original_filen);
+    assert(stripe >= 0);
+    assert(stripe < cs->numstripes);
+    assert(cs->stripes[stripe].pending_relocs >= 1);
+    cs->stripes[stripe].pending_relocs--;
+
+    /* Relocate has completed; we can now complete pending read ops on this particular entry */
+    while (pr->ops.head != NULL) {
+        op = pr->ops.head->data;
+        debug(79, 3) ("storeCossCompletePendingReloc: %p: dequeueing op %p\n", pr, op);
+        /* Unbind the op first: storeCossCompleteReadOp() asserts that op->pr is NULL. */
+        op->pr = NULL;
+        dlinkDelete(&op->pending_op_node, &pr->ops);
+        storeCossCompleteReadOp(cs, op, errflag);
+        /* XXX again, this shouldn't be here (find the dlinkAddTail() in storeCossKickReadOp); these should
+         * be abstracted out. */
+    }
+    /* Good, now we can delete it */
+    cbdataUnlock(pr);
+    cbdataFree(pr);
+    assert(cs->pending_reloc_count != 0);
+    cs->pending_reloc_count--;
+}
+
+/*
+ * Create and queue a read operation for sio.
+ *
+ * The request itself (length, offset, disk offset and destination buffer)
+ * has already been recorded in the CossState attached to sio; it is copied
+ * into a freshly allocated CossReadOp, which is appended to cs->pending_ops
+ * and returned.
+ *
+ * The read is expected to be served from a MemBuf that is in memory.
+ */
+CossReadOp *
+storeCossCreateReadOp(CossInfo *cs, storeIOState *sio)
+{
+    CossState *state = sio->fsstate;
+    CossReadOp *rop = memPoolAlloc(coss_op_pool);
+
+    debug(79, 3) ("COSS: Creating Read operation: %p: filen %d, offset %lld, size %lld\n", rop, sio->swap_filen, (long long int) state->requestoffset, (long long int) state->requestlen);
+
+    /* Snapshot the request described by the sio's state */
+    rop->sio = sio;
+    rop->type = COSS_OP_READ;
+    rop->requestbuf = state->requestbuf;
+    rop->requestlen = state->requestlen;
+    rop->requestoffset = state->requestoffset;
+    rop->reqdiskoffset = state->reqdiskoffset;
+
+    /* Queue it with the other outstanding operations */
+    dlinkAddTail(rop, &rop->node, &cs->pending_ops);
+    return rop;
+}
+
+/*
+ * Finish a read operation.
+ *
+ * When both the callback data and the sio are still valid, the sio's read
+ * callback is cleared and invoked.  If error is 0 the requested range is
+ * first copied out of the membuf into the request buffer and its length is
+ * reported; otherwise the callback is told -1.  In every case the op is
+ * unlinked from cs->pending_ops and returned to coss_op_pool.
+ */
+void
+storeCossCompleteReadOp(CossInfo *cs, CossReadOp *op, int error)
+{
+    storeIOState *sio = op->sio;
+    CossState *state = sio->fsstate;
+    SwapDir *dir = INDEXSD(sio->swap_dirn);
+    STRCB *notify = sio->read.callback;
+    void *notify_data = sio->read.callback_data;
+    ssize_t nbytes = -1;
+    char *obj;
+
+    debug(79, 3) ("storeCossCompleteReadOp: op %p, op dependencies satisfied, completing\n", op);
+
+    assert(notify);
+    assert(notify_data);
+    assert(storeCossGetPendingReloc(cs, sio->swap_filen) == NULL);
+    /* The op must no longer sit on a relocation's wait list. */
+    assert(op->pr == NULL);
+
+    /* Deliver the result only while the receiver is still around */
+    if (cbdataValid(notify_data) && cbdataValid(sio)) {
+        sio->read.callback = NULL;
+        sio->read.callback_data = NULL;
+        if (error == 0) {
+            /* obj is the start of this object's data inside its membuf */
+            obj = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(sio->swap_filen, dir->fsdata), NULL);
+            assert(obj != NULL);
+            /* The op must still describe the request recorded on the sio */
+            assert(state->requestbuf == op->requestbuf);
+            assert(state->requestoffset == op->requestoffset);
+            assert(state->requestlen == op->requestlen);
+            xmemcpy(state->requestbuf, obj + state->requestoffset, state->requestlen);
+            nbytes = state->requestlen;
+        }
+        notify(notify_data, state->requestbuf, nbytes);
+    }
+
+    /* Drop the op from the operation list and recycle it */
+    dlinkDelete(&op->node, &cs->pending_ops);
+    memPoolFree(coss_op_pool, op);
+}
+
+/*
+ * Try to make progress on a read op.
+ *
+ * If no relocation is pending for the op's file number the op is completed
+ * immediately with no error.  Otherwise the op is attached to the pending
+ * relocation's op list, unless it is already bound to one.
+ */
+void
+storeCossKickReadOp(CossInfo *cs, CossReadOp *op)
+{
+    CossPendingReloc *reloc;
+
+    debug(79, 3) ("storeCossKickReadOp: op %p\n", op);
+
+    reloc = storeCossGetPendingReloc(cs, op->sio->swap_filen);
+    if (reloc == NULL) {
+        debug(79, 3) ("COSS: filen: %d, tis already in memory; serving.\n", op->sio->swap_filen);
+        storeCossCompleteReadOp(cs, op, 0);
+        /* op has been handed back to the pool; do not touch it again */
+        return;
+    }
+    debug(79, 3) ("COSS: filen: %d, not in memory, she'll have to wait.\n", op->sio->swap_filen);
+    /* XXX binding the op here is a hack; it mirrors the unbinding done when the relocation completes */
+    if (op->pr != NULL)
+        return;
+    debug(79, 3) ("storeCossKickReadOp: %p: op not bound to a pending read %p; binding\n", op, reloc);
+    dlinkAddTail(op, &op->pending_op_node, &reloc->ops);
+    op->pr = reloc;
+}
+
+/*
+ * Append a one-line summary of membuf t to store entry e: the given prefix,
+ * the stripe number, lock count, object count and the full/writing/written
+ * flags spelled out as words.
+ */
+static void
+membufsPrint(StoreEntry *e, CossMemBuf *t, char *prefix)
+{
+    const char *full_txt = t->flags.full ? "FULL" : "NOTFULL";
+    const char *writing_txt = t->flags.writing ? "WRITING" : "NOTWRITING";
+    const char *written_txt = t->flags.written ? "WRITTEN" : "NOTWRITTEN";
+
+    storeAppendPrintf(e, "%s: %d, lockcount: %d, numobjects %d, flags: %s,%s,%s\n",
+        prefix, t->stripe, t->lockcount, t->numobjs,
+        full_txt, writing_txt, written_txt);
+}
+
+/*
+ * Write a report on cs into store entry e: one line per live membuf, one
+ * line per dead membuf, then the pending-relocation count of every stripe
+ * that has at least one.
+ */
+void
+membufsDump(CossInfo *cs, StoreEntry *e)
+{
+    dlink_node *node;
+    int stripe;
+
+    for (node = cs->membufs.head; node != NULL; node = node->next)
+        membufsPrint(e, node->data, "Stripe");
+
+    for (node = cs->dead_membufs.head; node != NULL; node = node->next)
+        membufsPrint(e, node->data, "Dead Stripe");
+
+    storeAppendPrintf(e, "Pending Relocations:\n");
+    for (stripe = 0; stripe < cs->numstripes; stripe++) {
+        /* Stripes with nothing pending are left out of the report */
+        if (cs->stripes[stripe].pending_relocs <= 0)
+            continue;
+        storeAppendPrintf(e, " Stripe: %d Number: %d\n", stripe, cs->stripes[stripe].pending_relocs);
+    }
+}
============================================================
--- src/store_client.c d961bf8e2ac9cf641164b0dd9c6b5d0cef3688f0
+++ src/store_client.c 35cf0d9660b7ea121d8da18a53b6d958cf75dc3e
@@ -449,6 +449,8 @@ storeClientReadHeader(void *data, const
break;
}
break;
+ case STORE_META_OBJSIZE:
+ break;
case STORE_META_STD:
case STORE_META_STD_LFS:
break;
============================================================
--- src/store_swapmeta.c 7d767872403f2adbc507a0fc81661dd161046f53
+++ src/store_swapmeta.c 369d3779b78fe2673c4b4bbc83c4b85ba758eeac
@@ -68,6 +68,8 @@ storeSwapMetaBuild(StoreEntry * e)
tlv **T = &TLV;
const char *url;
const char *vary;
+ const squid_off_t objsize = objectLen(e);
+
assert(e->mem_obj != NULL);
assert(e->swap_status == SWAPOUT_WRITING);
url = storeUrl(e);
@@ -79,6 +81,10 @@ storeSwapMetaBuild(StoreEntry * e)
T = storeSwapTLVAdd(STORE_META_STD_LFS, &e->timestamp, STORE_HDR_METASIZE, T);
#endif
T = storeSwapTLVAdd(STORE_META_URL, url, strlen(url) + 1, T);
+ /* XXX this should eventually take the large-file-support stuff into account */
+ if (objsize > -1) {
+ T = storeSwapTLVAdd(STORE_META_OBJSIZE, &objsize, sizeof(objsize), T);
+ }
vary = e->mem_obj->vary_headers;
if (vary)
T = storeSwapTLVAdd(STORE_META_VARY_HEADERS, vary, strlen(vary) + 1, T);