Commit cdffdf5d authored by Rusty Russell's avatar Rusty Russell

ccan/io: rewrite.

I found it difficult to use myself, particularly io_duplex().

So this removes that, as well as timers and debug (for the moment).

API changes:
1) An io_plan is passed by pointer, rather than copied on the stack.
3) All io_plans are generated using the struct io_conn.
3) tal is the allocator.
4) A new connection must be set up with a callback, so this is now the
   same as one generated from a listener.
5) io_read_partial and io_write_partial take an explicit length.
6) io_always() and io_wait() take an explit in/out arg.
7) io_break() does not return a plan.
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent 12e92434
......@@ -10,12 +10,6 @@
* (eg. read, write). It is also possible to write custom I/O
* plans.
*
* When compiled with DEBUG, control flow is changed so that rather
* than returning to the main io_loop(), plans are executed sequentially
* providing a backtrace showing what has occurred on that connection.
* Which connection(s) do this depends on the user-specified io_debug
* function.
*
* Example:
* // Given tr A-Z a-z outputs tr a-z a-z
* #include <ccan/io/io.h>
......@@ -41,7 +35,7 @@
* io_wake(b);
* }
*
* static struct io_plan read_in(struct io_conn *c, struct buffer *b)
* static struct io_plan *read_in(struct io_conn *c, struct buffer *b)
* {
* // Add what we just read.
* b->end += b->rlen;
......@@ -55,35 +49,33 @@
* if (b->start == b->end)
* b->start = b->end = 0;
*
* // Read in some of the rest.
* b->rlen = sizeof(b->buf) - b->end;
*
* // No room? Wait for writer
* if (b->rlen == 0)
* return io_wait(b, read_in, b);
* if (b->end == sizeof(b->buf))
* return io_wait(c, b, IO_IN, read_in, b);
*
* return io_read_partial(b->buf + b->end, &b->rlen, read_in, b);
* return io_read_partial(c, b->buf + b->end, sizeof(b->buf) - b->end,
* &b->rlen, read_in, b);
* }
*
* static struct io_plan write_out(struct io_conn *c, struct buffer *b)
* static struct io_plan *write_out(struct io_conn *c, struct buffer *b)
* {
* // Remove what we just wrote.
* b->start += b->wlen;
* assert(b->start <= sizeof(b->buf));
*
* // If we wrote somthing, wake writer.
* // If we wrote something, wake writer.
* if (b->wlen != 0)
* io_wake(b);
*
* b->wlen = b->end - b->start;
* // Nothing to write? Wait for reader.
* if (b->wlen == 0) {
* if (b->end == b->start) {
* if (b->finished)
* return io_close();
* return io_wait(b, write_out, b);
* return io_close(c);
* return io_wait(c, b, IO_OUT, write_out, b);
* }
*
* return io_write_partial(b->buf + b->start, &b->wlen, write_out, b);
* return io_write_partial(c, b->buf + b->start, b->end - b->start,
* &b->wlen, write_out, b);
* }
*
* // Feed a program our stdin, gather its stdout, print that at end.
......@@ -117,14 +109,14 @@
*
* // Read from stdin, write to child.
* memset(&to, 0, sizeof(to));
* reader = io_new_conn(STDIN_FILENO, read_in(NULL, &to));
* reader = io_new_conn(NULL, STDIN_FILENO, read_in, &to);
* io_set_finish(reader, finish, &to);
* io_new_conn(tochild[1], write_out(NULL, &to));
* io_new_conn(NULL, tochild[1], write_out, &to);
*
* // Read from child, write to stdout.
* reader = io_new_conn(fromchild[0], read_in(NULL, &from));
* reader = io_new_conn(NULL, fromchild[0], read_in, &from);
* io_set_finish(reader, finish, &from);
* io_new_conn(STDOUT_FILENO, write_out(NULL, &from));
* io_new_conn(NULL, STDOUT_FILENO, write_out, &from);
*
* io_loop();
* wait(&status);
......@@ -141,9 +133,8 @@ int main(int argc, char *argv[])
return 1;
if (strcmp(argv[1], "depends") == 0) {
printf("ccan/tal\n");
printf("ccan/typesafe_cb\n");
printf("ccan/time\n");
printf("ccan/timer\n");
return 0;
}
......
......@@ -2,18 +2,8 @@
#ifndef CCAN_IO_BACKEND_H
#define CCAN_IO_BACKEND_H
#include <stdbool.h>
#include <ccan/timer/timer.h>
#include <poll.h>
/* A setting for actions to always run (eg. zero-length reads). */
#define POLLALWAYS (((POLLIN|POLLOUT) + 1) & ~((POLLIN|POLLOUT)))
struct io_alloc {
void *(*alloc)(size_t size);
void *(*realloc)(void *ptr, size_t size);
void (*free)(void *ptr);
};
extern struct io_alloc io_alloc;
#include "io_plan.h"
struct fd {
int fd;
......@@ -25,77 +15,43 @@ struct fd {
struct io_listener {
struct fd fd;
const tal_t *ctx;
/* These are for connections we create. */
void (*init)(int fd, void *arg);
struct io_plan *(*init)(struct io_conn *conn, void *arg);
void *arg;
};
struct io_timeout {
struct timer timer;
struct io_conn *conn;
struct io_plan (*next)(struct io_conn *, void *arg);
void *next_arg;
};
/* One connection per client. */
struct io_conn {
struct fd fd;
/* always or closing list. */
struct io_conn *list;
void (*finish)(struct io_conn *, void *arg);
void *finish_arg;
struct io_conn *duplex;
struct io_timeout *timeout;
struct io_plan plan;
struct io_plan plan[2];
};
static inline bool timeout_active(const struct io_conn *conn)
{
return conn->timeout && conn->timeout->conn;
}
extern void *io_loop_return;
#ifdef DEBUG
extern struct io_conn *current;
static inline void set_current(struct io_conn *conn)
{
current = conn;
}
static inline bool doing_debug_on(struct io_conn *conn)
{
return io_debug_conn && io_debug_conn(conn);
}
static inline bool doing_debug(void)
{
return io_debug_conn;
}
#else
static inline void set_current(struct io_conn *conn)
{
}
static inline bool doing_debug_on(struct io_conn *conn)
{
return false;
}
static inline bool doing_debug(void)
{
return false;
}
#endif
bool add_listener(struct io_listener *l);
bool add_conn(struct io_conn *c);
bool add_duplex(struct io_conn *c);
void del_listener(struct io_listener *l);
void backend_plan_changed(struct io_conn *conn);
void backend_wait_changed(const void *wait);
void backend_add_timeout(struct io_conn *conn, struct timerel duration);
void backend_del_timeout(struct io_conn *conn);
void backend_new_closing(struct io_conn *conn);
void backend_new_always(struct io_conn *conn);
void backend_new_plan(struct io_conn *conn);
void backend_plan_done(struct io_conn *conn);
void backend_wake(const void *wait);
void backend_del_conn(struct io_conn *conn);
void io_ready(struct io_conn *conn);
void io_ready(struct io_conn *conn, int pollflags);
void io_do_always(struct io_conn *conn);
void io_do_wakeup(struct io_conn *conn, struct io_plan *plan);
void *do_io_loop(struct io_conn **ready);
#endif /* CCAN_IO_BACKEND_H */
This diff is collapsed.
This diff is collapsed.
......@@ -4,129 +4,50 @@
struct io_conn;
/**
* struct io_plan - a plan of what I/O to do.
* @pollflag: POLLIN or POLLOUT.
* @io: function to call when fd is available for @pollflag.
* @next: function to call after @io returns true.
* @next_arg: argument to @next.
* @u1: scratch area for I/O.
* @u2: scratch area for I/O.
*
* When the fd is POLLIN or POLLOUT (according to @pollflag), @io is
* called. If it returns -1, io_close() becomed the new plan (and errno
* is saved). If it returns 1, @next is called, otherwise @io is
* called again when @pollflag is available.
*
* You can use this to write your own io_plan functions.
* union io_plan_arg - scratch space for struct io_plan read/write fns.
*/
struct io_plan {
int pollflag;
/* Only NULL if idle. */
int (*io)(int fd, struct io_plan *plan);
/* Only NULL if closing. */
struct io_plan (*next)(struct io_conn *, void *arg);
void *next_arg;
union io_plan_arg {
char *cp;
void *vp;
const void *const_vp;
size_t s;
char c[sizeof(size_t)];
};
union {
char *cp;
void *vp;
const void *const_vp;
size_t s;
char c[sizeof(size_t)];
} u1;
union {
char *p;
void *vp;
const void *const_vp;
size_t s;
char c[sizeof(size_t)];
} u2;
enum io_plan_status {
/* As before calling next function. */
IO_UNSET,
/* Normal. */
IO_POLLING,
/* Waiting for io_wake */
IO_WAITING,
/* Always do this. */
IO_ALWAYS,
/* Closing (both plans will be the same). */
IO_CLOSING
};
#ifdef DEBUG
/**
* io_debug_conn - routine to select connection(s) to debug.
*
* If this is set, the routine should return true if the connection is a
* debugging candidate. If so, the callchain for I/O operations on this
* connection will be linear, for easier use of a debugger.
*
* You will also see calls to any callbacks which wake the connection
* which is being debugged.
*
* Example:
* static bool debug_all(struct io_conn *conn)
* {
* return true();
* }
* ...
* io_debug_conn = debug_all;
* struct io_plan - one half of I/O to do
* @status: the status of this plan.
* @io: function to call when fd becomes read/writable, returns 0 to be
* called again, 1 if it's finished, and -1 on error (fd will be closed)
* @next: the next function which is called if io returns 1.
* @next_arg: the argument to @next
* @u1, @u2: scratch space for @io.
*/
extern bool (*io_debug_conn)(struct io_conn *conn);
struct io_plan {
enum io_plan_status status;
/**
* io_debug - if we're debugging the current connection, call immediately.
*
* This determines if we are debugging the current connection: if so,
* it immediately applies the plan and calls back into io_loop() to
* create a linear call chain.
*
* Example:
* #define io_idle() io_debug(io_idle_())
* struct io_plan io_idle_(void);
*/
struct io_plan io_debug(struct io_plan plan);
int (*io)(int fd, struct io_plan *plan);
/**
* io_debug_io - return from function which actually does I/O.
*
* This determines if we are debugging the current connection: if so,
* it immediately sets the next function and calls into io_loop() to
* create a linear call chain.
*
* Example:
*
* static int do_write(int fd, struct io_plan *plan)
* {
* ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
* if (ret < 0)
* return io_debug_io(-1);
*
* plan->u.write.buf += ret;
* plan->u.write.len -= ret;
* return io_debug_io(plan->u.write.len == 0);
* }
*/
int io_debug_io(int ret);
struct io_plan *(*next)(struct io_conn *, void *arg);
void *next_arg;
/**
* io_plan_no_debug - mark the next plan not to be called immediately.
*
* Most routines which take a plan are about to apply it to the current
* connection. We (ab)use this pattern for debugging: as soon as such a
* plan is created it is called, to create a linear call chain.
*
* Some routines, like io_break(), io_duplex() and io_wake() take an
* io_plan, but they must not be applied immediately to the current
* connection, so we call this first.
*
* Example:
* #define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan)))
* struct io_plan io_break_(void *ret, struct io_plan plan);
*/
#define io_plan_no_debug() ((io_plan_nodebug = true))
union io_plan_arg u1, u2;
};
extern bool io_plan_nodebug;
#else
static inline struct io_plan io_debug(struct io_plan plan)
{
return plan;
}
static inline int io_debug_io(int ret)
{
return ret;
}
#define io_plan_no_debug() (void)0
#endif
/* Helper to get a conn's io_plan. */
struct io_plan *io_get_plan(struct io_conn *conn, enum io_direction dir);
#endif /* CCAN_IO_PLAN_H */
This diff is collapsed.
#define DEBUG
#define PORT "64001"
#define main real_main
int real_main(void);
#include "run-01-start-finish.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -16,15 +16,17 @@ static void finish_ok(struct io_conn *conn, int *state)
ok1(*state == 1);
ok1(io_conn_fd(conn) == expected_fd);
(*state)++;
io_break(state + 1, io_never());
io_break(state + 1);
}
static void init_conn(int fd, int *state)
static struct io_plan *init_conn(struct io_conn *conn, int *state)
{
ok1(*state == 0);
(*state)++;
expected_fd = fd;
io_set_finish(io_new_conn(fd, io_close()), finish_ok, state);
expected_fd = io_conn_fd(conn);
io_set_finish(conn, finish_ok, state);
return io_close(conn);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -70,7 +72,7 @@ int main(void)
plan_tests(10);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, &state);
l = io_new_listener(NULL, fd, init_conn, &state);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64002"
#define main real_main
int real_main(void);
#include "run-02-read.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -19,17 +19,16 @@ static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, io_never());
io_break(d);
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
io_set_finish(io_new_conn(fd,
io_read(d->buf, sizeof(d->buf), io_close_cb, d)),
finish_ok, d);
io_set_finish(conn, finish_ok, d);
return io_read(conn, d->buf, sizeof(d->buf), io_close_cb, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -76,7 +75,7 @@ int main(void)
d->state = 0;
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_conn, d);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64003"
#define main real_main
int real_main(void);
#include "run-03-readpartial.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -20,18 +20,18 @@ static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, io_never());
io_break(d);
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
d->bytes = sizeof(d->buf);
io_set_finish(io_new_conn(fd,
io_read_partial(d->buf, &d->bytes, io_close_cb, d)),
finish_ok, d);
io_set_finish(conn, finish_ok, d);
return io_read_partial(conn, d->buf, sizeof(d->buf), &d->bytes,
io_close_cb, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -96,7 +96,7 @@ int main(void)
d->state = 0;
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_conn, d);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64004"
#define main real_main
int real_main(void);
#include "run-04-writepartial.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -20,16 +20,17 @@ static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, io_never());
io_break(d);
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
io_set_finish(io_new_conn(fd,
io_write_partial(d->buf, &d->bytes, io_close_cb, d)),
finish_ok, d);
io_set_finish(conn, finish_ok, d);
return io_write_partial(conn, d->buf, d->bytes, &d->bytes,
io_close_cb, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -97,7 +98,7 @@ int main(void)
memset(d->buf, 'a', d->bytes);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_conn, d);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64005"
#define main real_main
int real_main(void);
#include "run-05-write.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -20,16 +20,15 @@ static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, io_never());
io_break(d);
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
io_set_finish(io_new_conn(fd, io_write(d->buf, d->bytes,
io_close_cb, d)),
finish_ok, d);
io_set_finish(conn, finish_ok, d);
return io_write(conn, d->buf, d->bytes, io_close_cb, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -100,7 +99,7 @@ int main(void)
memset(d->buf, 'a', d->bytes);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_conn, d);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64006"
#define main real_main
int real_main(void);
#include "run-06-idle.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -20,11 +20,11 @@ struct data {
char buf[4];
};
static struct io_plan read_done(struct io_conn *conn, struct data *d)
static struct io_plan *read_done(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2 || d->state == 3);
d->state++;
return io_close();
return io_close(conn);
}
static void finish_waker(struct io_conn *conn, struct data *d)
......@@ -38,33 +38,40 @@ static void finish_idle(struct io_conn *conn, struct data *d)
{
ok1(d->state == 3);
d->state++;
io_break(d, io_never());
io_break(d);
}
static struct io_plan never(struct io_conn *conn, void *arg)
static struct io_plan *never(struct io_conn *conn, void *arg)
{
abort();
}
static struct io_plan read_buf(struct io_conn *conn, struct data *d)
static struct io_plan *read_buf(struct io_conn *conn, struct data *d)
{
return io_read(d->buf, sizeof(d->buf), read_done, d);
return io_read(conn, d->buf, sizeof(d->buf), read_done, d);
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_waker(struct io_conn *conn, void *unused)
{
/* This is /dev/null, so will never succeed. */
return io_read(conn, unused, 1, never, NULL);
}
static struct io_plan *init_idle(struct io_conn *conn, struct data *d)
{
int fd2;
ok1(d->state == 0);
d->state++;
idler = io_new_conn(fd, io_wait(d, read_buf, d));
io_set_finish(idler, finish_idle, d);
idler = conn;
io_set_finish(conn, finish_idle, d);
/* This will wake us up, as read will fail. */
fd2 = open("/dev/null", O_RDONLY);
ok1(fd2 >= 0);
io_set_finish(io_new_conn(fd2, io_read(idler, 1, never, NULL)),
finish_waker, d);
io_set_finish(io_new_conn(NULL, fd2, init_waker, d), finish_waker, d);
return io_wait(conn, d, IO_IN, read_buf, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -111,7 +118,7 @@ int main(void)
d->state = 0;
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_idle, d);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64007"
#define main real_main
int real_main(void);
#include "run-07-break.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -15,11 +15,11 @@ struct data {
char buf[4];
};
static struct io_plan read_done(struct io_conn *conn, struct data *d)
static struct io_plan *read_done(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
return io_close();
return io_close(conn);
}
static void finish_ok(struct io_conn *conn, struct data *d)
......@@ -28,15 +28,15 @@ static void finish_ok(struct io_conn *conn, struct data *d)
d->state++;
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
io_set_finish(io_new_conn(fd,
io_break(d,
io_read(d->buf, sizeof(d->buf), read_done, d))),
finish_ok, d);
io_set_finish(conn, finish_ok, d);
io_break(d);
return io_read(conn, d->buf, sizeof(d->buf), read_done, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -83,7 +83,7 @@ int main(void)
d->state = 0;
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_conn, d);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define main real_main
int real_main(void);
#include "run-08-hangup-on-idle.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -8,39 +8,49 @@
static int fds2[2];
static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf)
static struct io_plan *read_in(struct io_conn *conn, char *buf)
{
/* This kills the dummy connection. */
close(fds2[1]);
return io_read(buf, 16, io_close_cb, NULL);
return io_read(conn, buf, 16, io_close_cb, NULL);
}
static struct io_plan never(struct io_conn *conn, void *unused)
static struct io_plan *setup_waiter(struct io_conn *conn, char *buf)
{
abort();
return io_wait(conn, buf, IO_IN, read_in, buf);
}
static struct io_plan *wake_and_close(struct io_conn *conn, char *buf)
{
io_wake(buf);
return io_close(conn);
}
static struct io_plan *setup_waker(struct io_conn *conn, char *buf)
{
return io_read(conn, buf, 1, wake_and_close, buf);
}
int main(void)
{
int fds[2];
struct io_conn *conn;
char buf[16];
plan_tests(4);
ok1(pipe(fds) == 0);
/* Write then close. */
io_new_conn(fds[1], io_write("hello there world", 16,
io_close_cb, NULL));
conn = io_new_conn(fds[0], io_wait(buf, never, NULL));
/* To avoid assert(num_waiting) */
io_new_conn(NULL, fds[0], setup_waiter, buf);
ok1(pipe(fds2) == 0);
io_new_conn(fds2[0], io_read(buf, 16, io_close_cb, NULL));
io_new_conn(NULL, fds2[0], setup_waker, buf);
if (fork() == 0) {
write(fds[1], "hello there world", 16);
close(fds[1]);
/* After half a second, it will read. */
io_timeout(conn, time_from_msec(500), timeout_wakeup, buf);
/* Now wake it. */
sleep(1);
write(fds2[1], "", 1);
exit(0);
}
ok1(io_loop() == NULL);
ok1(memcmp(buf, "hello there world", 16) == 0);
......
#define DEBUG
#define main real_main
int real_main(void);
#include "run-08-read-after-hangup.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -9,15 +9,25 @@
static char inbuf[8];
static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader)
static struct io_plan *wake_it(struct io_conn *conn, struct io_conn *reader)
{
io_wake(inbuf);
return io_close();
return io_close(conn);
}
static struct io_plan read_buf(struct io_conn *conn, void *unused)
static struct io_plan *read_buf(struct io_conn *conn, void *unused)
{
return io_read(inbuf, 8, io_close_cb, NULL);
return io_read(conn, inbuf, 8, io_close_cb, NULL);
}
static struct io_plan *init_writer(struct io_conn *conn, struct io_conn *wakeme)
{
return io_write(conn, "EASYTEST", 8, wake_it, wakeme);
}
static struct io_plan *init_waiter(struct io_conn *conn, void *unused)
{
return io_wait(conn, inbuf, IO_IN, read_buf, NULL);
}
int main(void)
......@@ -28,8 +38,8 @@ int main(void)
plan_tests(3);
ok1(pipe(fds) == 0);
conn = io_new_conn(fds[0], io_wait(inbuf, read_buf, NULL));
io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
conn = io_new_conn(NULL, fds[0], init_waiter, NULL);
io_new_conn(conn, fds[1], init_writer, conn);
ok1(io_loop() == NULL);
ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0);
......
#define DEBUG
#define PORT "64009"
#define main real_main
int real_main(void);
#include "run-09-connect.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -11,31 +11,33 @@
#endif
static struct io_listener *l;
static struct data *d2;
struct data {
int state;
char buf[10];
};
static struct io_plan closer(struct io_conn *conn, struct data *d)
static struct io_plan *closer(struct io_conn *conn, struct data *d)
{
d->state++;
return io_close();
return io_close(conn);
}
static struct io_plan connected(struct io_conn *conn, struct data *d2)
static struct io_plan *connected(struct io_conn *conn, struct data *d2)
{
ok1(d2->state == 0);
d2->state++;
return io_read(d2->buf, sizeof(d2->buf), closer, d2);
return io_read(conn, d2->buf, sizeof(d2->buf), closer, d2);
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
io_new_conn(fd, io_write(d->buf, sizeof(d->buf), closer, d));
io_close_listener(l);
return io_write(conn, d->buf, sizeof(d->buf), closer, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -70,9 +72,17 @@ static int make_listen_fd(const char *port, struct addrinfo **info)
return fd;
}
static struct io_plan *setup_connect(struct io_conn *conn,
struct addrinfo *addrinfo)
{
d2 = malloc(sizeof(*d2));
d2->state = 0;
return io_connect(conn, addrinfo, connected, d2);
}
int main(void)
{
struct data *d = malloc(sizeof(*d)), *d2 = malloc(sizeof(*d2));
struct data *d = malloc(sizeof(*d));
struct addrinfo *addrinfo;
int fd;
......@@ -82,13 +92,12 @@ int main(void)
memset(d->buf, 'a', sizeof(d->buf));
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_conn, d);
ok1(l);
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
d2->state = 0;
ok1(io_new_conn(fd, io_connect(fd, addrinfo, connected, d2)));
ok1(io_new_conn(NULL, fd, setup_connect, addrinfo));
ok1(io_loop() == NULL);
ok1(d->state == 2);
......
#define DEBUG
#define PORT "64010"
#define main real_main
int real_main(void);
#include "run-10-many.c"
#undef main
/* We stack overflow if we debug all of them! */
static bool debug_one(struct io_conn *conn)
{
return conn == buf[1].reader;
}
int main(void) { io_debug_conn = debug_one; return real_main(); }
......@@ -15,44 +15,49 @@ struct buffer {
char buf[32];
};
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf);
static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf);
static struct io_plan read_buf(struct io_conn *conn, struct buffer *buf)
static struct io_plan *read_buf(struct io_conn *conn, struct buffer *buf)
{
return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf);
return io_read(conn, &buf->buf, sizeof(buf->buf), poke_writer, buf);
}
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
if (buf->iters == NUM_ITERS)
return io_close();
return io_close(conn);
/* You write. */
io_wake(&buf->writer);
/* I'll wait until you wake me. */
return io_wait(&buf->reader, read_buf, buf);
return io_wait(conn, &buf->reader, IO_IN, read_buf, buf);
}
static struct io_plan write_buf(struct io_conn *conn, struct buffer *buf)
static struct io_plan *write_buf(struct io_conn *conn, struct buffer *buf)
{
return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf);
return io_write(conn, &buf->buf, sizeof(buf->buf), poke_reader, buf);
}
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->writer);
/* You read. */
io_wake(&buf->reader);
if (++buf->iters == NUM_ITERS)
return io_close();
return io_close(conn);
/* I'll wait until you tell me to write. */
return io_wait(&buf->writer, write_buf, buf);
return io_wait(conn, &buf->writer, IO_OUT, write_buf, buf);
}
static struct io_plan *setup_reader(struct io_conn *conn, struct buffer *buf)
{
return io_wait(conn, &buf->reader, IO_IN, read_buf, buf);
}
static struct buffer buf[NUM];
......@@ -75,12 +80,11 @@ int main(void)
sprintf(buf[i].buf, "%i-%i", i, i);
/* Wait for writer to tell us to read. */
buf[i].reader = io_new_conn(last_read,
io_wait(&buf[i].reader, read_buf,
&buf[i]));
buf[i].reader = io_new_conn(NULL, last_read,
setup_reader, &buf[i]);
if (!buf[i].reader)
break;
buf[i].writer = io_new_conn(fds[1], write_buf(NULL, &buf[i]));
buf[i].writer = io_new_conn(NULL, fds[1], write_buf, &buf[i]);
if (!buf[i].writer)
break;
last_read = fds[0];
......@@ -91,10 +95,9 @@ int main(void)
/* Last one completes the cirle. */
i = 0;
sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read,
io_wait(&buf[i].reader, read_buf, &buf[i]));
buf[i].reader = io_new_conn(NULL, last_read, setup_reader, &buf[i]);
ok1(buf[i].reader);
buf[i].writer = io_new_conn(last_write, write_buf(NULL, &buf[i]));
buf[i].writer = io_new_conn(NULL, last_write, write_buf, &buf[i]);
ok1(buf[i].writer);
/* They should eventually exit */
......
#define DEBUG
#define PORT "64012"
#define main real_main
int real_main(void);
#include "run-12-bidir.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -6,6 +6,7 @@
#include <sys/wait.h>
#include <stdio.h>
#if 0
#ifndef PORT
#define PORT "65012"
#endif
......@@ -22,10 +23,10 @@ static void finish_ok(struct io_conn *conn, struct data *d)
d->state++;
}
static struct io_plan write_done(struct io_conn *conn, struct data *d)
static struct io_plan *write_done(struct io_conn *conn, struct data *d)
{
d->state++;
return io_close();
return io_close(conn);
}
static void init_conn(int fd, struct data *d)
......@@ -130,3 +131,9 @@ int main(void)
/* This exits depending on whether all tests passed */
return exit_status();
}
#else
int main(void)
{
return 0;
}
#endif
#define DEBUG
#define PORT "64013"
#define main real_main
int real_main(void);
#include "run-13-all-idle.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -7,6 +7,11 @@
#include <stdio.h>
#include <signal.h>
static struct io_plan *setup_waiter(struct io_conn *conn, int *status)
{
return io_wait(conn, status, IO_IN, io_close_cb, NULL);
}
int main(void)
{
int status;
......@@ -17,7 +22,7 @@ int main(void)
int fds[2];
ok1(pipe(fds) == 0);
io_new_conn(fds[0], io_wait(&status, io_close_cb, NULL));
io_new_conn(NULL, fds[0], setup_waiter, &status);
io_loop();
exit(1);
}
......
#define DEBUG
#define PORT "64014"
#define main real_main
int real_main(void);
#include "run-14-duplex-both-read.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -8,6 +8,7 @@
#include <sys/wait.h>
#include <stdio.h>
#if 0
#ifndef PORT
#define PORT "65014"
#endif
......@@ -138,3 +139,9 @@ int main(void)
/* This exits depending on whether all tests passed */
return exit_status();
}
#else
int main(void)
{
return 0;
}
#endif
#define DEBUG
#define PORT "64015"
#define main real_main
int real_main(void);
#include "run-15-timeout.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -7,6 +7,7 @@
#include <stdio.h>
#include <unistd.h>
#if 0
#ifndef PORT
#define PORT "65015"
#endif
......@@ -38,7 +39,7 @@ static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2);
d->state++;
io_break(d, io_never());
io_break(d);
}
static void init_conn(int fd, struct data *d)
......@@ -172,3 +173,9 @@ int main(void)
/* This exits depending on whether all tests passed */
return exit_status();
}
#else
int main(void)
{
return 0;
}
#endif
......@@ -8,6 +8,7 @@
#include <sys/wait.h>
#include <stdio.h>
#if 0
#ifndef PORT
#define PORT "65016"
#endif
......@@ -140,3 +141,9 @@ int main(void)
/* This exits depending on whether all tests passed */
return exit_status();
}
#else
int main(void)
{
return 0;
}
#endif
#define DEBUG
#define PORT "64017"
#define main real_main
int real_main(void);
#include "run-17-homemade-io.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -20,7 +20,7 @@ static void finish_ok(struct io_conn *conn, struct packet *pkt)
{
ok1(pkt->state == 3);
pkt->state++;
io_break(pkt, io_never());
io_break(pkt);
}
static int do_read_packet(int fd, struct io_plan *plan)
......@@ -41,7 +41,7 @@ static int do_read_packet(int fd, struct io_plan *plan)
ok1(pkt->state == 2);
pkt->state++;
if (pkt->len == 0)
return io_debug_io(1);
return 1;
if (!pkt->contents && !(pkt->contents = malloc(pkt->len)))
goto fail;
else {
......@@ -58,39 +58,39 @@ static int do_read_packet(int fd, struct io_plan *plan)
plan->u2.s += ret;
/* Finished? */
return io_debug_io(plan->u2.s >= sizeof(pkt->len)
&& plan->u2.s == pkt->len + sizeof(pkt->len));
return plan->u2.s >= sizeof(pkt->len)
&& plan->u2.s == pkt->len + sizeof(pkt->len);
fail:
free(pkt->contents);
return io_debug_io(-1);
return -1;
}
static struct io_plan io_read_packet(struct packet *pkt,
struct io_plan (*cb)(struct io_conn *, void *),
void *arg)
static struct io_plan *io_read_packet(struct io_conn *conn,
struct packet *pkt,
struct io_plan *(*cb)(struct io_conn *, void *),
void *arg)
{
struct io_plan plan;
struct io_plan *plan = io_get_plan(conn, IO_IN);
assert(cb);
pkt->contents = NULL;
plan.u1.vp = pkt;
plan.u2.s = 0;
plan.io = do_read_packet;
plan.next = cb;
plan.next_arg = arg;
plan.pollflag = POLLIN;
plan->u1.vp = pkt;
plan->u2.s = 0;
plan->io = do_read_packet;
plan->next = cb;
plan->next_arg = arg;
return plan;
}
static void init_conn(int fd, struct packet *pkt)
static struct io_plan *init_conn(struct io_conn *conn, struct packet *pkt)
{
ok1(pkt->state == 0);
pkt->state++;
io_set_finish(io_new_conn(fd, io_read_packet(pkt, io_close_cb, pkt)),
finish_ok, pkt);
io_set_finish(conn, finish_ok, pkt);
return io_read_packet(conn, pkt, io_close_cb, pkt);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -137,7 +137,7 @@ int main(void)
pkt->state = 0;
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, pkt);
l = io_new_listener(NULL, fd, init_conn, pkt);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64018"
#define main real_main
int real_main(void);
#include "run-18-errno.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -22,23 +22,24 @@ static void finish_EBADF(struct io_conn *conn, int *state)
ok1(errno == EBADF);
ok1(*state == 3);
(*state)++;
io_break(state + 1, io_close());
io_break(state + 1);
}
static void init_conn(int fd, int *state)
static struct io_plan *init_conn(struct io_conn *conn, int *state)
{
if (*state == 0) {
(*state)++;
errno = 100;
io_set_finish(io_new_conn(fd, io_close()), finish_100, state);
io_set_finish(conn, finish_100, state);
return io_close(conn);
} else {
ok1(*state == 2);
(*state)++;
close(fd);
close(io_conn_fd(conn));
errno = 0;
io_set_finish(io_new_conn(fd, io_read(state, 1,
io_close_cb, NULL)),
finish_EBADF, state);
io_set_finish(conn, finish_EBADF, state);
return io_read(conn, state, 1, io_close_cb, NULL);
}
}
......@@ -85,7 +86,7 @@ int main(void)
plan_tests(12);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, &state);
l = io_new_listener(NULL, fd, init_conn, &state);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#define DEBUG
#define PORT "64019"
#define main real_main
int real_main(void);
#include "run-19-always.c"
#undef main
static bool always_debug(struct io_conn *conn) { return true; }
int main(void) { io_debug_conn = always_debug; return real_main(); }
......@@ -20,21 +20,22 @@ static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
io_break(d, io_never());
io_break(d);
}
static struct io_plan write_buf(struct io_conn *conn, struct data *d)
static struct io_plan *write_buf(struct io_conn *conn, struct data *d)
{
return io_write(d->buf, d->bytes, io_close_cb, d);
return io_write(conn, d->buf, d->bytes, io_close_cb, d);
}
static void init_conn(int fd, struct data *d)
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
ok1(d->state == 0);
d->state++;
io_set_finish(conn, finish_ok, d);
/* Empty read should run immediately... */
io_set_finish(io_new_conn(fd, io_read(NULL, 0, write_buf, d)),
finish_ok, d);
return io_read(conn, NULL, 0, write_buf, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
......@@ -105,7 +106,7 @@ int main(void)
memset(d->buf, 'a', d->bytes);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
l = io_new_listener(NULL, fd, init_conn, d);
ok1(l);
fflush(stdout);
if (!fork()) {
......
#include <ccan/tap/tap.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
/* Make sure we override these! */
static void *no_malloc(size_t size)
{
abort();
}
static void *no_realloc(void *p, size_t size)
{
abort();
}
static void no_free(void *p)
{
abort();
}
#define malloc no_malloc
#define realloc no_realloc
#define free no_free
#include <ccan/io/poll.c>
#include <ccan/io/io.c>
#undef malloc
#undef realloc
#undef free
static unsigned int alloc_count, realloc_count, free_count;
static void *ptrs[100];
static void **find_ptr(void *p)
{
unsigned int i;
for (i = 0; i < 100; i++)
if (ptrs[i] == p)
return ptrs + i;
return NULL;
}
static void *allocfn(size_t size)
{
alloc_count++;
return *find_ptr(NULL) = malloc(size);
}
static void *reallocfn(void *ptr, size_t size)
{
realloc_count++;
if (!ptr)
alloc_count++;
return *find_ptr(ptr) = realloc(ptr, size);
}
static void freefn(void *ptr)
{
free_count++;
free(ptr);
*find_ptr(ptr) = NULL;
}
#ifndef PORT
#define PORT "65115"
#endif
struct data {
int state;
int timeout_usec;
bool timed_out;
char buf[4];
};
static struct io_plan no_timeout(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
return io_close();
}
static struct io_plan timeout(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
d->timed_out = true;
return io_close();
}
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2);
d->state++;
io_break(d, io_never());
}
static void init_conn(int fd, struct data *d)
{
struct io_conn *conn;
ok1(d->state == 0);
d->state++;
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d));
io_set_finish(conn, finish_ok, d);
io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
{
int fd, on = 1;
struct addrinfo *addrinfo, hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
return -1;
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
return -1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
close(fd);
return -1;
}
if (listen(fd, 1) != 0) {
close(fd);
return -1;
}
*info = addrinfo;
return fd;
}
int main(void)
{
struct data *d = allocfn(sizeof(*d));
struct addrinfo *addrinfo;
struct io_listener *l;
int fd, status;
io_set_alloc(allocfn, reallocfn, freefn);
/* This is how many tests you plan to run */
plan_tests(25);
d->state = 0;
d->timed_out = false;
d->timeout_usec = 100000;
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d);
ok1(l);
fflush(stdout);
if (!fork()) {
int i;
io_close_listener(l);
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
exit(1);
if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
exit(2);
signal(SIGPIPE, SIG_IGN);
usleep(500000);
for (i = 0; i < strlen("hellothere"); i++) {
if (write(fd, "hellothere" + i, 1) != 1)
break;
}
close(fd);
freeaddrinfo(addrinfo);
free(d);
exit(i);
}
ok1(io_loop() == d);
ok1(d->state == 3);
ok1(d->timed_out == true);
ok1(wait(&status));
ok1(WIFEXITED(status));
ok1(WEXITSTATUS(status) < sizeof(d->buf));
/* This one shouldn't time out. */
d->state = 0;
d->timed_out = false;
d->timeout_usec = 500000;
fflush(stdout);
if (!fork()) {
int i;
io_close_listener(l);
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
exit(1);
if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
exit(2);
signal(SIGPIPE, SIG_IGN);
usleep(100000);
for (i = 0; i < strlen("hellothere"); i++) {
if (write(fd, "hellothere" + i, 1) != 1)
break;
}
close(fd);
freeaddrinfo(addrinfo);
free(d);
exit(i);
}
ok1(io_loop() == d);
ok1(d->state == 3);
ok1(d->timed_out == false);
ok1(wait(&status));
ok1(WIFEXITED(status));
ok1(WEXITSTATUS(status) >= sizeof(d->buf));
io_close_listener(l);
freeaddrinfo(addrinfo);
/* We should have tested each one at least once! */
ok1(realloc_count);
ok1(alloc_count);
ok1(free_count);
ok1(free_count < alloc_count);
freefn(d);
ok1(free_count == alloc_count);
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment