Commit 57d9d1be authored by Rusty Russell's avatar Rusty Russell

ccan/io: initialize connection with an explicit I/O plan.

Rather than going via a callback, which tends to just set up I/O, do
any setup before the call to io_new_conn(), then pass it the io_plan
directly.

The patch shows how much this simplifies our test code.
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent 737705f0
......@@ -34,49 +34,29 @@
* // This reads from stdin.
* static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
* // This writes the stdin buffer to the child.
* static struct io_plan write_to_child(struct io_conn *c,
* struct stdin_buffer *b);
* static struct io_plan read_stdin(struct io_conn *c, struct stdin_buffer *b)
* {
* assert(c == b->reader);
* b->len = sizeof(b->inbuf);
* return io_read_partial(b->inbuf, &b->len, wake_writer, b);
* }
* static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
*
* static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
* {
* assert(c == b->reader);
* io_wake(b->writer, write_to_child, b);
* io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
* return io_idle();
* }
*
* static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
* {
* assert(c == b->reader);
* io_wake(b->writer, write_to_child, b);
* io_wake(b->writer, io_close(b->writer, NULL));
* b->reader = NULL;
* }
*
* static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
* {
* assert(c == b->writer);
* io_wake(b->reader, read_stdin, b);
* return io_idle();
* }
*
* static struct io_plan write_to_child(struct io_conn *conn,
* struct stdin_buffer *b)
* {
* assert(conn == b->writer);
* if (!b->reader)
* return io_close(conn, NULL);
* return io_write(b->inbuf, b->len, wake_reader, b);
* }
*
* static struct io_plan start_writer(struct io_conn *conn,
* struct stdin_buffer *b)
* {
* assert(conn == b->writer);
* if (!b->reader)
* return io_close(c, NULL);
* b->len = sizeof(b->inbuf);
* io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
* return io_idle();
* }
*
......@@ -92,15 +72,8 @@
* {
* b->off += b->rlen;
*
* if (b->off == b->max) {
* if (b->max == 0)
* b->max = 128;
* else if (b->max >= 1024*1024)
* b->max += 1024*1024;
* else
* b->max *= 2;
* b->buf = realloc(b->buf, b->max);
* }
* if (b->off == b->max)
* b->buf = realloc(b->buf, b->max *= 2);
*
* b->rlen = b->max - b->off;
* return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
......@@ -110,11 +83,12 @@
* int main(int argc, char *argv[])
* {
* int tochild[2], fromchild[2];
* struct buffer out = { 0, 0, 0, NULL };
* struct buffer out;
* struct stdin_buffer sbuf;
* int status;
* size_t off;
* ssize_t ret;
* struct io_conn *from_child;
*
* if (argc == 1)
* errx(1, "Usage: runner <cmdline>...");
......@@ -137,11 +111,23 @@
* close(fromchild[1]);
* signal(SIGPIPE, SIG_IGN);
*
* sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf);
* sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write,
* sbuf.len = sizeof(sbuf.inbuf);
* sbuf.reader = io_new_conn(STDIN_FILENO,
* io_read_partial(sbuf.inbuf, &sbuf.len,
* wake_writer, &sbuf),
* reader_exit, &sbuf);
* sbuf.writer = io_new_conn(tochild[1], io_idle(), fail_child_write,
* &sbuf);
* if (!sbuf.reader || !sbuf.writer
* || !io_new_conn(fromchild[0], read_from_child, NULL, &out))
*
* out.max = 128;
* out.off = 0;
* out.rlen = 128;
* out.buf = malloc(out.max);
* from_child = io_new_conn(fromchild[0],
* io_read_partial(out.buf, &out.rlen,
* read_from_child, &out),
* NULL, NULL);
* if (!sbuf.reader || !sbuf.writer || !from_child)
* err(1, "Allocating connections");
*
* io_loop();
......
......@@ -28,7 +28,7 @@ struct client {
static struct io_plan write_reply(struct io_conn *conn, struct client *client);
static struct io_plan read_request(struct io_conn *conn, struct client *client)
{
return io_read(conn, client->request_buffer, REQUEST_SIZE,
return io_read(client->request_buffer, REQUEST_SIZE,
write_reply, client);
}
......@@ -41,7 +41,7 @@ static struct io_plan write_complete(struct io_conn *conn, struct client *client
static struct io_plan write_reply(struct io_conn *conn, struct client *client)
{
return io_write(conn, client->reply_buffer, REPLY_SIZE,
return io_write(client->reply_buffer, REPLY_SIZE,
write_complete, client);
}
......@@ -108,12 +108,7 @@ static void sigalarm(int sig)
static struct io_plan do_timeout(struct io_conn *conn, char *buf)
{
return io_break(conn, buf, NULL, NULL);
}
static struct io_plan do_timeout_read(struct io_conn *conn, char *buf)
{
return io_read(conn, buf, 1, do_timeout, buf);
return io_break(buf, io_idle());
}
int main(int argc, char *argv[])
......@@ -155,11 +150,14 @@ int main(int argc, char *argv[])
if (ret < 0)
err(1, "Accepting fd");
/* For efficiency, we share client structure */
io_new_conn(ret, read_request, NULL, &client);
io_new_conn(ret,
io_read(client.request_buffer, REQUEST_SIZE,
write_reply, &client),
NULL, NULL);
}
}
io_new_conn(timeout[0], do_timeout_read, NULL, &buf);
io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL);
close(wake[0]);
for (i = 0; i < NUM_CHILDREN; i++)
......
......@@ -29,26 +29,25 @@ static struct io_plan write_reply(struct io_conn *conn, struct client *client);
static struct io_plan read_body(struct io_conn *conn, struct client *client)
{
assert(client->len <= REQUEST_MAX);
return io_read(conn, client->request_buffer, client->len,
return io_read(client->request_buffer, client->len,
write_reply, client);
}
static struct io_plan read_header(struct io_conn *conn, struct client *client)
static struct io_plan io_read_header(struct client *client)
{
return io_read(conn, &client->len, sizeof(client->len),
read_body, client);
return io_read(&client->len, sizeof(client->len), read_body, client);
}
/* once we're done, loop again. */
static struct io_plan write_complete(struct io_conn *conn, struct client *client)
{
completed++;
return read_header(conn, client);
return io_read_header(client);
}
static struct io_plan write_reply(struct io_conn *conn, struct client *client)
{
return io_write(conn, &client->len, sizeof(client->len),
return io_write(&client->len, sizeof(client->len),
write_complete, client);
}
......@@ -114,12 +113,7 @@ static void sigalarm(int sig)
static struct io_plan do_timeout(struct io_conn *conn, char *buf)
{
return io_break(conn, buf, NULL, NULL);
}
static struct io_plan do_timeout_read(struct io_conn *conn, char *buf)
{
return io_read(conn, buf, 1, do_timeout, buf);
return io_break(buf, io_idle());
}
int main(int argc, char *argv[])
......@@ -163,11 +157,11 @@ int main(int argc, char *argv[])
err(1, "Accepting fd");
/* For efficiency, we share buffer */
client->request_buffer = buffer;
io_new_conn(ret, read_header, NULL, client);
io_new_conn(ret, io_read_header(client), NULL, NULL);
}
}
io_new_conn(timeout[0], do_timeout_read, NULL, &buf);
io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL);
close(wake[0]);
for (i = 0; i < NUM_CHILDREN; i++)
......
......@@ -16,23 +16,8 @@ struct buffer {
char buf[32];
};
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
static struct io_plan do_read(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
return io_read(conn, &buf->buf, sizeof(buf->buf), poke_writer, buf);
}
static struct io_plan do_write(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->writer);
return io_write(conn, &buf->buf, sizeof(buf->buf), poke_reader, buf);
}
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
......@@ -41,31 +26,25 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
return io_close(conn, NULL);
/* You write. */
io_wake(buf->writer, do_write, buf);
io_wake(buf->writer,
io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
/* I'll wait until you wake me. */
return io_idle(conn);
return io_idle();
}
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->writer);
/* You read. */
io_wake(buf->reader, do_read, buf);
io_wake(buf->reader,
io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
if (++buf->iters == NUM_ITERS)
return io_close(conn, NULL);
/* I'll wait until you tell me to write. */
return io_idle(conn);
}
static struct io_plan reader(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
/* Wait for writer to tell us to read. */
return io_idle(conn);
return io_idle();
}
int main(void)
......@@ -87,10 +66,14 @@ int main(void)
memset(buf[i].buf, i, sizeof(buf[i].buf));
sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
if (!buf[i].reader)
err(1, "Creating reader %i", i);
buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]);
buf[i].writer = io_new_conn(fds[1],
io_write(&buf[i].buf,
sizeof(buf[i].buf),
poke_reader, &buf[i]),
NULL, NULL);
if (!buf[i].writer)
err(1, "Creating writer %i", i);
last_read = fds[0];
......@@ -100,10 +83,13 @@ int main(void)
i = 0;
buf[i].iters = 0;
sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
if (!buf[i].reader)
err(1, "Creating reader %i", i);
buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]);
buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf,
sizeof(buf[i].buf),
poke_reader, &buf[i]),
NULL, NULL);
if (!buf[i].writer)
err(1, "Creating writer %i", i);
......
......@@ -40,7 +40,7 @@ void io_close_listener(struct io_listener *l)
}
struct io_conn *io_new_conn_(int fd,
struct io_plan (*start)(struct io_conn *, void *),
struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg)
{
......@@ -51,11 +51,9 @@ struct io_conn *io_new_conn_(int fd,
conn->fd.listener = false;
conn->fd.fd = fd;
conn->plan.next = start;
conn->plan = plan;
conn->finish = finish;
conn->finish_arg = conn->plan.next_arg = arg;
conn->plan.pollflag = 0;
conn->plan.state = IO_NEXT;
conn->finish_arg = arg;
conn->duplex = NULL;
conn->timeout = NULL;
if (!add_conn(conn)) {
......@@ -66,9 +64,9 @@ struct io_conn *io_new_conn_(int fd,
}
struct io_conn *io_duplex_(struct io_conn *old,
struct io_plan (*start)(struct io_conn *, void *),
void (*finish)(struct io_conn *, void *),
void *arg)
struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg)
{
struct io_conn *conn;
......@@ -80,12 +78,10 @@ struct io_conn *io_duplex_(struct io_conn *old,
conn->fd.listener = false;
conn->fd.fd = old->fd.fd;
conn->plan.next = start;
conn->finish = finish;
conn->finish_arg = conn->plan.next_arg = arg;
conn->plan.pollflag = 0;
conn->plan.state = IO_NEXT;
conn->plan = plan;
conn->duplex = old;
conn->finish = finish;
conn->finish_arg = arg;
conn->timeout = NULL;
if (!add_duplex(conn)) {
free(conn);
......@@ -239,18 +235,14 @@ struct io_plan io_idle(void)
return plan;
}
void io_wake_(struct io_conn *conn,
struct io_plan (*fn)(struct io_conn *, void *), void *arg)
void io_wake(struct io_conn *conn, struct io_plan plan)
{
/* It might have finished, but we haven't called its finish() yet. */
if (conn->plan.state == IO_FINISHED)
return;
assert(conn->plan.state == IO_IDLE);
conn->plan.next = fn;
conn->plan.next_arg = arg;
conn->plan.pollflag = 0;
conn->plan.state = IO_NEXT;
conn->plan = plan;
backend_wakeup(conn);
}
......@@ -289,18 +281,9 @@ struct io_plan io_close(struct io_conn *conn, void *arg)
}
/* Exit the loop, returning this (non-NULL) arg. */
struct io_plan io_break_(void *ret,
struct io_plan (*fn)(struct io_conn *, void *),
void *arg)
struct io_plan io_break(void *ret, struct io_plan plan)
{
struct io_plan plan;
io_loop_return = ret;
plan.state = IO_NEXT;
plan.pollflag = 0;
plan.next = fn;
plan.next_arg = arg;
return plan;
}
......@@ -64,29 +64,23 @@ struct io_plan {
/**
* io_new_conn - create a new connection.
* @fd: the file descriptor.
* @start: the first function to call.
* @plan: the first I/O function.
* @finish: the function to call when it's closed or fails.
* @arg: the argument to both @start and @finish.
* @arg: the argument to @finish.
*
* This creates a connection which owns @fd. @start will be called on the
* next return to io_loop(), and @finish will be called when an I/O operation
* This creates a connection which owns @fd. @plan will be called on the
* next io_loop(), and @finish will be called when an I/O operation
* fails, or you call io_close() on the connection.
*
* The @start function must call one of the io queueing functions
* (eg. io_read, io_write) and return the next function to call once
* that is done using io_next(). The alternative is to call io_close().
*
* Returns NULL on error (and sets errno).
*/
#define io_new_conn(fd, start, finish, arg) \
io_new_conn_((fd), \
typesafe_cb_preargs(struct io_plan, void *, \
(start), (arg), struct io_conn *), \
#define io_new_conn(fd, plan, finish, arg) \
io_new_conn_((fd), (plan), \
typesafe_cb_preargs(void, void *, (finish), (arg), \
struct io_conn *), \
(arg))
struct io_conn *io_new_conn_(int fd,
struct io_plan (*start)(struct io_conn *, void *),
struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg);
......@@ -243,9 +237,9 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
/**
* io_duplex - split an fd into two connections.
* @conn: a connection.
* @start: the first function to call.
* @plan: the first I/O function to call.
* @finish: the function to call when it's closed or fails.
* @arg: the argument to both @start and @finish.
* @arg: the argument to @finish.
*
* Sometimes you want to be able to simultaneously read and write on a
* single fd, but io forces a linear call sequence. The solition is
......@@ -254,56 +248,38 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
*
* You must io_close() both of them to close the fd.
*/
#define io_duplex(conn, start, finish, arg) \
io_duplex_((conn), \
typesafe_cb_preargs(struct io_plan, void *, \
(start), (arg), struct io_conn *), \
#define io_duplex(conn, plan, finish, arg) \
io_duplex_((conn), (plan), \
typesafe_cb_preargs(void, void *, (finish), (arg), \
struct io_conn *), \
(arg))
struct io_conn *io_duplex_(struct io_conn *conn,
struct io_plan (*start)(struct io_conn *, void *),
struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg);
/**
* io_wake - wake up and idle connection.
* io_wake - wake up an idle connection.
* @conn: an idle connection.
* @fn: the next function to call once queued IO is complete.
* @arg: the argument to @next.
* @plan: the next I/O function for @conn.
*
* This makes @conn run its @next function the next time around the
* io_loop().
* This makes @conn do I/O the next time around the io_loop().
*/
#define io_wake(conn, fn, arg) \
io_wake_((conn), \
typesafe_cb_preargs(struct io_plan, void *, \
(fn), (arg), struct io_conn *), \
(arg))
void io_wake_(struct io_conn *conn,
struct io_plan (*fn)(struct io_conn *, void *), void *arg);
void io_wake(struct io_conn *conn, struct io_plan plan);
/**
* io_break - return from io_loop()
* @ret: non-NULL value to return from io_loop().
* @cb: function to call once on return
* @arg: @cb argument
* @plan: I/O to perform on return (if any)
*
* This breaks out of the io_loop. As soon as the current @next
* function returns, any io_closed()'d connections will have their
* finish callbacks called, then io_loop() with return with @ret.
*
* If io_loop() is called again, then @cb will be called.
* If io_loop() is called again, then @plan will be carried out.
*/
#define io_break(ret, fn, arg) \
io_break_((ret), \
typesafe_cb_preargs(struct io_plan, void *, \
(fn), (arg), struct io_conn *), \
(arg))
struct io_plan io_break_(void *ret,
struct io_plan (*fn)(struct io_conn *, void *),
void *arg);
struct io_plan io_break(void *ret, struct io_plan plan);
/* FIXME: io_recvfrom/io_sendto */
......
......@@ -37,6 +37,9 @@ static bool add_fd(struct fd *fd, short events)
fds[num_fds] = fd;
fd->backend_info = num_fds;
num_fds++;
if (events)
num_waiting++;
return true;
}
......@@ -46,6 +49,8 @@ static void del_fd(struct fd *fd)
assert(n != -1);
assert(n < num_fds);
if (pollfds[n].events)
num_waiting--;
if (n != num_fds - 1) {
/* Move last one over us. */
pollfds[n] = pollfds[num_fds-1];
......@@ -69,22 +74,49 @@ bool add_listener(struct io_listener *l)
{
if (!add_fd(&l->fd, POLLIN))
return false;
num_waiting++;
return true;
}
static void adjust_counts(enum io_state state)
{
if (state == IO_NEXT)
num_next++;
else if (state == IO_FINISHED)
num_finished++;
}
static void update_pollevents(struct io_conn *conn)
{
struct pollfd *pfd = &pollfds[conn->fd.backend_info];
if (pfd->events)
num_waiting--;
pfd->events = conn->plan.pollflag;
if (conn->duplex) {
int mask = conn->duplex->plan.pollflag;
/* You can't *both* read/write. */
assert(!mask || pfd->events != mask);
pfd->events |= mask;
}
if (pfd->events)
num_waiting++;
adjust_counts(conn->plan.state);
}
bool add_conn(struct io_conn *c)
{
if (!add_fd(&c->fd, 0))
if (!add_fd(&c->fd, c->plan.pollflag))
return false;
num_next++;
adjust_counts(c->plan.state);
return true;
}
bool add_duplex(struct io_conn *c)
{
c->fd.backend_info = c->duplex->fd.backend_info;
num_next++;
update_pollevents(c);
return true;
}
......@@ -114,32 +146,13 @@ void del_listener(struct io_listener *l)
static void backend_set_state(struct io_conn *conn, struct io_plan plan)
{
struct pollfd *pfd = &pollfds[conn->fd.backend_info];
if (pfd->events)
num_waiting--;
pfd->events = plan.pollflag;
if (conn->duplex) {
int mask = conn->duplex->plan.pollflag;
/* You can't *both* read/write. */
assert(!mask || pfd->events != mask);
pfd->events |= mask;
}
if (pfd->events)
num_waiting++;
if (plan.state == IO_NEXT)
num_next++;
else if (plan.state == IO_FINISHED)
num_finished++;
conn->plan = plan;
update_pollevents(conn);
}
void backend_wakeup(struct io_conn *conn)
{
num_next++;
update_pollevents(conn);
}
static void accept_conn(struct io_listener *l)
......
......@@ -6,23 +6,18 @@
#include <sys/wait.h>
#include <stdio.h>
static struct io_plan start_ok(struct io_conn *conn, int *state)
{
ok1(*state == 0);
(*state)++;
return io_close(conn, NULL);
}
static void finish_ok(struct io_conn *conn, int *state)
{
ok1(*state == 1);
(*state)++;
io_break(state + 1, NULL, NULL);
io_break(state + 1, io_idle());
}
static void init_conn(int fd, int *state)
{
if (!io_new_conn(fd, start_ok, finish_ok, state))
ok1(*state == 0);
(*state)++;
if (!io_new_conn(fd, io_close(NULL, NULL), finish_ok, state))
abort();
}
......
......@@ -11,23 +11,20 @@ struct data {
char buf[4];
};
static struct io_plan start_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
return io_read(d->buf, sizeof(d->buf), io_close, d);
}
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, NULL, NULL);
io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_ok, finish_ok, d))
ok1(d->state == 0);
d->state++;
if (!io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d),
finish_ok, d))
abort();
}
......
......@@ -12,24 +12,21 @@ struct data {
char buf[4];
};
static struct io_plan start_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
d->bytes = sizeof(d->buf);
return io_read_partial(d->buf, &d->bytes, io_close, d);
}
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, NULL, NULL);
io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_ok, finish_ok, d))
ok1(d->state == 0);
d->state++;
d->bytes = sizeof(d->buf);
if (!io_new_conn(fd, io_read_partial(d->buf, &d->bytes, io_close, d),
finish_ok, d))
abort();
}
......
......@@ -12,23 +12,19 @@ struct data {
char *buf;
};
static struct io_plan start_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
return io_write_partial(d->buf, &d->bytes, io_close, d);
}
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, NULL, NULL);
io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_ok, finish_ok, d))
ok1(d->state == 0);
d->state++;
if (!io_new_conn(fd, io_write_partial(d->buf, &d->bytes, io_close, d),
finish_ok, d))
abort();
}
......
......@@ -12,23 +12,19 @@ struct data {
char *buf;
};
static struct io_plan start_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
return io_write(d->buf, d->bytes, io_close, d);
}
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, NULL, NULL);
io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_ok, finish_ok, d))
ok1(d->state == 0);
d->state++;
if (!io_new_conn(fd, io_write(d->buf, d->bytes, io_close, d),
finish_ok, d))
abort();
}
......
......@@ -16,55 +16,39 @@ struct data {
char buf[4];
};
static struct io_plan plan_read(struct io_conn *conn, struct data *d)
static struct io_plan read_done(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2 || d->state == 3);
d->state++;
return io_read(d->buf, sizeof(d->buf), io_close, d);
return io_close(conn, NULL);
}
static struct io_plan start_waker(struct io_conn *conn, struct data *d)
static void finish_waker(struct io_conn *conn, struct data *d)
{
io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
ok1(d->state == 1);
d->state++;
io_wake(idler, plan_read, d);
return io_close(conn, NULL);
}
static void finish_waker(struct io_conn *conn, struct data *d)
static void finish_idle(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2 || d->state == 3);
ok1(d->state == 3);
d->state++;
io_break(d, io_idle());
}
static struct io_plan start_idle(struct io_conn *conn, struct data *d)
static void init_conn(int fd, struct data *d)
{
int fd;
int fd2;
ok1(d->state == 0);
d->state++;
idler = conn;
idler = io_new_conn(fd, io_idle(), finish_idle, d);
/* This will wake us up. */
fd = open("/dev/null", O_RDONLY);
ok1(fd >= 0);
ok1(io_new_conn(fd, start_waker, finish_waker, d));
return io_idle();
}
static void finish_idle(struct io_conn *conn, struct data *d)
{
ok1(d->state == 4);
d->state++;
io_break(d, NULL, NULL);
}
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_idle, finish_idle, d))
abort();
/* This will wake us up, as read will fail. */
fd2 = open("/dev/null", O_RDONLY);
ok1(fd2 >= 0);
ok1(io_new_conn(fd2, io_read(idler, 1, NULL, NULL), finish_waker, d));
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -107,7 +91,7 @@ int main(void)
int fd, status;
/* This is how many tests you plan to run */
plan_tests(15);
plan_tests(14);
d->state = 0;
fd = make_listen_fd("65006", &addrinfo);
ok1(fd >= 0);
......@@ -137,7 +121,7 @@ int main(void)
freeaddrinfo(addrinfo);
ok1(io_loop() == d);
ok1(d->state == 5);
ok1(d->state == 4);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d);
io_close_listener(l);
......
......@@ -11,18 +11,11 @@ struct data {
char buf[4];
};
static struct io_plan plan_read(struct io_conn *conn, struct data *d)
static struct io_plan read_done(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
return io_read(d->buf, sizeof(d->buf), io_close, d);
}
static struct io_plan start_break(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
return io_break(d, plan_read, d);
return io_close(conn, NULL);
}
static void finish_ok(struct io_conn *conn, struct data *d)
......@@ -33,7 +26,13 @@ static void finish_ok(struct io_conn *conn, struct data *d)
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_break, finish_ok, d))
ok1(d->state == 0);
d->state++;
if (!io_new_conn(fd,
io_break(d,
io_read(d->buf, sizeof(d->buf), read_done, d)),
finish_ok, d))
abort();
}
......
......@@ -15,23 +15,8 @@ struct buffer {
char buf[32];
};
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
static struct io_plan plan_read(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf);
}
static struct io_plan plan_write(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->writer);
return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf);
}
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
......@@ -40,7 +25,8 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
return io_close(conn, NULL);
/* You write. */
io_wake(buf->writer, plan_write, buf);
io_wake(buf->writer,
io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
/* I'll wait until you wake me. */
return io_idle();
......@@ -50,7 +36,8 @@ static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->writer);
/* You read. */
io_wake(buf->reader, plan_read, buf);
io_wake(buf->reader,
io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
if (++buf->iters == NUM_ITERS)
return io_close(conn, NULL);
......@@ -59,14 +46,6 @@ static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
return io_idle();
}
static struct io_plan reader(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
/* Wait for writer to tell us to read. */
return io_idle();
}
static struct buffer buf[NUM];
int main(void)
......@@ -86,10 +65,15 @@ int main(void)
memset(buf[i].buf, i, sizeof(buf[i].buf));
sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
/* Wait for writer to tell us to read. */
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, &buf[i]);
if (!buf[i].reader)
break;
buf[i].writer = io_new_conn(fds[1], plan_write, NULL, &buf[i]);
buf[i].writer = io_new_conn(fds[1],
io_write(&buf[i].buf,
sizeof(buf[i].buf),
poke_reader, &buf[i]),
NULL, &buf[i]);
if (!buf[i].writer)
break;
last_read = fds[0];
......@@ -100,9 +84,12 @@ int main(void)
/* Last one completes the cirle. */
i = 0;
sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
ok1(buf[i].reader);
buf[i].writer = io_new_conn(last_write, plan_write, NULL, &buf[i]);
buf[i].writer = io_new_conn(last_write,
io_write(&buf[i].buf, sizeof(buf[i].buf),
poke_reader, &buf[i]),
NULL, NULL);
ok1(buf[i].writer);
/* They should eventually exit */
......
......@@ -18,28 +18,27 @@ static void finish_ok(struct io_conn *conn, struct data *d)
d->state++;
}
static struct io_plan write_out(struct io_conn *conn, struct data *d)
static struct io_plan write_done(struct io_conn *conn, struct data *d)
{
d->state++;
return io_write(d->wbuf, sizeof(d->wbuf), io_close, d);
return io_close(conn, NULL);
}
static struct io_plan start_ok(struct io_conn *conn, struct data *d)
static void init_conn(int fd, struct data *d)
{
struct io_conn *conn;
ok1(d->state == 0);
d->state++;
io_close_listener(d->l);
memset(d->wbuf, 7, sizeof(d->wbuf));
ok1(io_duplex(conn, write_out, finish_ok, d));
return io_read(d->buf, sizeof(d->buf), io_close, d);
}
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_ok, finish_ok, d))
abort();
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d),
finish_ok, d);
ok1(io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d),
finish_ok, d));
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......
......@@ -7,11 +7,6 @@
#include <stdio.h>
#include <signal.h>
static struct io_plan start(struct io_conn *conn, void *unused)
{
return io_idle();
}
int main(void)
{
int status;
......@@ -22,7 +17,7 @@ int main(void)
int fds[2];
ok1(pipe(fds) == 0);
io_new_conn(fds[0], start, NULL, NULL);
io_new_conn(fds[0], io_idle(), NULL, NULL);
io_loop();
exit(1);
}
......
......@@ -30,25 +30,23 @@ static struct io_plan timeout(struct io_conn *conn, struct data *d)
return io_close(conn, d);
}
static struct io_plan start_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
return io_read(d->buf, sizeof(d->buf), no_timeout, d);
}
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2);
d->state++;
io_break(d, NULL, NULL);
io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
if (!io_new_conn(fd, start_ok, finish_ok, d))
abort();
struct io_conn *conn;
ok1(d->state == 0);
d->state++;
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d),
finish_ok, d);
io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment