Commit e92e2f65 authored by Rusty Russell's avatar Rusty Russell

ccan/io: duplex support.

This is actually pretty simple now.
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent 6109a0a6
...@@ -56,9 +56,10 @@ static void next_plan(struct io_conn *conn, struct io_plan *plan) ...@@ -56,9 +56,10 @@ static void next_plan(struct io_conn *conn, struct io_plan *plan)
plan = next(conn, plan->next_arg); plan = next(conn, plan->next_arg);
/* It should have set a plan inside this conn. */ /* It should have set a plan inside this conn (or duplex) */
assert(plan == &conn->plan[IO_IN] assert(plan == &conn->plan[IO_IN]
|| plan == &conn->plan[IO_OUT]); || plan == &conn->plan[IO_OUT]
|| plan == &conn->plan[2]);
assert(conn->plan[IO_IN].status != IO_UNSET assert(conn->plan[IO_IN].status != IO_UNSET
|| conn->plan[IO_OUT].status != IO_UNSET); || conn->plan[IO_OUT].status != IO_UNSET);
...@@ -428,7 +429,7 @@ void io_break(const void *ret) ...@@ -428,7 +429,7 @@ void io_break(const void *ret)
io_loop_return = (void *)ret; io_loop_return = (void *)ret;
} }
struct io_plan *io_never(struct io_conn *conn) struct io_plan *io_never(struct io_conn *conn, void *unused)
{ {
return io_always(conn, io_never_called, NULL); return io_always(conn, io_never_called, NULL);
} }
...@@ -437,3 +438,11 @@ int io_conn_fd(const struct io_conn *conn) ...@@ -437,3 +438,11 @@ int io_conn_fd(const struct io_conn *conn)
{ {
return conn->fd.fd; return conn->fd.fd;
} }
struct io_plan *io_duplex(struct io_plan *in_plan, struct io_plan *out_plan)
{
/* in_plan must be conn->plan[IO_IN], out_plan must be [IO_OUT] */
assert(out_plan == in_plan + 1);
return out_plan + 1;
}
...@@ -403,6 +403,38 @@ struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr, ...@@ -403,6 +403,38 @@ struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr,
struct io_plan *(*next)(struct io_conn *, void *), struct io_plan *(*next)(struct io_conn *, void *),
void *arg); void *arg);
/**
* io_duplex - set plans for both input and output.
* @conn: the connection that plan is for.
* @in: the input plan
* @out: the output plan
*
* Most plans are either for input or output; io_duplex creates a plan
* which does both. This is often used in the init function to create
* two independent streams, though it can be used once on any connection.
*
* Note that if either plan closes the connection, it will be closed.
*
* Note that if one plan is io_wait or io_always, that causes a problem:
* they look at the input and output plan slots to figure out which to
* use, but if the other plan hasn't been evaluated yet, that will fail.
* In this case, you'll need to ensure the other plan is evaluated first,
* eg. "struct io_plan *r = io_read(...); return io_duplex(r, io_always(...))"
*
* Example:
* struct buf {
* char in[100];
* char out[100];
* };
*
* static struct io_plan *read_and_write(struct io_conn *conn, struct buf *b)
* {
* return io_duplex(io_read(conn, b->in, sizeof(b->in), io_close_cb, b),
* io_write(conn, b->out, sizeof(b->out), io_close_cb, b));
* }
*/
struct io_plan *io_duplex(struct io_plan *in_plan, struct io_plan *out_plan);
/** /**
* io_wait - leave a plan idle until something wakes us. * io_wait - leave a plan idle until something wakes us.
* @conn: the connection that plan is for. * @conn: the connection that plan is for.
...@@ -471,6 +503,7 @@ void io_break(const void *ret); ...@@ -471,6 +503,7 @@ void io_break(const void *ret);
/** /**
* io_never - assert if callback is called. * io_never - assert if callback is called.
* @conn: the connection that plan is for. * @conn: the connection that plan is for.
* @unused: an unused parameter to make this suitable for use as a callback.
* *
* Sometimes you want to make it clear that a callback should never happen * Sometimes you want to make it clear that a callback should never happen
* (eg. for io_break). This will assert() if called. * (eg. for io_break). This will assert() if called.
...@@ -480,10 +513,10 @@ void io_break(const void *ret); ...@@ -480,10 +513,10 @@ void io_break(const void *ret);
* { * {
* io_break(conn); * io_break(conn);
* // We won't ever return from io_break * // We won't ever return from io_break
* return io_never(conn); * return io_never(conn, NULL);
* } * }
*/ */
struct io_plan *io_never(struct io_conn *conn); struct io_plan *io_never(struct io_conn *conn, void *unused);
/* FIXME: io_recvfrom/io_sendto */ /* FIXME: io_recvfrom/io_sendto */
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <stdio.h> #include <stdio.h>
#if 0
#ifndef PORT #ifndef PORT
#define PORT "65012" #define PORT "65012"
#endif #endif
...@@ -14,6 +13,7 @@ ...@@ -14,6 +13,7 @@
struct data { struct data {
struct io_listener *l; struct io_listener *l;
int state; int state;
int done;
char buf[4]; char buf[4];
char wbuf[32]; char wbuf[32];
}; };
...@@ -23,28 +23,27 @@ static void finish_ok(struct io_conn *conn, struct data *d) ...@@ -23,28 +23,27 @@ static void finish_ok(struct io_conn *conn, struct data *d)
d->state++; d->state++;
} }
static struct io_plan *write_done(struct io_conn *conn, struct data *d) static struct io_plan *rw_done(struct io_conn *conn, struct data *d)
{ {
d->state++; d->state++;
return io_close(conn); d->done++;
if (d->done == 2)
return io_close(conn);
return io_wait(conn, NULL, io_never, NULL);
} }
static void init_conn(int fd, struct data *d) static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{ {
struct io_conn *conn;
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
io_close_listener(d->l); io_close_listener(d->l);
memset(d->wbuf, 7, sizeof(d->wbuf)); memset(d->wbuf, 7, sizeof(d->wbuf));
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close_cb, d));
io_set_finish(conn, finish_ok, d);
conn = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
ok1(conn);
io_set_finish(conn, finish_ok, d); io_set_finish(conn, finish_ok, d);
return io_duplex(io_read(conn, d->buf, sizeof(d->buf), rw_done, d),
io_write(conn, d->wbuf, sizeof(d->wbuf), rw_done, d));
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
...@@ -88,9 +87,10 @@ int main(void) ...@@ -88,9 +87,10 @@ int main(void)
/* This is how many tests you plan to run */ /* This is how many tests you plan to run */
plan_tests(10); plan_tests(10);
d->state = 0; d->state = 0;
d->done = 0;
fd = make_listen_fd(PORT, &addrinfo); fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0); ok1(fd >= 0);
d->l = io_new_listener(fd, init_conn, d); d->l = io_new_listener(NULL, fd, init_conn, d);
ok1(d->l); ok1(d->l);
fflush(stdout); fflush(stdout);
if (!fork()) { if (!fork()) {
...@@ -121,6 +121,7 @@ int main(void) ...@@ -121,6 +121,7 @@ int main(void)
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == NULL); ok1(io_loop() == NULL);
ok1(d->state == 4); ok1(d->state == 4);
ok1(d->done == 2);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d); free(d);
...@@ -131,9 +132,3 @@ int main(void) ...@@ -131,9 +132,3 @@ int main(void)
/* This exits depending on whether all tests passed */ /* This exits depending on whether all tests passed */
return exit_status(); return exit_status();
} }
#else
int main(void)
{
return 0;
}
#endif
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <stdio.h> #include <stdio.h>
#if 0
#ifndef PORT #ifndef PORT
#define PORT "65014" #define PORT "65014"
#endif #endif
...@@ -16,7 +15,6 @@ ...@@ -16,7 +15,6 @@
struct data { struct data {
struct io_listener *l; struct io_listener *l;
int state; int state;
struct io_conn *c1, *c2;
char buf[4]; char buf[4];
char wbuf[32]; char wbuf[32];
}; };
...@@ -26,23 +24,24 @@ static void finish_ok(struct io_conn *conn, struct data *d) ...@@ -26,23 +24,24 @@ static void finish_ok(struct io_conn *conn, struct data *d)
d->state++; d->state++;
} }
static struct io_plan end(struct io_conn *conn, struct data *d) static struct io_plan *end(struct io_conn *conn, struct data *d)
{ {
d->state++; d->state++;
return io_close(); if (d->state == 4)
return io_close(conn);
else
return io_wait(conn, NULL, io_never, NULL);
} }
static struct io_plan make_duplex(struct io_conn *conn, struct data *d) static struct io_plan *make_duplex(struct io_conn *conn, struct data *d)
{ {
d->state++;
/* Have duplex read the rest of the buffer. */ /* Have duplex read the rest of the buffer. */
d->c2 = io_duplex(conn, io_read(d->buf+1, sizeof(d->buf)-1, end, d)); return io_duplex(io_read(conn, d->buf+1, sizeof(d->buf)-1, end, d),
ok1(d->c2); io_write(conn, d->wbuf, sizeof(d->wbuf), end, d));
io_set_finish(d->c2, finish_ok, d);
return io_write(d->wbuf, sizeof(d->wbuf), end, d);
} }
static void init_conn(int fd, struct data *d) static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{ {
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
...@@ -50,9 +49,8 @@ static void init_conn(int fd, struct data *d) ...@@ -50,9 +49,8 @@ static void init_conn(int fd, struct data *d)
io_close_listener(d->l); io_close_listener(d->l);
memset(d->wbuf, 7, sizeof(d->wbuf)); memset(d->wbuf, 7, sizeof(d->wbuf));
io_set_finish(conn, finish_ok, d);
d->c1 = io_new_conn(fd, io_read(d->buf, 1, make_duplex, d)); return io_read(conn, d->buf, 1, make_duplex, d);
io_set_finish(d->c1, finish_ok, d);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
...@@ -94,11 +92,11 @@ int main(void) ...@@ -94,11 +92,11 @@ int main(void)
int fd, status; int fd, status;
/* This is how many tests you plan to run */ /* This is how many tests you plan to run */
plan_tests(10); plan_tests(9);
d->state = 0; d->state = 0;
fd = make_listen_fd(PORT, &addrinfo); fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0); ok1(fd >= 0);
d->l = io_new_listener(fd, init_conn, d); d->l = io_new_listener(NULL, fd, init_conn, d);
ok1(d->l); ok1(d->l);
fflush(stdout); fflush(stdout);
if (!fork()) { if (!fork()) {
...@@ -139,9 +137,3 @@ int main(void) ...@@ -139,9 +137,3 @@ int main(void)
/* This exits depending on whether all tests passed */ /* This exits depending on whether all tests passed */
return exit_status(); return exit_status();
} }
#else
int main(void)
{
return 0;
}
#endif
...@@ -8,14 +8,12 @@ ...@@ -8,14 +8,12 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <stdio.h> #include <stdio.h>
#if 0
#ifndef PORT #ifndef PORT
#define PORT "65016" #define PORT "65016"
#endif #endif
struct data { struct data {
struct io_listener *l; struct io_listener *l;
struct io_conn *writer;
int state; int state;
char buf[4]; char buf[4];
char wbuf[32]; char wbuf[32];
...@@ -26,35 +24,27 @@ static void finish_ok(struct io_conn *conn, struct data *d) ...@@ -26,35 +24,27 @@ static void finish_ok(struct io_conn *conn, struct data *d)
d->state++; d->state++;
} }
static struct io_plan write_done(struct io_conn *conn, struct data *d) static struct io_plan *io_done(struct io_conn *conn, struct data *d)
{ {
d->state++; d->state++;
return io_wait(d, io_close_cb, NULL); if (d->state == 3)
return io_close(conn);
return io_wait(conn, d, io_close_cb, NULL);
} }
static struct io_plan read_done(struct io_conn *conn, struct data *d) static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{ {
d->state++;
io_close_other(d->writer);
return io_close();
}
static void init_conn(int fd, struct data *d)
{
struct io_conn *conn;
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
memset(d->wbuf, 7, sizeof(d->wbuf)); memset(d->wbuf, 7, sizeof(d->wbuf));
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), read_done, d));
io_set_finish(conn, finish_ok, d); io_set_finish(conn, finish_ok, d);
d->writer = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
ok1(d->writer);
io_set_finish(d->writer, finish_ok, d);
io_close_listener(d->l); io_close_listener(d->l);
return io_duplex(io_read(conn, d->buf, sizeof(d->buf), io_done, d),
io_write(conn, d->wbuf, sizeof(d->wbuf), io_done, d));
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
...@@ -96,11 +86,11 @@ int main(void) ...@@ -96,11 +86,11 @@ int main(void)
int fd, status; int fd, status;
/* This is how many tests you plan to run */ /* This is how many tests you plan to run */
plan_tests(10); plan_tests(9);
d->state = 0; d->state = 0;
fd = make_listen_fd(PORT, &addrinfo); fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0); ok1(fd >= 0);
d->l = io_new_listener(fd, init_conn, d); d->l = io_new_listener(NULL, fd, init_conn, d);
ok1(d->l); ok1(d->l);
fflush(stdout); fflush(stdout);
if (!fork()) { if (!fork()) {
...@@ -130,7 +120,7 @@ int main(void) ...@@ -130,7 +120,7 @@ int main(void)
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == NULL); ok1(io_loop() == NULL);
ok1(d->state == 5); ok1(d->state == 4);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d); free(d);
...@@ -141,9 +131,3 @@ int main(void) ...@@ -141,9 +131,3 @@ int main(void)
/* This exits depending on whether all tests passed */ /* This exits depending on whether all tests passed */
return exit_status(); return exit_status();
} }
#else
int main(void)
{
return 0;
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment