Understanding The Memcached Source Code - Event Driven III

The slab allocator (I, II, III) is the core module of the cache system, and it largely determines how efficiently the bottleneck resource, memory, can be utilized. The other three parts, namely,

the LRU algorithm (I, II, III) for entry expiration;

an event driven model (I, II, III - this article) based on libevent; and

consistent hashing for data distribution,

are built around it.

We continue by examining the other two operations, i.e., create and delete, in the event driven context. This is also a good opportunity to revisit the core data structure and look at the

Properties in discussion

wbuf - the address of the write buffer for simple response output (e.g., STORED); used by out_string

wcurr - not very useful here, as it points to the same address as wbuf; used by conn_write

wsize - the total size of the write buffer; used by out_string to detect buffer overflow

wbytes - the length of the data populated in the write buffer; set in out_string and used by conn_write when adding it to the “real” output buffer iov

write_and_go - set to conn_new_cmd in the very last step to form a “state loop”

ritem - one of the essential properties for data reading; it is set to the address of the data portion of the actual item created by the create command

rlbytes - one of the essential properties for data reading; it is set to the number of data bytes still to be read

item - a record of the actual item created by the create command

noreply - determined by the command. we assume it is set to false
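To keep these fields in view, here is a trimmed-down, hypothetical stand-in for the connection structure. The names conn_sketch, sketch_state and conn_sketch_init are made up for illustration, and the field types are assumptions; the real conn struct in memcached.c carries many more members.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical subset of connection states; not the full Memcached enum. */
enum sketch_state { SKETCH_NEW_CMD, SKETCH_NREAD, SKETCH_WRITE };

/* Hypothetical, simplified view of the properties listed above. */
struct conn_sketch {
    char *wbuf;                      /* start of the write buffer */
    char *wcurr;                     /* same address as wbuf for simple responses */
    int   wsize;                     /* capacity of wbuf in bytes */
    int   wbytes;                    /* response bytes currently held in wbuf */
    enum sketch_state write_and_go;  /* state to enter once the write completes */
    char *ritem;                     /* where the next chunk of item data goes */
    int   rlbytes;                   /* item data bytes still to be read */
    void *item;                      /* the item being filled in */
    bool  noreply;                   /* suppress the response when true */
};

/* Allocate the write buffer and reset every field; returns false on OOM. */
bool conn_sketch_init(struct conn_sketch *c, int wsize) {
    assert(wsize > 0);
    c->wbuf = malloc((size_t)wsize);
    if (c->wbuf == NULL) {
        return false;
    }
    c->wcurr = c->wbuf;              /* both point at the buffer start */
    c->wsize = wsize;
    c->wbytes = 0;
    c->write_and_go = SKETCH_NEW_CMD;
    c->ritem = NULL;
    c->rlbytes = 0;
    c->item = NULL;
    c->noreply = false;
    return true;
}
```

The read-side fields (ritem, rlbytes, item) stay empty until a storage command has been parsed, while the write-side fields only matter once a response is being assembled.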

Create

As usual, we start with a command sent to a Memcached server.

> add test 0 60 11 (\r\n)
> hello world

As mentioned in LRU III, two passes are involved in the command processing: the first pass creates an empty object after reading the first line, and the second populates the object with the concrete value contained in the second line. The split exists because each line break in telnet triggers a separate I/O.
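The two passes can be illustrated with a small standalone sketch. It is not Memcached's parser (which tokenizes the line itself); sscanf is swapped in for brevity, and store_header, parse_add_header and fill_value are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical holder for the fields of the first line of an add command. */
struct store_header {
    char     key[251];   /* key, at most 250 characters plus the NUL */
    unsigned flags;
    long     exptime;
    int      nbytes;     /* length of the value sent on the next line */
};

/* Pass 1: parse "add <key> <flags> <exptime> <bytes>"; returns 0 on success. */
int parse_add_header(const char *line, struct store_header *h) {
    int n = sscanf(line, "add %250s %u %ld %d",
                   h->key, &h->flags, &h->exptime, &h->nbytes);
    return (n == 4 && h->nbytes >= 0) ? 0 : -1;
}

/* Pass 2: copy exactly nbytes of value into dst; returns 0 on success. */
int fill_value(const struct store_header *h, const char *data, size_t data_len,
               char *dst, size_t dst_size) {
    assert(h != NULL && data != NULL && dst != NULL);
    if ((size_t)h->nbytes > data_len || (size_t)h->nbytes > dst_size) {
        return -1;       /* not enough input yet, or destination too small */
    }
    memcpy(dst, data, (size_t)h->nbytes);
    return 0;
}
```

The length announced on the first line is what lets the second pass know how many bytes to expect, independent of how the bytes are chunked on the wire.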


In fact, most of the logic involved in this post has already been discussed, for example in LRU III and Event Driven II. Hence this post only fills in the missing parts and the links between them.

For the first command,

> add test 0 60 11 (\r\n)

The Memcached instance outputs the following lines. This time we omit the output for accepting the new connection

27: going from conn_new_cmd to conn_waiting
27: going from conn_waiting to conn_read
27: going from conn_read to conn_parse_cmd
27: Client using the ascii protocol
<27 add test 0 60 11
27: going from conn_parse_cmd to conn_nread

The logic for command reading and parsing (conn_new_cmd to conn_parse_cmd) is the same as that described in Event Driven II. The difference is that

process_update_command

is invoked after the command parsing. Though the method has been examined in LRU III, it is worth recalling that its last step updates the session context for the next state (conn_nread), which handles the actual data reading.

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    ... // LRU III
    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = it->nbytes;
    c->cmd = comm;
    conn_set_state(c, conn_nread);
}
process_update_command@memcached.c

Next we look at the second command

...
> hello world

and its associated output

> NOT FOUND test
>27 STORED
27: going from conn_nread to conn_write
27: going from conn_write to conn_new_cmd
...

The key code fragment for the state switching above is

conn_nread

...
static void drive_machine(conn *c) {
...
        case conn_nread:
            if (c->rlbytes == 0) { // scr: ---------------------------> 5)
                complete_nread(c);
                break;
            }

            ...// scr: error handling

            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) { // scr: -----------------------------> 1)
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                if (c->ritem != c->rcurr) {
                    memmove(c->ritem, c->rcurr, tocopy);
                }
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                if (c->rlbytes == 0) {
                    break;
                }
            }

            /* now try reading from the socket */
            res = read(c->sfd, c->ritem, c->rlbytes); // scr: --------> 2)
            if (res > 0) {
                ...// scr: stat
                if (c->rcurr == c->ritem) {
                    c->rcurr += res;
                }
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */ // scr: --------------> 3)
                conn_set_state(c, conn_closing);
                break;
            } // scr: ------------------------------------------------> 4)
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    ...// scr: error handling
                }
                stop = true;
                break;
            }
            ...// scr: error handling
            break;
...
drive_machine@memcached.c

1) Check whether there is leftover data from the command read phase. If so, copy it into the item directly.

More specifically, if the command and its data arrive fast enough (for example, when both lines are pasted into telnet at once), the data portion is coalesced with the command line by a single read.

2) Read the data to the memory pointed by ritem.

3) If the connection is closed (FIN) in the middle of the read, close the session.

4) If the data is split across multiple reads and none is available yet, register for further read events and suspend the drive machine until more data arrives.

5) Normal termination - read finished, call complete_nread which is covered in LRU III.
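The five steps above can be condensed into a standalone sketch. It is a simplification under stated assumptions: mini_reader, nread_result and nread_step are hypothetical names, the function returns after draining leftovers instead of continuing to read in the same pass, and it reports completion itself rather than calling complete_nread.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical reader state mirroring rcurr/rbytes/ritem/rlbytes. */
struct mini_reader {
    char *rcurr;    /* unread bytes left over from the command read */
    int   rbytes;   /* number of leftover bytes */
    char *ritem;    /* destination inside the item */
    int   rlbytes;  /* bytes still missing */
};

enum nread_result { NREAD_DONE, NREAD_MORE, NREAD_WAIT, NREAD_CLOSED, NREAD_ERROR };

/* One pass of the data-read step: drain leftovers first, then the descriptor. */
enum nread_result nread_step(struct mini_reader *r, int fd) {
    assert(r->rlbytes >= 0 && r->rbytes >= 0);
    if (r->rlbytes == 0) {
        return NREAD_DONE;                                   /* step 5 */
    }
    if (r->rbytes > 0) {                                     /* step 1 */
        int tocopy = r->rbytes > r->rlbytes ? r->rlbytes : r->rbytes;
        memmove(r->ritem, r->rcurr, (size_t)tocopy);
        r->ritem += tocopy;
        r->rlbytes -= tocopy;
        r->rcurr += tocopy;
        r->rbytes -= tocopy;
        return r->rlbytes == 0 ? NREAD_DONE : NREAD_MORE;
    }
    ssize_t res = read(fd, r->ritem, (size_t)r->rlbytes);    /* step 2 */
    if (res > 0) {
        r->ritem += res;
        r->rlbytes -= (int)res;
        return r->rlbytes == 0 ? NREAD_DONE : NREAD_MORE;
    }
    if (res == 0) {
        return NREAD_CLOSED;                                 /* step 3 */
    }
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return NREAD_WAIT;                                   /* step 4 */
    }
    return NREAD_ERROR;
}
```

A caller would loop while the result is NREAD_MORE and go back to waiting on the event loop for NREAD_WAIT, which is roughly the role that stop = true plays in the original.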

The part missing from LRU III is out_string. Combined with conn_write, it acts as a simpler counterpart of process_get_command for “simple responses”. The actual data writing is handled by the conn_mwrite block (note the fall through... in the switch case), which, as discussed, changes the state back to conn_new_cmd. Next we discuss the process in detail.

out_string

static void out_string(conn *c, const char *str) {
    size_t len;

    ...//scr: not applicable

    if (settings.verbose > 1)
        fprintf(stderr, ">%d %s\n", c->sfd, str);

    /* Nuke a partial output... */
    c->msgcurr = 0; // scr: ---------------------------> 1)
    c->msgused = 0;
    c->iovused = 0;
    add_msghdr(c);

    len = strlen(str); // scr: ------------------------> 2)
    if ((len + 2) > c->wsize) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(c->wbuf, str, len); // scr: ----------------> 3)
    memcpy(c->wbuf + len, "\r\n", 2);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;

    conn_set_state(c, conn_write); // scr: ------------> 4)
    c->write_and_go = conn_new_cmd; // scr: -----------> 5)
    return;
}
memcached.c:out_string

1) Initialize the iov. The mechanism and add_msghdr have been discussed in the last post.

2) Calculate string length, and be paranoid for survival.

3) Populate wbuf with the output string and point wcurr to wbuf.

4) Indicate the next state conn_write.

5) Set write_and_go, the state to enter once the write completes, to conn_new_cmd, which forms the “loop”.
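The buffer handling in steps 2) and 3) can be tried in isolation with the sketch below. mini_writer and mini_out_string are hypothetical names; the iov reset and the state changes are left out, and the sketch assumes the buffer is always large enough for the fallback line itself.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical write-side state; field names follow the article. */
struct mini_writer {
    char  *wbuf;    /* start of the write buffer */
    char  *wcurr;   /* reset to wbuf after each response is staged */
    size_t wsize;   /* capacity of wbuf */
    size_t wbytes;  /* bytes staged for sending */
};

/* Copy str plus "\r\n" into wbuf, falling back to an error line if too long. */
void mini_out_string(struct mini_writer *w, const char *str) {
    static const char too_long[] = "SERVER_ERROR output line too long";
    size_t len = strlen(str);

    if (len + 2 > w->wsize) {
        str = too_long;
        len = sizeof(too_long) - 1;
    }
    /* Assumption: wbuf always has room for the fallback line. */
    assert(len + 2 <= w->wsize);

    memcpy(w->wbuf, str, len);
    memcpy(w->wbuf + len, "\r\n", 2);
    w->wbytes = len + 2;
    w->wcurr = w->wbuf;
}
```

Note that the staged response is not NUL-terminated; wbytes is the only record of its length, which is why conn_write passes both wcurr and wbytes along.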

conn_write

...
        case conn_write:
            /*
             * We want to write out a simple response. If we haven't already,
             * assemble it into a msgbuf list (this will be a single-entry
             * list for TCP or a two-entry list for UDP).
             */
            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0) { // scr: ---> 1)
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }

            /* fall through... */
        case conn_mwrite:
            ...// scr: discussed
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    ...// scr: discussed
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        ...// scr: not applicable
                    }
                    conn_set_state(c, c->write_and_go); // scr: -------> 2)
                } else {
                    ...// scr: discussed
                }
                break;
...
memcached.c:4507

1) Add the content of “write buffer” to the actual iov.

2) Pick up the write_and_go (i.e., conn_new_cmd) and set it to the next state.
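As a rough illustration of the two steps, the sketch below wraps the staged response in a single iovec, sends it, and then jumps to the saved state. It uses writev as a stand-in for Memcached's transmit, and mini_conn, mini_state and mini_write_step are hypothetical names; partial writes are not retried here.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical subset of connection states. */
enum mini_state { MINI_NEW_CMD, MINI_WRITE, MINI_CLOSING };

/* Hypothetical connection holding only what the write step needs. */
struct mini_conn {
    int             fd;            /* descriptor to write the response to */
    enum mini_state state;         /* current state */
    enum mini_state write_and_go;  /* state to enter after a complete write */
    char           *wcurr;         /* staged response */
    size_t          wbytes;        /* length of the staged response */
};

/* Send the staged response, then move to the state saved in write_and_go. */
void mini_write_step(struct mini_conn *c) {
    assert(c->state == MINI_WRITE);

    /* Step 1: describe the write buffer as a one-entry scatter/gather list. */
    struct iovec iov = { .iov_base = c->wcurr, .iov_len = c->wbytes };
    ssize_t sent = writev(c->fd, &iov, 1);

    if (sent == (ssize_t)c->wbytes) {
        c->state = c->write_and_go;   /* step 2: transmit complete */
    } else {
        c->state = MINI_CLOSING;      /* sketch only: give up on short writes */
    }
}
```

Storing the follow-up state in the connection, rather than hard-coding it in the write branch, is what lets the same write path serve any command that ends with a one-line response.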

Next, we send the

Delete

command to delete the entry we just added.

> delete test

And the output this time is

28: going from conn_read to conn_parse_cmd
<28 delete test
> FOUND KEY test
>28 DELETED
28: going from conn_parse_cmd to conn_write
28: going from conn_write to conn_new_cmd
28: going from conn_new_cmd to conn_waiting
28: going from conn_waiting to conn_read

Like add, the entry point of this command (once parsed) is process_delete_command, which has been fully covered in LRU III. Furthermore, out_string is called within process_delete_command to trigger the state switch to conn_write and then back to conn_new_cmd.

To go

Memcached state machine