Commit 40deb7ed authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ad39a4c6
...@@ -23,25 +23,20 @@ ...@@ -23,25 +23,20 @@
# XXX doc # XXX doc
from golang cimport error
from golang cimport context
from libcpp cimport nullptr_t, nullptr as nil from libcpp cimport nullptr_t, nullptr as nil
from libcpp.string cimport string from libcpp.string cimport string
from libcpp.utility cimport pair from libcpp.utility cimport pair
cdef extern from *: cdef extern from *:
ctypedef bint cbool "bool" ctypedef bint cbool "bool"
cdef extern from "wcfs_misc.h" nogil:
cppclass error:
string Error() const
cbool operator==(nullptr_t) const
cbool operator!=(nullptr_t) const
cbool operator==(const error&) const
cbool operator!=(const error&) const
cdef extern from "wcfs_watchlink.h" nogil: cdef extern from "wcfs_watchlink.h" nogil:
cppclass WatchLink: cppclass WatchLink:
error close() error close()
#error recvReq(ctx, PinReq *rx_into) #error recvReq(ctx, PinReq *rx_into)
pair[string, error] sendReq(IContext *ctx, const string &req) pair[string, error] sendReq(context.Context ctx, const string &req)
ctypedef WatchLink *pWatchLink # https://github.com/cython/cython/issues/534 ctypedef WatchLink *pWatchLink # https://github.com/cython/cython/issues/534
...@@ -84,7 +79,7 @@ cdef class PyWatchLink: ...@@ -84,7 +79,7 @@ cdef class PyWatchLink:
# XXX recvReq # XXX recvReq
def sendReq(PyWatchLink pywlink, PyContext pyctx, string req): # -> reply(string) def sendReq(PyWatchLink pywlink, context.PyContext pyctx, string req): # -> reply(string)
with nogil: with nogil:
_ = wlink_sendReq_pyexc(pywlink.wlink, pyctx.ctx, req) _ = wlink_sendReq_pyexc(pywlink.wlink, pyctx.ctx, req)
reply = _.first reply = _.first
...@@ -101,14 +96,6 @@ cdef class PyPinReq: ...@@ -101,14 +96,6 @@ cdef class PyPinReq:
pass pass
cdef extern from "wcfs_misc.h" nogil:
cppclass IContext "context::Context":
pass
cdef class PyContext:
cdef IContext *ctx
# ---- misc ---- # ---- misc ----
...@@ -122,5 +109,5 @@ cdef nogil: ...@@ -122,5 +109,5 @@ cdef nogil:
error wlink_close_pyexc(WatchLink *wlink) except +topyexc: error wlink_close_pyexc(WatchLink *wlink) except +topyexc:
return wlink.close() return wlink.close()
pair[string, error] wlink_sendReq_pyexc(WatchLink *wlink, IContext *ctx, const string &req) except +topyexc: pair[string, error] wlink_sendReq_pyexc(WatchLink *wlink, context.Context ctx, const string &req) except +topyexc:
return wlink.sendReq(ctx, req) return wlink.sendReq(ctx, req)
...@@ -64,15 +64,23 @@ static error _pathError(const char *op, const string &path, int syserr); ...@@ -64,15 +64,23 @@ static error _pathError(const char *op, const string &path, int syserr);
int _File::fd() const { return _fd; } int _File::fd() const { return _fd; }
string _File::name() const { return _path; } string _File::name() const { return _path; }
_File::_File() {}
_File::~_File() {}
void _File::decref() {
if (__decref())
delete this;
}
tuple<File, error> open(const string &path, int flags, mode_t mode) { tuple<File, error> open(const string &path, int flags, mode_t mode) {
File f; int fd = ::open(path.c_str(), flags, mode);
f._path = path; if (fd == -1)
f._fd = ::open(path.c_str(), flags, mode); return make_tuple(nil, _pathError("open", path, errno)); // XXX -> wrapper?
error err;
if (f._fd == -1) File f = adoptref(new _File);
err = f._errno("open"); f->_path = path;
return make_tuple(f, err); f->_fd = fd;
return make_tuple(f, nil);
} }
error _File::close() { error _File::close() {
...@@ -198,7 +206,7 @@ string sprintf(const char *format, ...) { ...@@ -198,7 +206,7 @@ string sprintf(const char *format, ...) {
error errorf(const string &format, ...) { error errorf(const string &format, ...) {
va_list argp; va_list argp;
va_start(argp, format); va_start(argp, format);
error err = error::New(fmt::sprintf(format.c_str(), argp)); error err = errors::New(fmt::sprintf(format.c_str(), argp));
va_end(argp); va_end(argp);
return err; return err;
} }
...@@ -206,7 +214,7 @@ error errorf(const string &format, ...) { ...@@ -206,7 +214,7 @@ error errorf(const string &format, ...) {
error errorf(const char *format, ...) { error errorf(const char *format, ...) {
va_list argp; va_list argp;
va_start(argp, format); va_start(argp, format);
error err = error::New(fmt::sprintf(format, argp)); error err = errors::New(fmt::sprintf(format, argp));
va_end(argp); va_end(argp);
return err; return err;
} }
......
...@@ -106,13 +106,19 @@ namespace os { ...@@ -106,13 +106,19 @@ namespace os {
// its operations return error with full file context. // its operations return error with full file context.
class _File; class _File;
typedef refptr<_File> File; typedef refptr<_File> File;
class _File { class _File : public object {
int _fd; int _fd;
string _path; string _path;
public: // don't new - create via open
private:
_File();
~_File();
friend tuple<File, error> open(const string &path, int flags, mode_t mode); friend tuple<File, error> open(const string &path, int flags, mode_t mode);
// XXX empty ctor -> fd=-1, path=? public:
void decref();
public:
int fd() const; int fd() const;
string name() const; string name() const;
error close(); error close();
......
...@@ -71,7 +71,7 @@ public: ...@@ -71,7 +71,7 @@ public:
error resync(zodb::Tid at); error resync(zodb::Tid at);
private: private:
void _pinner(context::Context *ctx); void _pinner(context::Context ctx);
void _pin1(PinReq *req); void _pin1(PinReq *req);
}; };
...@@ -161,7 +161,7 @@ error Conn::close() { // XXX error -> void? ...@@ -161,7 +161,7 @@ error Conn::close() { // XXX error -> void?
for (auto _ : wconn._filetab) { for (auto _ : wconn._filetab) {
auto f = _.second; auto f = _.second;
f->headf.close(); // XXX err f->headf->close(); // XXX err
//f->headf = None //f->headf = None
// XXX stop watching f // XXX stop watching f
...@@ -172,7 +172,7 @@ error Conn::close() { // XXX error -> void? ...@@ -172,7 +172,7 @@ error Conn::close() { // XXX error -> void?
} }
// _pinner receives pin messages from wcfs and adjusts wconn mappings. // _pinner receives pin messages from wcfs and adjusts wconn mappings.
void Conn::_pinner(context::Context *ctx) { void Conn::_pinner(context::Context ctx) {
Conn &wconn = *this; Conn &wconn = *this;
// XXX panic/exc -> log.CRITICAL // XXX panic/exc -> log.CRITICAL
...@@ -260,7 +260,7 @@ error Conn::resync(zodb::Tid at) { ...@@ -260,7 +260,7 @@ error Conn::resync(zodb::Tid at) {
// update f.headfsize and remmap to head/f zero regions that are now covered by head/f // update f.headfsize and remmap to head/f zero regions that are now covered by head/f
struct stat st; struct stat st;
auto err = f.headf.stat(&st); auto err = f.headf->stat(&st);
if (err != nil) if (err != nil)
return err; return err;
...@@ -333,11 +333,11 @@ error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) { ...@@ -333,11 +333,11 @@ error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
} }
defer([&]() { defer([&]() {
if (fclose) if (fclose)
fsfile.close(); fsfile->close();
}); });
struct stat st; struct stat st;
err = fsfile.stat(&st); err = fsfile->stat(&st);
if (err != nil) if (err != nil)
return err; return err;
if ((size_t)st.st_blksize != f->blksize) if ((size_t)st.st_blksize != f->blksize)
...@@ -406,7 +406,7 @@ static error _mmap_zero_into_ro(void *addr, size_t size) { ...@@ -406,7 +406,7 @@ static error _mmap_zero_into_ro(void *addr, size_t size) {
if (err != nil) if (err != nil)
return err; return err;
defer([&]() { defer([&]() {
z.close(); z->close();
}); });
err = mm::map_into(addr, size, PROT_READ, MAP_SHARED | MAP_NORESERVE, z, 0); err = mm::map_into(addr, size, PROT_READ, MAP_SHARED | MAP_NORESERVE, z, 0);
if (err != nil) if (err != nil)
......
...@@ -89,7 +89,7 @@ error WatchLink::close() { ...@@ -89,7 +89,7 @@ error WatchLink::close() {
error err = nil; error err = nil;
#endif #endif
error err2 = wlink._f.close(); error err2 = wlink._f->close();
if (err == nil) if (err == nil)
err = err2; err = err2;
...@@ -97,7 +97,7 @@ error WatchLink::close() { ...@@ -97,7 +97,7 @@ error WatchLink::close() {
} }
// _serveRX receives messages from ._f and dispatches them according to streamID. // _serveRX receives messages from ._f and dispatches them according to streamID.
error WatchLink::_serveRX(context::Context *ctx) { // XXX error -> where ? error WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
WatchLink& wlink = *this; WatchLink& wlink = *this;
// when finishing - wakeup everyone waiting for rx // when finishing - wakeup everyone waiting for rx
...@@ -203,13 +203,13 @@ error WatchLink::_write(const string &pkt) { ...@@ -203,13 +203,13 @@ error WatchLink::_write(const string &pkt) {
//printf('C: watch : tx: %r' % pkt) //printf('C: watch : tx: %r' % pkt)
int n; int n;
error err; error err;
tie(n, err) = wlink->_f.write(pkt.c_str(), pkt.size()); tie(n, err) = wlink->_f->write(pkt.c_str(), pkt.size());
return err; return err;
} }
// sendReq sends client -> server request and returns server reply. // sendReq sends client -> server request and returns server reply.
// XXX -> reply | None when EOF // XXX -> reply | None when EOF
pair<string, error> WatchLink::sendReq(context::Context *ctx, const string &req) { pair<string, error> WatchLink::sendReq(context::Context ctx, const string &req) {
WatchLink *wlink = this; WatchLink *wlink = this;
// XXX err ctx // XXX err ctx
...@@ -231,7 +231,7 @@ pair<string, error> WatchLink::sendReq(context::Context *ctx, const string &req) ...@@ -231,7 +231,7 @@ pair<string, error> WatchLink::sendReq(context::Context *ctx, const string &req)
return make_pair(reply, nil); return make_pair(reply, nil);
} }
tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(context::Context *ctx, const string &req) { tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(context::Context ctx, const string &req) {
WatchLink *wlink = this; WatchLink *wlink = this;
// XXX err ctx? // XXX err ctx?
...@@ -269,7 +269,7 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(context::Context *ctx, cons ...@@ -269,7 +269,7 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(context::Context *ctx, cons
// recvReq receives client <- server request. // recvReq receives client <- server request.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt); static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error WatchLink::recvReq(context::Context *ctx, PinReq *prx) { error WatchLink::recvReq(context::Context ctx, PinReq *prx) {
WatchLink& wlink = *this; WatchLink& wlink = *this;
// XXX err ctx? // XXX err ctx?
...@@ -345,7 +345,7 @@ tuple<string, error> WatchLink::_readline() { ...@@ -345,7 +345,7 @@ tuple<string, error> WatchLink::_readline() {
int n; int n;
error err; error err;
tie(n, err) = wlink._f.read(buf, sizeof(buf)); tie(n, err) = wlink._f->read(buf, sizeof(buf));
if (n > 0) { if (n > 0) {
// XXX limit line length to avoid DoS // XXX limit line length to avoid DoS
wlink._rxbuf += string(buf, n); wlink._rxbuf += string(buf, n);
......
...@@ -24,7 +24,12 @@ ...@@ -24,7 +24,12 @@
#define _NXD_WCFS_WATCHLINK_H_ #define _NXD_WCFS_WATCHLINK_H_
#include <golang/libgolang.h> #include <golang/libgolang.h>
#include <golang/context.h>
#include <golang/cxx.h>
#include <golang/sync.h>
using namespace golang; using namespace golang;
using cxx::dict;
using cxx::set;
#include "wcfs.h" #include "wcfs.h"
#include "wcfs_misc.h" #include "wcfs_misc.h"
...@@ -87,16 +92,16 @@ class WatchLink { ...@@ -87,16 +92,16 @@ class WatchLink {
public: public:
friend pair<WatchLink*, error> WCFS::_openwatch(); friend pair<WatchLink*, error> WCFS::_openwatch();
error close(); error close();
error recvReq(context::Context *ctx, PinReq *rx_into); error recvReq(context::Context ctx, PinReq *rx_into);
pair<string, error> sendReq(context::Context *ctx, const string &req); pair<string, error> sendReq(context::Context ctx, const string &req);
private: private:
void _closeTX(); void _closeTX();
error _serveRX(context::Context *ctx); error _serveRX(context::Context ctx);
tuple<string, error> _readline(); tuple<string, error> _readline();
error _send(StreamID stream, const string &msg); error _send(StreamID stream, const string &msg);
error _write(const string &pkt); error _write(const string &pkt);
tuple<chan<rxPkt>, error> _sendReq(context::Context *ctx, const string &req); tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, const string &req);
}; };
// PinReq represents 1 server-initiated wcfs pin request received over /head/watch link. // PinReq represents 1 server-initiated wcfs pin request received over /head/watch link.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment