Understanding The Memcached Source Code - Event Driven II

The slab allocator (I, II, III) is the core module of the cache system, which largely determines how efficiently the bottleneck resource, memory, can be utilized. The other 3 parts, namely, the

LRU algorithm (I , II , III) for entry expiration; and an

event driven model (I , II - this article , III) based on libevent; and the

consistent hashing (not complete) for data distribution,

are built around it.

In classic multithreading, blocking I/O operations constrain the maximum number of requests a server can handle. Hence an asynchronous event driven model is used to eliminate the throughput bottleneck. As such, the synchronous and potentially slow process is divided into logic segments that are free of blocking I/O and are executed asynchronously.

When it comes to asynchronous processing, extra space is required to store contexts. This is because the logic segments, which may be associated with different sessions, are executed in an interleaved way. For instance, when asynchrony is implemented (emulated) using synchronous multithreading, the “extra space” takes the form of thread stacks, whereas in an event driven model the contexts are maintained in user land.

conn is the representative of those contexts in Memcached.
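To make the idea of a user-land context concrete, here is a minimal sketch that is not taken from Memcached. The names session and session_feed are made up for illustration; the point is only that all progress is stored in the context, so chunks that belong to different sessions can be processed in any interleaving.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-session context; Memcached's real one is struct conn. */
struct session {
    char   buf[64];   /* bytes received so far for the unfinished line */
    size_t len;       /* number of valid bytes in buf */
    int    lines;     /* complete lines seen on this session */
};

/* Feed one chunk of input into a session and return the number of lines
 * completed by this chunk. Nothing is kept on the call stack between
 * calls: the unfinished line survives in *s. */
static int session_feed(struct session *s, const char *chunk)
{
    int completed = 0;

    for (size_t i = 0; chunk[i] != '\0'; i++) {
        if (chunk[i] == '\n') {
            s->len = 0;              /* line finished, start a new one */
            s->lines++;
            completed++;
        } else if (s->len < sizeof(s->buf)) {
            s->buf[s->len++] = chunk[i];
        }
    }
    return completed;
}
```

Feeding "get t" to session a, then "get foo\n" to session b, then "est\n" to session a completes one line on each session, even though the two inputs were interleaved.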

Core data structure - conn

typedef struct conn conn;
struct conn {
    int sfd;

    ...// scr: not applicable

    enum conn_states state;

    ...// scr: not applicable

    struct event event;
    short ev_flags;
    short which; /** which events were just triggered */

    char *rbuf; /** buffer to read commands into */
    char *rcurr; /** but if we parsed some already, this is where we stopped */
    int rsize; /** total allocated size of rbuf */
    int rbytes; /** how much data, starting from rcurr, do we have unparsed */

    char *wbuf;
    char *wcurr;
    int wsize;
    int wbytes;
    /** which state to go into after finishing current write */
    enum conn_states write_and_go;
    void *write_and_free; /** free this memory after finishing writing */

    char *ritem; /** when we read in an item's value, it goes here */
    int rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void *item; /* for commands set/add/replace */

    /* data for the swallow state */
    int sbytes; /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int iovsize; /* number of elements allocated in iov[] */
    int iovused; /* number of elements used in iov[] */

    struct msghdr *msglist;
    int msgsize; /* number of elements allocated in msglist[] */
    int msgused; /* number of elements used in msglist[] */
    int msgcurr; /* element in msglist[] being transmitted now */
    int msgbytes; /* number of bytes in current msg */

    item **ilist; /* list of items to write out */
    int isize;
    item **icurr;
    int ileft;

    ...// scr: not applicable

    enum protocol protocol; /* which protocol this connection speaks */

    ...// scr: not applicable

    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int hdrsize; /* number of headers' worth of space is allocated */

    bool noreply; /* True if the reply should not be sent. */

    ...// scr: not applicable

    short cmd; /* current command being processed */

    ...// scr: not applicable

    int keylen;
    conn *next; /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};

Properties in discussion

sfd - the file descriptor the event is bound to. used in the last post

state - the main focus of this post

rbuf - read buffer address. used by try_read_network

rcurr - address of unprocessed data. used by try_read_network

rsize - current size of the read buffer. used by try_read_network

rbytes - size of data to be processed (it is also used as an indicator for leftover data in various places). initialised by try_read_network, updated by process_get_command, used by try_read_command

last_cmd_time - updated when the processing of a command starts. set by try_read_command

ilist - the item list that is associated with the context; icurr and ileft indicate the current entry and number of entries left. used by process_get_command, conn_release_items

iov - the actual storage for pointers of output data, which is used by msglist; iovsize, and iovused are its allocated size and used size respectively. initialised by process_command, used by add_iov, ensure_iov_space


Here the data structures (struct msghdr and struct iovec) are required by sendmsg. The relevant text about the API is pasted below.

The msg_iov and msg_iovlen fields of message specify zero or more buffers containing the data to be sent. msg_iov points to an array of iovec structures; msg_iovlen shall be set to the dimension of this array. In each iovec structure, the iov_base field specifies a storage area and the iov_len field gives its size in bytes. Some of these sizes can be zero. The data from each storage area indicated by msg_iov is sent in turn.

msglist - the list that stores the struct msghdr entries themselves; msgsize and msgused are its allocated size and used size respectively; msgbytes is the number of bytes queued in the current msghdr; msgcurr is the index of the entry being transmitted (written).
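As a standalone illustration of how sendmsg consumes these two structures, the following sketch gathers three separate buffers into one reply and reads it back from the other end of a socketpair. It is not Memcached code: gather_reply and the hard-coded reply are assumptions made for the example, and it relies on the small message being delivered in one piece on a local stream socket.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Send three buffers with a single sendmsg call and read the result back.
 * Returns the number of bytes received into out, or -1 on error. */
static ssize_t gather_reply(char *out, size_t outsize)
{
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
        return -1;

    char header[] = "VALUE test 0 4\r\n";
    char data[]   = "abcd\r\n";
    char end[]    = "END\r\n";

    /* One iovec entry per buffer; nothing is copied into a staging area. */
    struct iovec iov[3];
    iov[0].iov_base = header; iov[0].iov_len = strlen(header);
    iov[1].iov_base = data;   iov[1].iov_len = strlen(data);
    iov[2].iov_base = end;    iov[2].iov_len = strlen(end);

    /* The msghdr only points at the iovec array and records its length. */
    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = iov;
    msg.msg_iovlen = 3;

    ssize_t got = -1;
    if (sendmsg(fds[0], &msg, 0) > 0)
        got = read(fds[1], out, outsize - 1);
    if (got >= 0)
        out[got] = '\0';

    close(fds[0]);
    close(fds[1]);
    return got;
}
```

The receiver sees one contiguous byte stream, which is why Memcached can point iov entries directly at item memory instead of assembling the reply in a temporary buffer.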

Yet nothing is better than a chart to demonstrate the data structures and their layout in memory.

Figure - msglist & iov

State switch

An event triggers cascading changes of state, which in turn invoke various procedures, before the drive machine relinquishes control and waits for the arrival of a new event. In the last post, we saw this process on the dispatch thread, in which

1) conn_listening is triggered by a new connection;

2) dispatch_conn_new is invoked, which transfers the newly accepted fd, as well as its succeeding events, to one of the worker threads;

3) the dispatch thread gives up the CPU and waits for new “new connection” events.

In this post, we are going to see more complex state switches that effectively link together the procedures we discussed in LRU III. This time we adopt a similar approach as in LRU III, i.e., sending telnet commands to a Memcached instance, to navigate the outermost layer of the Memcached application.

The state of a given session is represented by conn.state of its associated context.


We will also switch on the verbose output with -vvv to better observe the internal state transitions.


...
        case 'v':
            settings.verbose++;
            break;
...

memcached.c:5518

Read

Firstly (as usual) we telnet to the Memcached instance, and add some items

...// add some items
~telnet localhost 11211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
...
Telnet input
...
<36 new auto-negotiating client connection
...
Server verbose


Here 36 is the accepted fd. As mentioned, the following operations will be on this fd.

Next we send the exact same read command to the Memcached instance as in LRU III

> get test
Telnet input
...
36: going from conn_new_cmd to conn_waiting
36: going from conn_waiting to conn_read
36: going from conn_read to conn_parse_cmd
36: Client using the ascii protocol
<36 get test
> FOUND KEY test
>36 sending key test
>36 END
36: going from conn_parse_cmd to conn_mwrite
36: going from conn_mwrite to conn_new_cmd
36: going from conn_new_cmd to conn_waiting
36: going from conn_waiting to conn_read
...
Server verbose

As mentioned in the last post, the initial state of a connection handed to a worker thread is

conn_new_cmd

so we get started from here.

...
static void drive_machine(conn *c) {
    int nreqs = settings.reqs_per_event; // scr: --------> 1)
...
        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other
               connections */
            --nreqs; // scr: ----------------------------> 1)
            if (nreqs >= 0) {
                reset_cmd_handler(c); // scr: -----------> 2)
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {
                    ...// scr: error handling
                }
                stop = true; // scr: --------------------> 3)
            }
            break;
...

static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if(c->item != NULL) {
        item_remove(c->item);
        c->item = NULL;
    }
    conn_shrink(c);
    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd); // scr: -----> 2a)
    } else {
        conn_set_state(c, conn_waiting); // scr: -------> 2b)
    }
}
memcached.c:4361 & reset_cmd_handler

1) nreqs (settings.reqs_per_event) is the maximum number of requests one event loop iteration should handle for a connection. The threshold is needed because other connections served by the same thread would be starved if one connection kept the event loop iteration busy for too long. Note also that the interrupted connection will be fired again and get the chance to re-enter the drive machine with a “read” event, since the descriptor was set with EV_PERSIST in the last post.


...
    settings.reqs_per_event = 20;
...
        case 'R':
            settings.reqs_per_event = atoi(optarg);
            if (settings.reqs_per_event == 0) {
                fprintf(stderr, "Number of requests per event must be greater than 0\n");
                return 1;
            }
            break;
...

memcached.c:5545, 6112

2) Initializes the relevant properties in the context for a new command.

2a) If there are leftover data, then switch to conn_parse_cmd directly.

2b) If it is a fresh session, then switch to conn_waiting.

3) Yield the current iteration when the threshold is reached.
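The budget logic in steps 1) to 3) can be isolated in a small sketch. This is a simplified model rather than Memcached code: mini_conn and drive_once are hypothetical names, and a "request" is reduced to a counter.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical miniature of a connection with buffered requests. */
struct mini_conn {
    int pending;   /* requests already buffered on this connection */
    int handled;   /* requests processed so far */
    int yields;    /* times the connection gave up its turn */
};

/* Process at most reqs_per_event requests, then yield so that other
 * connections on the same event loop get a turn. Returns true when
 * requests are still pending and another event is needed. */
static bool drive_once(struct mini_conn *c, int reqs_per_event)
{
    int nreqs = reqs_per_event;

    while (c->pending > 0) {
        if (--nreqs < 0) {       /* budget used up: yield */
            c->yields++;
            return true;
        }
        c->pending--;            /* stands in for handling one command */
        c->handled++;
    }
    return false;
}
```

With 45 buffered requests and a budget of 20, the connection needs three turns and yields twice, leaving room for other connections in between.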

conn_waiting

...
        case conn_waiting:
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;
            break;
...
memcached.c:4322

Simply reset the descriptor with the original flags (i.e., EV_READ, EV_PERSIST), update the state of the context to the next hop (conn_read), and relinquish the CPU.

conn_read

...
        case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); // scr: 1)

            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd); // scr: ---------------------> 2)
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break;
...
memcached.c:4334

1) Read from the file descriptor and save the data to the context.

2) Switch to the next state, conn_parse_cmd.

try_read_network

static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;
    assert(c != NULL);

    if (c->rcurr != c->rbuf) { // scr: -------------------------> 1)
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }

    while (1) {
        if (c->rbytes >= c->rsize) { // scr: -------------------> 2)
            if (num_allocs == 4) {
                return gotdata;
            }
            ++num_allocs;
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
            if (!new_rbuf) {
                ...// scr: error handling
            }
            c->rcurr = c->rbuf = new_rbuf;
            c->rsize *= 2;
        }

        int avail = c->rsize - c->rbytes; // scr: --------------> 3)
        res = read(c->sfd, c->rbuf + c->rbytes, avail);
        if (res > 0) {
            ...// scr: stat
            gotdata = READ_DATA_RECEIVED;
            c->rbytes += res;
            if (res == avail) { // scr: -----------------------> 3a)
                continue;
            } else {
                break; // scr: --------------------------------> 3b)
            }
        }
        if (res == 0) {
            return READ_ERROR;
        }
        if (res == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) { // scr: 3b)
                break;
            }
            return READ_ERROR;
        }
    }
    return gotdata;
}
memcached.c:try_read_network

Here the while (1) is less about iterating and more about flow control: it retries the read after the buffer has been expanded.

1) Move the unparsed data, and hence rcurr, to the beginning of the read buffer.

2) If the unparsed data fills the whole read buffer (rbytes >= rsize), double the buffer size (at most 4 times per call).

3) Calculate the available buffer space, read from the socket into it, and update rbytes accordingly.

3a) Go to 2) if the read filled all the available space, as more data may be pending.

3b) Otherwise leave the loop and return READ_DATA_RECEIVED (provided that some data has been read), which switches the state to conn_parse_cmd in the state machine pass through.
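The same read-and-grow pattern can be tried outside Memcached with a non-blocking pipe. The sketch below is a simplified adaptation under stated assumptions: struct rbuf and read_available are hypothetical names, the buffer always starts at offset 0, and error handling is reduced to returning -1.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for the rbuf, rsize and rbytes fields of conn. */
struct rbuf {
    char  *buf;
    size_t size;    /* allocated size */
    size_t bytes;   /* bytes currently stored */
};

/* Drain whatever is currently readable from a non-blocking fd into rb,
 * doubling the buffer when it is full (at most 4 times per call).
 * Returns 1 if data was read, 0 if none was available, -1 on error/EOF. */
static int read_available(int fd, struct rbuf *rb)
{
    int got = 0;
    int num_allocs = 0;

    for (;;) {
        if (rb->bytes >= rb->size) {
            if (num_allocs == 4)
                return got;          /* leave the rest for the next event */
            num_allocs++;
            char *bigger = realloc(rb->buf, rb->size * 2);
            if (bigger == NULL)
                return -1;
            rb->buf = bigger;
            rb->size *= 2;
        }

        size_t avail = rb->size - rb->bytes;
        ssize_t res = read(fd, rb->buf + rb->bytes, avail);
        if (res > 0) {
            got = 1;
            rb->bytes += (size_t)res;
            if ((size_t)res == avail)
                continue;            /* buffer filled: more may be pending */
            break;                   /* short read: the fd is drained */
        }
        if (res == 0)
            return -1;               /* peer closed */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            break;                   /* nothing more to read right now */
        return -1;
    }
    return got;
}
```

Writing 5000 bytes into a pipe whose read end has O_NONBLOCK set and calling read_available with a 2048-byte buffer grows the buffer twice (to 8192 bytes); a second call then returns 0 because the read reports EAGAIN.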

conn_parse_cmd

...
        case conn_parse_cmd :
            if (try_read_command(c) == 0) {
                /* we need more data! */
                conn_set_state(c, conn_waiting);
            }

            break;
...
memcached.c:drive_machine (conn_parse_cmd)

try_read_command

static int try_read_command(conn *c) {
    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));
    assert(c->rbytes > 0);

    if (c->protocol == negotiating_prot || c->transport == udp_transport) {
        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
            c->protocol = binary_prot;
        } else {
            c->protocol = ascii_prot; // scr: -------------------------> 1)
        }

        if (settings.verbose > 1) { // scr: ---------------------------> ~)
            fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
                    prot_text(c->protocol));
        }
    }

    if (c->protocol == binary_prot) {
        ...// scr: not applicable
    } else {
        char *el, *cont;

        if (c->rbytes == 0)
            return 0;

        el = memchr(c->rcurr, '\n', c->rbytes); // scr: ---------------> 2)
        if (!el) {
            ...// scr: not applicable
        }
        cont = el + 1;
        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
            el--;
        }
        *el = '\0'; // scr: -------------------------------------------> 2)

        assert(cont <= (c->rcurr + c->rbytes));

        c->last_cmd_time = current_time; // scr: ----------------------> 3)
        process_command(c, c->rcurr); // scr: -------------------------> 4)

        c->rbytes -= (cont - c->rcurr); // scr: -----------------------> 5)
        c->rcurr = cont; // scr: --------------------------------------> 6)

        assert(c->rcurr <= (c->rbuf + c->rsize));
    }

    return 1;
}
memcached.c:try_read_command

1) Determine the protocol type, which in this case is ascii_prot.

~) The verbose message we saw in the beginning.

2) Locate the first '\n' in the unparsed data, store the start of the remaining data (the byte after the '\n') in cont, and terminate the command in place by overwriting the line ending (the preceding '\r' if there is one, otherwise the '\n') with '\0' through el.

3) Update last_cmd_time.

4) Call process_command, which recognises the “get” command and calls process_get_command. In process_command, a) tokenize_command is a string parsing method that stores the command (i.e., “get”) in tokens[COMMAND_TOKEN] and the key (i.e., test) in tokens[KEY_TOKEN]; b) msgcurr, msgused and iovused are reset, and the remaining output fields are initialised in add_msghdr; and c) process_get_command is the next step.


...
    c->msgcurr = 0; // scr: ---------------------------------------------> b)
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_of_memory(c, "SERVER_ERROR out of memory preparing response");
        return;
    }

    ntokens = tokenize_command(command, tokens, MAX_TOKENS); // scr: ----> a)
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

        process_get_command(c, tokens, ntokens, false); // scr: ---------> c)
    } else if ...
...

memcached.c:process_command

5) Decrease rbytes by the length of the command that has been processed (cont - c->rcurr).

6) Move rcurr to the unprocessed data located right after the command.
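The line extraction in steps 2), 5) and 6) can be exercised on its own. The following is a minimal sketch of that part only; line_reader and next_command are hypothetical names, and protocol detection as well as command dispatch are left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the rcurr and rbytes fields of conn. */
struct line_reader {
    char  *rcurr;    /* start of the unparsed data */
    size_t rbytes;   /* number of unparsed bytes */
};

/* Cut the next command line out of the unparsed data. Returns a pointer to
 * the NUL-terminated command, or NULL when no complete line is buffered. */
static char *next_command(struct line_reader *r)
{
    if (r->rbytes == 0)
        return NULL;

    char *el = memchr(r->rcurr, '\n', r->rbytes);
    if (el == NULL)
        return NULL;                 /* need more data */

    char *cont = el + 1;             /* first byte after the line ending */
    if ((el - r->rcurr) > 1 && *(el - 1) == '\r')
        el--;                        /* also drop the '\r' of "\r\n" */
    *el = '\0';                      /* terminate the command in place */

    char *command = r->rcurr;
    r->rbytes -= (size_t)(cont - r->rcurr);
    r->rcurr = cont;
    return command;
}
```

For the input "get test\r\nget foo\nge", two calls return "get test" and "get foo", and a third call returns NULL with 2 unparsed bytes left, which corresponds to the "need more data" case of conn_parse_cmd.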

add_msghdr

Before the logic reaches process_get_command, an entry should be initialised in msglist for the current command.

static int add_msghdr(conn *c)
{
    struct msghdr *msg;

    assert(c != NULL);

    if (c->msgsize == c->msgused) { // scr: --------------------> 1)
        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
        if (! msg) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return -1;
        }
        c->msglist = msg;
        c->msgsize *= 2;
    }

    msg = c->msglist + c->msgused; // scr: ---------------------> 2)

    /* this wipes msg_iovlen, msg_control, msg_controllen, and
       msg_flags, the last 3 of which aren't defined on solaris: */
    memset(msg, 0, sizeof(struct msghdr)); // scr: -------------> 3)

    msg->msg_iov = &c->iov[c->iovused]; // scr: ----------------> 3)

    if (IS_UDP(c->transport) && c->request_addr_size > 0) {
        ...// scr: UDP related
    }

    c->msgbytes = 0; // scr: -----------------------------------> 4)
    c->msgused++; // scr: --------------------------------------> 5)

    if (IS_UDP(c->transport)) {
        ...// scr: UDP related
    }

    return 0;
}
add_msghdr@memcached.c

1) Expand the msglist when required.

2) Point to the next empty entry in msglist with msg.

3) Initialise the entry pointed to by msg. Here the critical operation is msg->msg_iov = &c->iov[c->iovused]; which links the msglist entry to a specific entry in iov. (Figure - msglist & iov)

4) Initialise msgbytes to 0.

5) Update msgused accordingly.

process_get_command

We have seen this method in the beginning of LRU III. This time, we will complete its pass through in the context of the event driven model.

static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
    char *key;
    size_t nkey;
    int i = 0;
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    char *suffix;
    assert(c != NULL);

    do {
        while(key_token->length != 0) { // scr: -----------------> 1)

            key = key_token->value;
            nkey = key_token->length;

            if(nkey > KEY_MAX_LENGTH) {
                out_string(c, "CLIENT_ERROR bad command line format");
                while (i-- > 0) {
                    item_remove(*(c->ilist + i));
                }
                return;
            }

            it = item_get(key, nkey, c); // scr: ----------------> 2)
            if (settings.detail_enabled) {
                stats_prefix_record_get(key, nkey, NULL != it);
            }
            if (it) {
                if (i >= c->isize) { // scr: --------------------> 3)
                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
                    if (new_list) {
                        c->isize *= 2;
                        c->ilist = new_list;
                    } else {
                        ...// scr: stat
                        item_remove(it);
                        break;
                    }
                }

                if (return_cas)
                {
                    ...// scr: cas
                }
                else
                {
                    MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                          it->nbytes, ITEM_get_cas(it));
                    if (add_iov(c, "VALUE ", 6) != 0 || // scr: ---> 4)
                        add_iov(c, ITEM_key(it), it->nkey) != 0 ||
                        add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
                    {
                        item_remove(it);
                        break;
                    }
                }

                ...// scr: verbose & stat

                item_update(it); // scr: ------------------------> 5)
                *(c->ilist + i) = it; // scr: -------------------> 6)
                i++;

            } else {
                ...// scr: stat
            }

            key_token++; // scr: --------------------------------> 1)
        }

        /*
         * If the command string hasn't been fully processed, get the next set
         * of tokens.
         */
        if(key_token->value != NULL) { // scr: ------------------> 1)
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }

    } while(key_token->value != NULL);

    c->icurr = c->ilist; // scr: --------------------------------> 6)
    c->ileft = i; // scr: ---------------------------------------> 6)
    ...// scr: cas & verbose

    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
        out_of_memory(c, "SERVER_ERROR out of memory writing get response");
    }
    else { // scr: ----------------------------------------------> 7)
        conn_set_state(c, conn_mwrite);
        c->msgcurr = 0;
    }
}
process_get_command@memcached.c

1) Iterate through key token array. Here we got one key token ‘test’.

2) Call item_get for the item pointer.

3) Double the size of ilist if it is full. Here ilist stores the items being processed. At the end of the current command processing, this list is used to release the reference counts of the items in a batch.

4) add_iov prepares the output of this session.

5) Call item_update to manipulate the LRU lists.

6) Link the item currently being processed to ilist, and update the associated fields.

7) Move on to the next state conn_mwrite.

add_iov

static int add_iov(conn *c, const void *buf, int len) {
    struct msghdr *m;
    int leftover;
    bool limit_to_mtu;

    assert(c != NULL);

    do {
        m = &c->msglist[c->msgused - 1]; // scr: -------------------> 1)

        /*
         * Limit UDP packets, and the first payloads of TCP replies, to
         * UDP_MAX_PAYLOAD_SIZE bytes.
         */
        limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);

        /* We may need to start a new msghdr if this one is full. */
        if (m->msg_iovlen == IOV_MAX ||
            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
            add_msghdr(c);
            m = &c->msglist[c->msgused - 1]; // scr: ---------------> 7)
        }

        if (ensure_iov_space(c) != 0) // scr: ----------------------> 2)
            return -1;

        /* If the fragment is too big to fit in the datagram, split it up */
        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE; // scr: *)
            len -= leftover;
        } else {
            leftover = 0;
        }

        m = &c->msglist[c->msgused - 1]; // scr: ------------------> 1)
        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf; // scr: -> 3)
        m->msg_iov[m->msg_iovlen].iov_len = len;

        c->msgbytes += len; // scr: -------------------------------> 4)
        c->iovused++; // scr: -------------------------------------> 5)
        m->msg_iovlen++; // scr: ----------------------------------> 6)

        buf = ((char *)buf) + len;
        len = leftover;
    } while (leftover > 0);

    return 0;
}
add_iov@memcached.c

This method initialises an entry in the iov list and adds it to the last in-use entry of msglist (Figure - msglist & iov).

1) Get the tail of the in-use portion of msglist.

2) Expand iov if necessary.

3) Initialise the iov_base and iov_len fields of the iov entry. Note that msg_iov has been linked to the position of a specific entry in iov, hence operations on msg_iov change the content of iov as well.

4) Increase msgbytes by the number of bytes just queued.

5, 6) Update iovused and msg_iovlen accordingly.

7) Start a new msghdr when the current one is full. Together with the leftover calculation marked *) and the do while loop, this splits output that exceeds the MTU-derived limit (UDP_MAX_PAYLOAD_SIZE) across several entries.
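To see how one shared iov array ends up divided among several msghdr entries, here is a reduced model of the add_msghdr and add_iov pair. It makes several simplifying assumptions that differ from Memcached: the arrays are fixed in size instead of being reallocated, the payload limit is always applied (as for UDP), and the names out_queue, queue_add_msghdr, queue_add_iov and MAX_PAYLOAD are invented for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

#define MAX_MSGS    8      /* fixed sizes keep the sketch short */
#define MAX_IOVS    16
#define MAX_PAYLOAD 1400   /* stands in for UDP_MAX_PAYLOAD_SIZE */

struct out_queue {
    struct msghdr msglist[MAX_MSGS];
    struct iovec  iov[MAX_IOVS];
    int    msgused;
    int    iovused;
    size_t msgbytes;       /* bytes queued in the newest msghdr */
};

/* Start a new msghdr whose msg_iov points at the next free iov slot. */
static int queue_add_msghdr(struct out_queue *q)
{
    if (q->msgused == MAX_MSGS)
        return -1;
    struct msghdr *m = &q->msglist[q->msgused];
    memset(m, 0, sizeof(*m));
    m->msg_iov = &q->iov[q->iovused];
    q->msgbytes = 0;
    q->msgused++;
    return 0;
}

/* Queue len bytes at buf, splitting across msghdrs at MAX_PAYLOAD.
 * queue_add_msghdr must have been called at least once beforehand. */
static int queue_add_iov(struct out_queue *q, const void *buf, size_t len)
{
    size_t leftover;

    do {
        if (q->msgbytes >= MAX_PAYLOAD && queue_add_msghdr(q) != 0)
            return -1;
        if (q->iovused == MAX_IOVS)
            return -1;

        struct msghdr *m = &q->msglist[q->msgused - 1];
        if (len + q->msgbytes > MAX_PAYLOAD) {
            leftover = len + q->msgbytes - MAX_PAYLOAD;
            len -= leftover;
        } else {
            leftover = 0;
        }

        /* Writing through msg_iov fills the shared iov array. */
        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
        m->msg_iov[m->msg_iovlen].iov_len = len;
        q->msgbytes += len;
        q->iovused++;
        m->msg_iovlen++;

        buf = (const char *)buf + len;
        len = leftover;
    } while (leftover > 0);

    return 0;
}
```

Queueing the 6-byte string "VALUE " followed by a 3000-byte buffer produces three msghdr entries: the first holds two iov entries (6 and 1394 bytes), the second one entry of 1400 bytes, and the third one entry of 206 bytes, all stored back to back in the same iov array.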

conn_mwrite

...
        case conn_mwrite:
            if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
                ...// scr: UDP related
            }
            switch (transmit(c)) {
                ...// scr: state processing
            }
            break;
...
memcached.c:4521

Before explaining the logic of the conn_mwrite state, we first look at its essential part, which is

transmit

static enum transmit_result transmit(conn *c) {
    assert(c != NULL);

    if (c->msgcurr < c->msgused &&
            c->msglist[c->msgcurr].msg_iovlen == 0) { // scr: ---------> 1)
        /* Finished writing the current msg; advance to the next. */
        c->msgcurr++;
    }
    if (c->msgcurr < c->msgused) { // scr: ----------------------------> 2)
        ssize_t res;
        struct msghdr *m = &c->msglist[c->msgcurr];

        res = sendmsg(c->sfd, m, 0);
        if (res > 0) {
            ...// scr: state
            /* We've written some of the data. Remove the completed
               iovec entries from the list of pending writes. */
            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
                res -= m->msg_iov->iov_len;
                m->msg_iovlen--;
                m->msg_iov++;
            }

            /* Might have written just part of the last iovec entry;
               adjust it so the next write will do the rest. */
            if (res > 0) {
                m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
                m->msg_iov->iov_len -= res;
            }
            return TRANSMIT_INCOMPLETE;
        }
        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // scr: 3)
            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                return TRANSMIT_HARD_ERROR;
            }
            return TRANSMIT_SOFT_ERROR;
        }
        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
           we have a real error, on which we close the connection */
        if (settings.verbose > 0)
            perror("Failed to write, and not due to blocking");

        if (IS_UDP(c->transport))
            ...// scr: UDP related
        else
            conn_set_state(c, conn_closing);
        return TRANSMIT_HARD_ERROR; // scr: --------------------------> 4)
    } else {
        return TRANSMIT_COMPLETE; // scr: ----------------------------> 5)
    }
}
transmit@memcached.c

As the essential method of conn_mwrite state processing, transmit goes through msglist (starting from index 0, the initial value of msgcurr) and tries its best to send out all the pending data accumulated in the current session. Each call issues one sendmsg for the current entry, and the remaining data is sent in subsequent passes through the state machine loop. Only when a would-be blocking operation is indicated by EAGAIN or EWOULDBLOCK does the state machine stop the current event loop iteration, and the same session is resumed when the buffer space becomes available again.

1) If msg_iovlen is 0, the writing of the msgcurr slot has finished, hence move to the next slot.

2) Call sendmsg, then advance msg_iov and adjust iov_base and iov_len according to the number of bytes (res) that have been written successfully. This leads to case b) of the state processing.

3) As mentioned, EAGAIN or EWOULDBLOCK returned by sendmsg leads to case c) of the state processing.

4) Any other error switches the state to conn_closing (for TCP) and returns TRANSMIT_HARD_ERROR, which is also handled by case b) of the state processing.

5) c->msgcurr >= c->msgused means that all the data of the session has been written, which leads to case a) of the state processing.
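The bookkeeping after a partial write in step 2) does not need a socket to be demonstrated. The sketch below extracts it into a hypothetical helper, consume_written, which takes the byte count that sendmsg would have returned. Unlike the original it also checks msg_iovlen before touching the last entry, which is an extra guard added for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Drop the first `written` bytes from the pending iovec entries of m. */
static void consume_written(struct msghdr *m, size_t written)
{
    /* Entries that were sent completely are skipped altogether. */
    while (m->msg_iovlen > 0 && written >= m->msg_iov->iov_len) {
        written -= m->msg_iov->iov_len;
        m->msg_iovlen--;
        m->msg_iov++;
    }

    /* A partially sent entry is trimmed so the next write resumes
     * exactly where this one stopped. */
    if (written > 0 && m->msg_iovlen > 0) {
        m->msg_iov->iov_base = (char *)m->msg_iov->iov_base + written;
        m->msg_iov->iov_len -= written;
    }
}
```

With three entries of 6, 4 and 5 bytes, reporting 8 written bytes leaves two entries, the first of which now starts 2 bytes into the second buffer. Reporting the remaining 7 bytes brings msg_iovlen to 0, which is the condition transmit checks before advancing msgcurr.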

Back to state processing

...
        case conn_mwrite:
            if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
                ...// scr: UDP related
            }
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) { // scr: ------------> a)
                    conn_release_items(c);
                    /* XXX: I don't know why this wasn't the general case */
                    if(c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                        conn_set_state(c, conn_new_cmd);
                    }
                } else if (c->state == conn_write) {
                    ...// scr: not applicable
                } else {
                    ...// scr: not applicable
                }
                break;

            case TRANSMIT_INCOMPLETE: // scr: ---------------------> b)
            case TRANSMIT_HARD_ERROR:
                break; /* Continue in state machine. */

            case TRANSMIT_SOFT_ERROR: // scr: ---------------------> c)
                stop = true;
                break;
            }
            break;
...
memcached.c:4521

According to the result of transmit, the logic flows to one of the following 3 branches,

a) If the result is TRANSMIT_COMPLETE, 1) finalise the current command processing with conn_release_items; 2) switch the state to conn_new_cmd, which 3) eventually falls to conn_waiting and, as discussed, finishes the current event loop iteration.

b) If the result is TRANSMIT_INCOMPLETE or TRANSMIT_HARD_ERROR, the state machine simply continues. For TRANSMIT_INCOMPLETE the state stays the same, and the subsequent passes through the state machine loop keep consuming data in msglist; for TRANSMIT_HARD_ERROR, transmit has already changed the state (conn_closing for TCP). Unlike a read operation, TRANSMIT_INCOMPLETE does not end the event loop iteration immediately, because a write operation does not block until the buffer is full.

c) TRANSMIT_SOFT_ERROR means the buffer is full, hence the current event loop iteration finishes straight away.
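The three branches can be replayed with a scripted sequence of transmit results. This is a toy model under stated assumptions: the enum names and run_mwrite are invented for the example, the hard-error branch sets the closing state itself (in Memcached, transmit has already done so), and only the ASCII protocol path is modelled.

```c
#include <assert.h>
#include <stdbool.h>

enum mini_state  { ST_MWRITE, ST_NEW_CMD, ST_CLOSING };
enum mini_result { TX_COMPLETE, TX_INCOMPLETE, TX_SOFT_ERROR, TX_HARD_ERROR };

/* Run the mwrite part of a state machine over a scripted list of transmit
 * results. Returns how many results were consumed before the loop stopped
 * or the state changed. */
static int run_mwrite(enum mini_state *state,
                      const enum mini_result *script, int n)
{
    bool stop = false;
    int used = 0;

    while (!stop && used < n && *state == ST_MWRITE) {
        switch (script[used++]) {
        case TX_COMPLETE:            /* a) everything written */
            *state = ST_NEW_CMD;
            break;
        case TX_INCOMPLETE:          /* b) keep writing in the next pass */
            break;
        case TX_HARD_ERROR:          /* b) continue, but towards closing */
            *state = ST_CLOSING;
            break;
        case TX_SOFT_ERROR:          /* c) buffer full: wait for an event */
            stop = true;
            break;
        }
    }
    return used;
}
```

For the script INCOMPLETE, INCOMPLETE, SOFT_ERROR, COMPLETE, the first run consumes three results and stops in ST_MWRITE; resuming with the remaining result, as a later write event would, moves the state to ST_NEW_CMD.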

Finish read

static void conn_release_items(conn *c) {
    assert(c != NULL);

    if (c->item) {
        ...// scr: not applicable
    }

    while (c->ileft > 0) {
        item *it = *(c->icurr);
        assert((it->it_flags & ITEM_SLABBED) == 0);
        item_remove(it); // scr: ---------------------> 1)
        c->icurr++;
        c->ileft--;
    }

    ...// scr: cas

    c->icurr = c->ilist;
    ...// scr: cas
}
conn_release_items@memcached.c

You may or may not have noticed a subtle bug in LRU III: the reference count of the item in the read operation is not returned to 0 as in other operations. This is because

1) the ownership of all the items is released in a batch here, at the end of the (read) command processing.
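The deferred release can be modelled with a plain counter. The sketch below is an illustration only: mini_item, mini_reader, reader_hold and reader_release are hypothetical names, the list has a fixed capacity, and the reference count is a bare integer rather than Memcached's item bookkeeping.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_HELD 8

struct mini_item { int refcount; };

/* Hypothetical stand-in for the ilist, icurr and ileft fields of conn. */
struct mini_reader {
    struct mini_item  *ilist[MAX_HELD];
    struct mini_item **icurr;
    int ileft;
};

/* Take a reference on an item and remember it until the reply is sent. */
static int reader_hold(struct mini_reader *r, struct mini_item *it)
{
    if (r->ileft == MAX_HELD)
        return -1;
    it->refcount++;
    r->ilist[r->ileft++] = it;
    r->icurr = r->ilist;
    return 0;
}

/* Give back every reference taken during the command in one batch. */
static void reader_release(struct mini_reader *r)
{
    while (r->ileft > 0) {
        (*r->icurr)->refcount--;
        r->icurr++;
        r->ileft--;
    }
    r->icurr = r->ilist;
}
```

The references have to outlive the lookup because the queued iov entries point straight into the item memory; they can only be dropped once the data has been written out.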

That's it. Did I make a serious mistake, or miss out on anything important? Or did you simply like the read? I'd be chuffed to hear your feedback.