Commit bfb80c56 authored by Rusty Russell's avatar Rusty Russell

ccan/io: use explicit IO callback functions, instead of io_state values.

Explicit callbacks are slower, but more flexible.
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent 174fe59f
...@@ -21,15 +21,14 @@ struct io_listener { ...@@ -21,15 +21,14 @@ struct io_listener {
void *conn_arg; void *conn_arg;
}; };
enum io_state { enum io_result {
/* These wait for something to input */ RESULT_AGAIN,
READ, RESULT_FINISHED,
READPART, RESULT_CLOSE
};
/* These wait for room to output */
WRITE,
WRITEPART,
enum io_state {
IO,
NEXT, /* eg starting, woken from idle, return from io_break. */ NEXT, /* eg starting, woken from idle, return from io_break. */
IDLE, IDLE,
FINISHED, FINISHED,
...@@ -82,6 +81,8 @@ struct io_conn { ...@@ -82,6 +81,8 @@ struct io_conn {
struct io_conn *duplex; struct io_conn *duplex;
struct io_timeout *timeout; struct io_timeout *timeout;
enum io_result (*io)(struct io_conn *conn);
int pollflag; /* 0, POLLIN or POLLOUT */ int pollflag; /* 0, POLLIN or POLLOUT */
enum io_state state; enum io_state state;
union { union {
......
...@@ -119,6 +119,20 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts, ...@@ -119,6 +119,20 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
return true; return true;
} }
static enum io_result do_write(struct io_conn *conn)
{
ssize_t ret = write(conn->fd.fd, conn->u.write.buf, conn->u.write.len);
if (ret < 0)
return RESULT_CLOSE;
conn->u.write.buf += ret;
conn->u.write.len -= ret;
if (conn->u.write.len == 0)
return RESULT_FINISHED;
else
return RESULT_AGAIN;
}
/* Queue some data to be written. */ /* Queue some data to be written. */
struct io_plan *io_write_(struct io_conn *conn, const void *data, size_t len, struct io_plan *io_write_(struct io_conn *conn, const void *data, size_t len,
struct io_plan *(*cb)(struct io_conn *, void *), struct io_plan *(*cb)(struct io_conn *, void *),
...@@ -126,10 +140,24 @@ struct io_plan *io_write_(struct io_conn *conn, const void *data, size_t len, ...@@ -126,10 +140,24 @@ struct io_plan *io_write_(struct io_conn *conn, const void *data, size_t len,
{ {
conn->u.write.buf = data; conn->u.write.buf = data;
conn->u.write.len = len; conn->u.write.len = len;
conn->io = do_write;
conn->next = cb; conn->next = cb;
conn->next_arg = arg; conn->next_arg = arg;
conn->pollflag = POLLOUT; conn->pollflag = POLLOUT;
return to_ioplan(WRITE); return to_ioplan(IO);
}
static enum io_result do_read(struct io_conn *conn)
{
ssize_t ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
if (ret <= 0)
return RESULT_CLOSE;
conn->u.read.buf += ret;
conn->u.read.len -= ret;
if (conn->u.read.len == 0)
return RESULT_FINISHED;
else
return RESULT_AGAIN;
} }
/* Queue a request to read into a buffer. */ /* Queue a request to read into a buffer. */
...@@ -139,10 +167,21 @@ struct io_plan *io_read_(struct io_conn *conn, void *data, size_t len, ...@@ -139,10 +167,21 @@ struct io_plan *io_read_(struct io_conn *conn, void *data, size_t len,
{ {
conn->u.read.buf = data; conn->u.read.buf = data;
conn->u.read.len = len; conn->u.read.len = len;
conn->io = do_read;
conn->next = cb; conn->next = cb;
conn->next_arg = arg; conn->next_arg = arg;
conn->pollflag = POLLIN; conn->pollflag = POLLIN;
return to_ioplan(READ); return to_ioplan(IO);
}
static enum io_result do_read_partial(struct io_conn *conn)
{
ssize_t ret = read(conn->fd.fd, conn->u.readpart.buf,
*conn->u.readpart.lenp);
if (ret <= 0)
return RESULT_CLOSE;
*conn->u.readpart.lenp = ret;
return RESULT_FINISHED;
} }
/* Queue a partial request to read into a buffer. */ /* Queue a partial request to read into a buffer. */
...@@ -152,10 +191,21 @@ struct io_plan *io_read_partial_(struct io_conn *conn, void *data, size_t *len, ...@@ -152,10 +191,21 @@ struct io_plan *io_read_partial_(struct io_conn *conn, void *data, size_t *len,
{ {
conn->u.readpart.buf = data; conn->u.readpart.buf = data;
conn->u.readpart.lenp = len; conn->u.readpart.lenp = len;
conn->io = do_read_partial;
conn->next = cb; conn->next = cb;
conn->next_arg = arg; conn->next_arg = arg;
conn->pollflag = POLLIN; conn->pollflag = POLLIN;
return to_ioplan(READPART); return to_ioplan(IO);
}
static enum io_result do_write_partial(struct io_conn *conn)
{
ssize_t ret = write(conn->fd.fd, conn->u.writepart.buf,
*conn->u.writepart.lenp);
if (ret < 0)
return RESULT_CLOSE;
*conn->u.writepart.lenp = ret;
return RESULT_FINISHED;
} }
/* Queue a partial write request. */ /* Queue a partial write request. */
...@@ -166,10 +216,11 @@ struct io_plan *io_write_partial_(struct io_conn *conn, ...@@ -166,10 +216,11 @@ struct io_plan *io_write_partial_(struct io_conn *conn,
{ {
conn->u.writepart.buf = data; conn->u.writepart.buf = data;
conn->u.writepart.lenp = len; conn->u.writepart.lenp = len;
conn->io = do_write_partial;
conn->next = cb; conn->next = cb;
conn->next_arg = arg; conn->next_arg = arg;
conn->pollflag = POLLOUT; conn->pollflag = POLLOUT;
return to_ioplan(WRITEPART); return to_ioplan(IO);
} }
struct io_plan *io_idle(struct io_conn *conn) struct io_plan *io_idle(struct io_conn *conn)
...@@ -200,50 +251,17 @@ static struct io_plan *do_next(struct io_conn *conn) ...@@ -200,50 +251,17 @@ static struct io_plan *do_next(struct io_conn *conn)
struct io_plan *do_ready(struct io_conn *conn) struct io_plan *do_ready(struct io_conn *conn)
{ {
ssize_t ret; assert(conn->state == IO);
bool finished; switch (conn->io(conn)) {
case RESULT_CLOSE:
switch (conn->state) { return io_close(conn, NULL);
case WRITE: case RESULT_FINISHED:
ret = write(conn->fd.fd, conn->u.write.buf, conn->u.write.len); return do_next(conn);
if (ret < 0) case RESULT_AGAIN:
return io_close(conn, NULL); return to_ioplan(conn->state);
conn->u.write.buf += ret;
conn->u.write.len -= ret;
finished = (conn->u.write.len == 0);
break;
case WRITEPART:
ret = write(conn->fd.fd, conn->u.writepart.buf,
*conn->u.writepart.lenp);
if (ret < 0)
return io_close(conn, NULL);
*conn->u.writepart.lenp = ret;
finished = true;
break;
case READ:
ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
if (ret <= 0)
return io_close(conn, NULL);
conn->u.read.buf += ret;
conn->u.read.len -= ret;
finished = (conn->u.read.len == 0);
break;
case READPART:
ret = read(conn->fd.fd, conn->u.readpart.buf,
*conn->u.readpart.lenp);
if (ret <= 0)
return io_close(conn, NULL);
*conn->u.readpart.lenp = ret;
finished = true;
break;
default: default:
/* Shouldn't happen. */
abort(); abort();
} }
if (finished)
return do_next(conn);
return to_ioplan(conn->state);
} }
/* Useful next functions. */ /* Useful next functions. */
......
...@@ -16,7 +16,7 @@ struct data { ...@@ -16,7 +16,7 @@ struct data {
char buf[4]; char buf[4];
}; };
static struct io_plan *do_read(struct io_conn *conn, struct data *d) static struct io_plan *plan_read(struct io_conn *conn, struct data *d)
{ {
ok1(d->state == 2 || d->state == 3); ok1(d->state == 2 || d->state == 3);
d->state++; d->state++;
...@@ -28,7 +28,7 @@ static struct io_plan *start_waker(struct io_conn *conn, struct data *d) ...@@ -28,7 +28,7 @@ static struct io_plan *start_waker(struct io_conn *conn, struct data *d)
ok1(d->state == 1); ok1(d->state == 1);
d->state++; d->state++;
io_wake(idler, do_read, d); io_wake(idler, plan_read, d);
return io_close(conn, NULL); return io_close(conn, NULL);
} }
......
...@@ -11,7 +11,7 @@ struct data { ...@@ -11,7 +11,7 @@ struct data {
char buf[4]; char buf[4];
}; };
static struct io_plan *do_read(struct io_conn *conn, struct data *d) static struct io_plan *plan_read(struct io_conn *conn, struct data *d)
{ {
ok1(d->state == 1); ok1(d->state == 1);
d->state++; d->state++;
...@@ -22,7 +22,7 @@ static struct io_plan *start_break(struct io_conn *conn, struct data *d) ...@@ -22,7 +22,7 @@ static struct io_plan *start_break(struct io_conn *conn, struct data *d)
{ {
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
return io_break(conn, d, do_read, d); return io_break(conn, d, plan_read, d);
} }
static void finish_ok(struct io_conn *conn, struct data *d) static void finish_ok(struct io_conn *conn, struct data *d)
......
...@@ -18,7 +18,7 @@ struct buffer { ...@@ -18,7 +18,7 @@ struct buffer {
static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf); static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf);
static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf); static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf);
static struct io_plan *do_read(struct io_conn *conn, struct buffer *buf) static struct io_plan *plan_read(struct io_conn *conn, struct buffer *buf)
{ {
assert(conn == buf->reader); assert(conn == buf->reader);
...@@ -26,7 +26,7 @@ static struct io_plan *do_read(struct io_conn *conn, struct buffer *buf) ...@@ -26,7 +26,7 @@ static struct io_plan *do_read(struct io_conn *conn, struct buffer *buf)
poke_writer, buf); poke_writer, buf);
} }
static struct io_plan *do_write(struct io_conn *conn, struct buffer *buf) static struct io_plan *plan_write(struct io_conn *conn, struct buffer *buf)
{ {
assert(conn == buf->writer); assert(conn == buf->writer);
...@@ -42,7 +42,7 @@ static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf) ...@@ -42,7 +42,7 @@ static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf)
return io_close(conn, NULL); return io_close(conn, NULL);
/* You write. */ /* You write. */
io_wake(buf->writer, do_write, buf); io_wake(buf->writer, plan_write, buf);
/* I'll wait until you wake me. */ /* I'll wait until you wake me. */
return io_idle(conn); return io_idle(conn);
...@@ -52,7 +52,7 @@ static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf) ...@@ -52,7 +52,7 @@ static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf)
{ {
assert(conn == buf->writer); assert(conn == buf->writer);
/* You read. */ /* You read. */
io_wake(buf->reader, do_read, buf); io_wake(buf->reader, plan_read, buf);
if (++buf->iters == NUM_ITERS) if (++buf->iters == NUM_ITERS)
return io_close(conn, NULL); return io_close(conn, NULL);
...@@ -91,7 +91,7 @@ int main(void) ...@@ -91,7 +91,7 @@ int main(void)
buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
if (!buf[i].reader) if (!buf[i].reader)
break; break;
buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]); buf[i].writer = io_new_conn(fds[1], plan_write, NULL, &buf[i]);
if (!buf[i].writer) if (!buf[i].writer)
break; break;
last_read = fds[0]; last_read = fds[0];
...@@ -104,7 +104,7 @@ int main(void) ...@@ -104,7 +104,7 @@ int main(void)
sprintf(buf[i].buf, "%i-%i", i, i); sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
ok1(buf[i].reader); ok1(buf[i].reader);
buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]); buf[i].writer = io_new_conn(last_write, plan_write, NULL, &buf[i]);
ok1(buf[i].writer); ok1(buf[i].writer);
/* They should eventually exit */ /* They should eventually exit */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment