Commit 949d9c0c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e04b23bb
...@@ -450,137 +450,6 @@ error _Conn::__pin1(PinReq *req) { ...@@ -450,137 +450,6 @@ error _Conn::__pin1(PinReq *req) {
return nil; return nil;
} }
// open opens FileH corresponding to ZBigFile foid.
pair<FileH, error> _Conn::open(zodb::Oid foid) {
_Conn& wconn = *this;
error err;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid));
wconn._mu.Lock();
if (wconn._downErr != nil) {
wconn._mu.Unlock();
return make_pair(nil, E(wconn._downErr));
}
FileH f; bool ok;
tie(f, ok) = wconn._filehTab.get_(foid);
if (ok) {
wconn._mu.Unlock();
f->_openReady.recv();
if (f->_openErr != nil)
return make_pair(nil, E(f->_openErr));
// XXX incref open count
return make_pair(f, nil);
}
// create in-flight-opening FileH entry and perform open with wconn._mu released
// NOTE wconn._atMu is still held because open relies on wconn.at being stable.
f = adoptref(new _FileH());
f->wconn = newref(&wconn);
f->foid = foid;
f->_openReady = makechan<structZ>();
f->_openErr = nil;
bool retok = false;
wconn._filehTab[foid] = f;
defer([&]() {
if (!retok) {
wconn._mu.Lock();
// XXX assert filehTab[foid] == f
wconn._filehTab.erase(foid);
wconn._mu.Unlock();
}
f->_openReady.close();
});
wconn._mu.Unlock();
f->_openErr = f->_open();
if (f->_openErr != nil)
return make_pair(nil, E(f->_openErr));
retok = true;
return make_pair(f, nil);
}
// _open performs actual open of FileH marked as "in-flight-open" in wconn.filehTab.
//
// Called with:
// - wconn.atMu held
// - wconn.mu not locked
// - f.mu not locked
error _FileH::_open() {
_FileH* f = this;
Conn wconn = f->wconn;
error err;
tie(f->_headf, err)
= wconn->_wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
if (err != nil)
return err;
bool retok = false;
defer([&]() {
if (!retok)
f->_headf->close();
});
struct stat st;
err = f->_headf->stat(&st);
if (err != nil)
return err;
f->blksize = st.st_blksize;
f->_headfsize = st.st_size; // FIXME getting headfsize _before_ waiting for wcfs/head/at ≥ wconn.at
if (!(f->_headfsize % f->blksize == 0))
return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
v(f->_headf->name()), f->_headfsize, f->blksize);
// start watching f
// NOTE we are _not_ holding wconn.mu nor f.mu - only wconn.atMu to rely on wconn.at being stable.
// NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at
string ack;
tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn->at)));
if (err != nil)
return err;
if (ack != "ok") {
return fmt::errorf("watch: %s", v(ack));
}
retok = true;
return nil;
}
// close releases resources associated with FileH.
//
// Left fileh mappings become invalid to use except unmap.
error _FileH::close() {
_FileH& fileh = *this;
Conn wconn = fileh.wconn;
xerr::Contextf E("%s: close f<%s>", v(wconn), v(fileh.foid));
// XXX change all fileh.mmaps to cause EFAULT on any access after fileh.close
// XXX "watch foid -" -> wconn.wlink (stop watching the file)
// remove fileh from wconn._filehTab
// fileh.close can be called several times and after first call another
// fileh could be opened for the same foid. Be careful not to erase it.
wconn->_mu.Lock();
// XXX decref open count
if (wconn->_filehTab.get(fileh.foid)._ptr() == &fileh)
wconn->_filehTab.erase(fileh.foid);
wconn->_mu.Unlock();
return E(fileh._headf->close());
}
// resync resyncs connection and its file mappings onto different database view. // resync resyncs connection and its file mappings onto different database view.
// //
// bigfile/_file_zob.pyx arranges to call Conn.resync at transaction boundaries // bigfile/_file_zob.pyx arranges to call Conn.resync at transaction boundaries
...@@ -717,6 +586,137 @@ error _Conn::resync(zodb::Tid at) { ...@@ -717,6 +586,137 @@ error _Conn::resync(zodb::Tid at) {
return nil; return nil;
} }
// open opens FileH corresponding to ZBigFile foid.
pair<FileH, error> _Conn::open(zodb::Oid foid) {
_Conn& wconn = *this;
error err;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid));
wconn._mu.Lock();
if (wconn._downErr != nil) {
wconn._mu.Unlock();
return make_pair(nil, E(wconn._downErr));
}
FileH f; bool ok;
tie(f, ok) = wconn._filehTab.get_(foid);
if (ok) {
wconn._mu.Unlock();
f->_openReady.recv();
if (f->_openErr != nil)
return make_pair(nil, E(f->_openErr));
// XXX incref open count
return make_pair(f, nil);
}
// create in-flight-opening FileH entry and perform open with wconn._mu released
// NOTE wconn._atMu is still held because open relies on wconn.at being stable.
f = adoptref(new _FileH());
f->wconn = newref(&wconn);
f->foid = foid;
f->_openReady = makechan<structZ>();
f->_openErr = nil;
bool retok = false;
wconn._filehTab[foid] = f;
defer([&]() {
if (!retok) {
wconn._mu.Lock();
// XXX assert filehTab[foid] == f
wconn._filehTab.erase(foid);
wconn._mu.Unlock();
}
f->_openReady.close();
});
wconn._mu.Unlock();
f->_openErr = f->_open();
if (f->_openErr != nil)
return make_pair(nil, E(f->_openErr));
retok = true;
return make_pair(f, nil);
}
// _open performs actual open of FileH marked as "in-flight-open" in wconn.filehTab.
//
// Called with:
// - wconn.atMu held
// - wconn.mu not locked
// - f.mu not locked
error _FileH::_open() {
_FileH* f = this;
Conn wconn = f->wconn;
error err;
tie(f->_headf, err)
= wconn->_wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
if (err != nil)
return err;
bool retok = false;
defer([&]() {
if (!retok)
f->_headf->close();
});
struct stat st;
err = f->_headf->stat(&st);
if (err != nil)
return err;
f->blksize = st.st_blksize;
f->_headfsize = st.st_size; // FIXME getting headfsize _before_ waiting for wcfs/head/at ≥ wconn.at
if (!(f->_headfsize % f->blksize == 0))
return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
v(f->_headf->name()), f->_headfsize, f->blksize);
// start watching f
// NOTE we are _not_ holding wconn.mu nor f.mu - only wconn.atMu to rely on wconn.at being stable.
// NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at
string ack;
tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn->at)));
if (err != nil)
return err;
if (ack != "ok") {
return fmt::errorf("watch: %s", v(ack));
}
retok = true;
return nil;
}
// close releases resources associated with FileH.
//
// Left fileh mappings become invalid to use except unmap.
error _FileH::close() {
_FileH& fileh = *this;
Conn wconn = fileh.wconn;
xerr::Contextf E("%s: close f<%s>", v(wconn), v(fileh.foid));
// XXX change all fileh.mmaps to cause EFAULT on any access after fileh.close
// XXX "watch foid -" -> wconn.wlink (stop watching the file)
// remove fileh from wconn._filehTab
// fileh.close can be called several times and after first call another
// fileh could be opened for the same foid. Be careful not to erase it.
wconn->_mu.Lock();
// XXX decref open count
if (wconn->_filehTab.get(fileh.foid)._ptr() == &fileh)
wconn->_filehTab.erase(fileh.foid);
wconn->_mu.Unlock();
return E(fileh._headf->close());
}
// mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state. // mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
// //
// If vma != nil, created mapping is associated with that vma of user-space virtual memory manager: // If vma != nil, created mapping is associated with that vma of user-space virtual memory manager:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment