time.cpp 14.6 KB
Newer Older
Kirill Smelkov's avatar
Kirill Smelkov committed
1
// Copyright (C) 2019-2024  Nexedi SA and Contributors.
2
//                          Kirill Smelkov <kirr@nexedi.com>
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Package time mirrors Go package time.
// See time.h for package overview.

#include "golang/time.h"
Kirill Smelkov's avatar
Kirill Smelkov committed
24
#include "timer-wheel.h"
25

Kirill Smelkov's avatar
Kirill Smelkov committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41

// DEBUG selects whether debugf tracing is compiled in: 1 - on, 0 - off.
#define DEBUG 0
#if DEBUG
// with DEBUG=1 debugf prints printf-style formatted message to stderr.
#  define debugf(format, ...) fprintf(stderr, format, ##__VA_ARGS__)
#else
// with DEBUG=0 debugf expands to an empty statement; its arguments are not evaluated.
#  define debugf(format, ...) do {} while (0)
#endif


// golang::sync:: (private imports)
namespace golang {
namespace sync {

// _semaacquire_timed is declared here by hand because this file needs it
// (see _timer_loop) in addition to what the included headers provide.
//
// As used by _timer_loop: it tries to acquire sema for at most timeout_ns
// nanoseconds and returns whether the semaphore was acquired; false means
// the acquire finished due to timeout.
bool _semaacquire_timed(_sema *sema, uint64_t timeout_ns);

}}  // golang::sync::
42

43 44 45 46 47

// golang::time:: (except sleep and now)
namespace golang {
namespace time {

48 49
// ---- timers ----

50 51
// forward declarations for functions that are used before they are defined below.
Ticker new_ticker(double dt);
Timer  new_timer (double dt);
Timer  _new_timer(double dt, func<void()>);  // timer with optional callback; see definition
53 54 55 56


// tick returns channel of a newly created ticker that delivers current time
// every dt seconds.
//
// If dt <= 0, nil channel is returned and no ticker is created.
//
// Only the channel is returned - the ticker itself is not exposed to the
// caller and so cannot be stopped by the caller.
chan<double> tick(double dt) {
    if (dt <= 0)
        return nil;
    return new_ticker(dt)->c;
}

// after creates new timer that expires dt seconds from now and returns the
// channel on which that timer sends current time upon expiry.
chan<double> after(double dt) {
    Timer t = new_timer(dt);
    return t->c;
}

65
// after_func arranges for f to be called after dt seconds elapse.
//
// The call to f is made by the timer loop (see _TimerImpl::_fire).
// The returned Timer can be used to cancel the call via its stop method.
Timer after_func(double dt, func<void()> f) {
    return _new_timer(dt, f);
}

Kirill Smelkov's avatar
Kirill Smelkov committed
69 70 71 72 73
// new_timer creates new timer that expires dt seconds from now.
//
// The timer is created without a callback: on expiry it sends current time
// to its channel .c instead.
Timer new_timer(double dt) {
    func<void()> nofunc = nil;  // no callback -> channel-based timer
    return _new_timer(dt, nofunc);
}


Kirill Smelkov's avatar
Kirill Smelkov committed
74
// Ticker (small wrapper around Timer)
75 76 77
// constructor and destructor are empty: all fields of a ticker are set up by new_ticker.
_Ticker::_Ticker()  {}
_Ticker::~_Ticker() {}
void _Ticker::decref() {
78 79 80 81
    if (__decref())
        delete this;
}

82
// new_ticker creates new ticker that sends current time to its channel .c
// every dt seconds.
//
// It panics if dt <= 0.
//
// The ticker is driven by a timer whose callback (_tick) re-arms that timer
// after each expiry. The callback holds a reference to the ticker, which
// forms Ticker -> Timer -> _tick -> Ticker cycle; the cycle is broken by stop.
Ticker new_ticker(double dt) {
    if (dt <= 0)
        panic("ticker: dt <= 0");

    Ticker tx = adoptref(new _Ticker());
    tx->c     = makechan<double>(1); // 1-buffer -- same as in Go
    tx->_dt   = dt;
    tx->_stop = false;

    // arm the timer and publish it into ._timer from under ._mu: the timer
    // may expire, and _tick may be invoked by the timer loop, before
    // after_func result is stored into ._timer. _tick takes ._mu first and
    // only then uses ._timer, so it will wait until ._timer is set.
    tx->_mu.lock();
    tx->_timer = after_func(dt, [tx]() { tx ->_tick(); });
    tx->_mu.unlock();
    return tx;
}

94 95
void _Ticker::stop() {
    _Ticker &tx = *this;
96 97 98

    tx._mu.lock();
    tx._stop = true;
Kirill Smelkov's avatar
Kirill Smelkov committed
99 100 101 102
    if (tx._timer != nil) {
        tx._timer->stop();
        tx._timer = nil;  // break Ticker -> Timer -> _tick -> Ticker cycle
    }
103 104 105 106 107 108 109

    // drain what _tick could have been queued already
    while (tx.c.len() > 0)
        tx.c.recv();
    tx._mu.unlock();
}

110 111
// _tick is the callback of ticker's timer: it is invoked each time the timer expires.
//
// If the ticker was stopped it does nothing. Otherwise it re-arms the timer
// for the next period and tries to send current time to .c without
// blocking: if the channel buffer is full the tick is dropped.
void _Ticker::_tick() {
    _Ticker &tx = *this;

    tx._mu.lock();
    if (tx._stop) {
        tx._mu.unlock();
        return;
    }

    // XXX adjust for accumulated error δ?
    tx._timer->reset(tx._dt);

    // send from under ._mu so that .stop can be sure there is no
    // ongoing send while it drains the channel.
    double t = now();
    select({
        _default,
        tx.c.sends(&t),
    });
    tx._mu.unlock();
}


Kirill Smelkov's avatar
Kirill Smelkov committed
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
// Timers
//
// Timers are implemented via Timer Wheel.
// For this time arrow is divided into equal periods named ticks, and Ratas
// library[1] is used to manage timers with granularity of ticks. We employ
// ticks to avoid unnecessary overhead of managing timeout-style timers with
// nanosecond precision.
//
// Let g denote tick granularity.
//
// Timers are provided with the guarantee that they fire no earlier than the
// requested expiration time. In other words the following invariant is always true:
//
//      t(exp) ≤ t(fire)
//
// We also want firing to _ideally_ happen not far from the requested
// expiration time, meaning that the following property is aimed for, but not guaranteed:
//
//               t(fire) < t(exp) + g
//
// a tick Ti is associated with [i-1,i)·g time range. It is said that tick Ti
// "happens" at i·g point in time. Firing of timers associated with tick Ti is
// done when Ti happens - ideally at i·g time or strictly speaking ≥ that point.
//
// When timers are armed their expiration tick is set as Texp = ⌊t(exp)/g+1⌋ to
// be in time range that tick Texp covers.
//
//
// A special goroutine, _timer_loop, is dedicated to advance time of the
// timer-wheel as ticks happen, and to run expired timers. When there is
// nothing to do that goroutine pauses itself and goes to sleep until either
// next expiration moment, or until new timer with earlier expiration time is
// armed. To be able to simultaneously select on those two conditions a
// semaphore with acquisition timeout is employed. Please see _tSema for
// details.
//
//
// [1] Ratas - A hierarchical timer wheel.
//     https://www.snellman.net/blog/archive/2016-07-27-ratas-hierarchical-timer-wheel,
//     https://github.com/jsnell/ratas

// Tns indicates time measured in nanoseconds.
// It is used for documentation purposes mainly to distinguish from the time measured in ticks.
typedef uint64_t Tns;

// _tick_g is ticks granularity in nanoseconds.
// Time in nanoseconds is converted to ticks by dividing by _tick_g.
static const Tns _tick_g = 1024;   // 1 tick is ~ 1 μs


// timer-wheel holds registry of all timers and manages them.
// Both objects are created by _init.
static sync::Mutex* _tWheelMu;  // lock for timer wheel + sleep/wakeup channel (see _tSema & co below)
static TimerWheel*  _tWheel;    // for each timer the wheel holds 1 reference to _TimerImpl object

// _TimerImpl amends _Timer with timer-wheel entry and implementation-specific state.
// _TimerState describes where a timer currently is in its lifecycle:
//
//   reset:        Disarmed -> Armed
//   _queue_fire:  Armed    -> Firing
//   _fire, stop:  *        -> Disarmed
enum _TimerState {
    _TimerDisarmed, // timer is not registered to timer wheel and is not firing
    _TimerArmed,    // timer is     registered to timer wheel and is not firing
    _TimerFiring    // timer is currently firing  (and not on the timer wheel)
};
// _TimerImpl is the concrete type behind every _Timer: timers are created as
// _TimerImpl by _new_timer and _Timer methods downcast `this` to it.
struct _TimerImpl : _Timer {
    void _fire();       // really fire the timer; called by the timer loop without _tWheelMu held
    void _queue_fire(); // timer-wheel callback: put the timer on "firing" list
    MemberTimerEvent<_TimerImpl, &_TimerImpl::_queue_fire>  _tWheelEntry;

    func<void()> _f;    // callback to invoke on expiry; nil for channel-based timers

    sync::Mutex _mu;    // protects ._state; also held while sending to .c
    _TimerState _state;

    // entry on "firing" list; see _tFiring for details
    _TimerImpl* _tFiringNext;   // TODO could reuse _tWheelEntry.{next_,prev_} for "firing" list

    _TimerImpl();
    ~_TimerImpl();
};

// the constructor only binds the timer-wheel entry to this timer;
// ._f, ._state, ._tFiringNext and .c are initialized by _new_timer.
_TimerImpl::_TimerImpl() : _tWheelEntry(this) {}
_TimerImpl::~_TimerImpl() {}

212 213 214
// constructor and destructor of the base are empty; see _new_timer for initialization.
_Timer::_Timer()  {}
_Timer::~_Timer() {}
void _Timer::decref() {
215
    if (__decref())
Kirill Smelkov's avatar
Kirill Smelkov committed
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
        delete static_cast<_TimerImpl*>(this);
}


// _tSema and _tSleeping + _tWaking organize sleep/wakeup channel.
//
// Timer loop uses wakeup sema to both:
//   * sleep until next timer expires, and
//   * become woken up earlier if new timer with earlier expiration time is armed
//
// _tSleeping + _tWaking are used by the timer loop and clients to coordinate
// _tSema operations, so that the value of sema is always 0 or 1, and that
// every new loop cycle starts with sema=0, meaning that sema.Acquire will block.
//
// Besides sema.Acquire, all operations on the sleep/wakeup channel are done under _tWheelMu.
// all four variables below are initialized by _init.
static sync::_sema* _tSema;     // wakeup semaphore; released by reset, acquired by the timer loop
static bool         _tSleeping; // 1 iff timer loop:
                                //   \/ decided to go to sleep on wakeup sema
                                //   \/ sleeps on wakeup sema via Acquire
                                //   \/ woken up after Acquire before setting _tSleeping=0 back
static bool         _tWaking;   // 1 iff client timer arm:
                                //   /\ saw _tSleeping=1 && _tWaking=0 and decided to do wakeup
                                //   /\ (did Release \/ will do Release)
                                //   /\ until timer loop set back _tWaking=0
static Tns          _tSleeping_until; // until when timer loop is sleeping if _tSleeping=1


// _timer_loop implements timer loop: it runs in dedicated goroutine ticking the
// timer-wheel and sleeping in between ticks.
static void _timer_loop();
static void _timer_loop_fire_queued();  // fires timers that wheel advance put on "firing" list
// _init sets up global state of the timers subsystem and spawns the timer loop.
//
// The wheel starts at current time expressed in ticks, and the sleep/wakeup
// channel starts in reset state: nobody sleeping, nobody waking, sema=0.
void _init() {
    // timer wheel and its lock
    _tWheelMu = new sync::Mutex();
    _tWheel   = new TimerWheel(_nanotime() / _tick_g);

    // sleep/wakeup channel
    _tSleeping_until = 0;
    _tWaking         = false;
    _tSleeping       = false;
    _tSema = sync::_makesema();
    sync::_semaacquire(_tSema); // 1 -> 0

    // everything is ready - start ticking
    go(_timer_loop);
}

// _timer_loop is the body of the goroutine spawned by _init. It never returns.
//
// Each cycle of the loop:
//
//   1. advances the wheel to current time, which moves expired timers onto
//      the "firing" list (under _tWheelMu);
//   2. fires queued timers (without _tWheelMu held);
//   3. computes when the next event is due - capped to 1s ahead - and, if that
//      moment is still in the future, sleeps on _tSema until then or until
//      reset wakes the loop up earlier;
//   4. brings the sleep/wakeup channel back into reset state.
static void _timer_loop() {
    while (1) {
        // tick the wheel. This puts expired timers on firing list but delays
        // really firing them until we release _tWheelMu.
        _tWheelMu->lock();
        Tick now_t  = _nanotime() / _tick_g;
        Tick wnow_t = _tWheel->now();
        Tick wdt_t  = now_t - wnow_t;
        debugf("LOOP: now_t: %lu  wnow_t: %lu  δ_t %lu ...\n", now_t, wnow_t, wdt_t);
        if (now_t > wnow_t)          // advance(0) panics. Avoid that if we wake up earlier
            _tWheel->advance(wdt_t); // inside the same tick, e.g. due to signal.
        _tWheelMu->unlock();

        // fire the timers queued on the firing list
        _timer_loop_fire_queued();


        // go to sleep until next timer expires or wakeup comes from new arming.
        //
        // limit max sleeping time because contrary to other wheel operations -
        // - e.g. insert and delete which are O(1), the complexity of
        // ticks_to_next_event is O(time till next expiry).
        Tns tsleep_max = 1*1E9; // 1s
        bool sleeping = false;

        _tWheelMu->lock();
        Tick wsleep_t = _tWheel->ticks_to_next_event(tsleep_max / _tick_g);
        Tick wnext_t  = _tWheel->now() + wsleep_t;

        Tns tnext = wnext_t * _tick_g;
        Tns tnow  = _nanotime();

        // announce that we are going to sleep only if there is time left to sleep
        if (tnext > tnow) {
            _tSleeping = sleeping = true;
            _tSleeping_until = tnext;
        }
        _tWheelMu->unlock();

        // next event is already due -> go straight to the next cycle
        if (!sleeping)
            continue;

        Tns tsleep = tnext - tnow;
        debugf("LOOP: sleeping %.3f μs ...\n", tsleep / 1e3);

        bool acq = sync::_semaacquire_timed(_tSema, tsleep);

        // bring sleep/wakeup channel back into reset state with S=0
        _tWheelMu->lock();
        //  acq ^  waking   Release was done while Acquire was blocked                       S=0
        //  acq ^ !waking   impossible
        // !acq ^  waking   Acquire finished due to timeout;    Release was done after that  S=1
        // !acq ^ !waking   Acquire finished due to timeout; no Release was done at all      S=0

        debugf("LOOP: woken up  acq=%d  waking=%d\n", acq, _tWaking);

        if ( acq && !_tWaking) {
            _tWheelMu->unlock();
            panic("BUG: timer loop: woken up with acq ^ !waking");
        }
        if (!acq &&  _tWaking) {
            acq = sync::_semaacquire_timed(_tSema, 0); // S=1 -> acquire should be immediate
            if (!acq) {
                _tWheelMu->unlock();
                panic("BUG: timer loop: reacquire after acq ^ waking failed");
            }
        }

        _tSleeping = false;
        _tWaking   = false;
        _tSleeping_until = 0;
        _tWheelMu->unlock();
    }
}

331
// _new_timer creates new timer and arms it to expire dt seconds from now.
//
// If f is nil, the timer gets 1-buffered channel .c and sends current time
// there on expiry. Otherwise .c is nil and f is called on expiry instead.
Timer _new_timer(double dt, func<void()> f) {
    _TimerImpl* _t = new _TimerImpl();

    _t->c    = (f == nil ? makechan<double>(1) : nil);
    _t->_f   = f;
    _t->_state = _TimerDisarmed;
    _t->_tFiringNext = nil;

    // hand the initial reference over to the returned Timer
    Timer t = adoptref(static_cast<_Timer*>(_t));
    t->reset(dt);
    return t;
}

Kirill Smelkov's avatar
Kirill Smelkov committed
344
void _Timer::reset(double dt) {
Kirill Smelkov's avatar
Kirill Smelkov committed
345 346 347 348
    _TimerImpl& t = *static_cast<_TimerImpl*>(this);

    if (dt <= 0)
        dt = 0;
Kirill Smelkov's avatar
Kirill Smelkov committed
349

Kirill Smelkov's avatar
Kirill Smelkov committed
350 351 352 353
    Tns  when   = _nanotime() + Tns(dt*1e9);
    Tick when_t = when / _tick_g + 1;  // Ti covers [i-1,i)·g

    _tWheelMu->lock();
Kirill Smelkov's avatar
Kirill Smelkov committed
354
    t._mu.lock();
Kirill Smelkov's avatar
Kirill Smelkov committed
355
    if (t._state != _TimerDisarmed) {
Kirill Smelkov's avatar
Kirill Smelkov committed
356
        t._mu.unlock();
Kirill Smelkov's avatar
Kirill Smelkov committed
357
        _tWheelMu->unlock();
Kirill Smelkov's avatar
Kirill Smelkov committed
358 359
        panic("Timer.reset: the timer is armed; must be stopped or expired");
    }
Kirill Smelkov's avatar
Kirill Smelkov committed
360 361 362 363 364 365 366 367 368 369 370 371 372
    t._state = _TimerArmed;

    Tick wnow_t = _tWheel->now();
    Tick wdt_t;
    if (when_t > wnow_t)
        wdt_t = when_t - wnow_t;
    else
        wdt_t = 1; // schedule(0) panics

    // the wheel will keep a reference to the timer
    t.incref();

    _tWheel->schedule(&t._tWheelEntry, wdt_t);
Kirill Smelkov's avatar
Kirill Smelkov committed
373
    t._mu.unlock();
Kirill Smelkov's avatar
Kirill Smelkov committed
374 375 376 377 378 379 380 381 382 383 384

    // wakeup timer loop if it is sleeping until later than new timer expiry
    if (_tSleeping) {
        if ((when < _tSleeping_until) && !_tWaking) {
            debugf("USER: waking up loop\n");
            _tWaking = true;
            sync::_semarelease(_tSema);
        }
    }

    _tWheelMu->unlock();
385 386
}

387
bool _Timer::stop() {
Kirill Smelkov's avatar
Kirill Smelkov committed
388
    _TimerImpl& t = *static_cast<_TimerImpl*>(this);
389 390
    bool canceled;

Kirill Smelkov's avatar
Kirill Smelkov committed
391
    _tWheelMu->lock();
392 393
    t._mu.lock();

Kirill Smelkov's avatar
Kirill Smelkov committed
394 395
    switch (t._state) {
    case _TimerDisarmed:
396
        canceled = false;
Kirill Smelkov's avatar
Kirill Smelkov committed
397 398 399 400 401 402 403 404 405 406 407 408 409
        break;

    case _TimerArmed:
        // timer wheel is holding this timer entry. Remove it from there.
        t._tWheelEntry.cancel();
        t.decref();
        canceled = true;
        break;

    case _TimerFiring:
        // the timer is on "firing" list. Timer loop will process it and skip
        // upon seeing ._state = _TimerDisarmed. It will also be the timer loop
        // to drop the reference to the timer that timer-wheel was holding.
410
        canceled = true;
Kirill Smelkov's avatar
Kirill Smelkov committed
411 412 413 414 415
        break;

    default:
        panic("invalid timer state");

416 417
    }

Kirill Smelkov's avatar
Kirill Smelkov committed
418 419 420
    if (canceled)
        t._state = _TimerDisarmed;

421 422 423 424 425
    // drain what _fire could have been queued already
    while (t.c.len() > 0)
        t.c.recv();

    t._mu.unlock();
Kirill Smelkov's avatar
Kirill Smelkov committed
426 427
    _tWheelMu->unlock();

428 429 430
    return canceled;
}

Kirill Smelkov's avatar
Kirill Smelkov committed
431 432 433 434 435 436 437
// when timers are fired by _tWheel.advance(), they are first popped from _tWheel and put on
// _tFiring list, so that the real firing could be done without holding _tWheelMu.
static _TimerImpl* _tFiring     = nil;  // head of the "firing" list; linked via ._tFiringNext
static _TimerImpl* _tFiringLast = nil;  // tail of the "firing" list; new entries are appended here

void _TimerImpl::_queue_fire() {
    _TimerImpl& t = *this;
438 439

    t._mu.lock();
Kirill Smelkov's avatar
Kirill Smelkov committed
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
    assert(t._state == _TimerArmed);
    t._state = _TimerFiring;
    t._mu.unlock();

    t._tFiringNext = nil;
    if (_tFiring == nil)
        _tFiring = &t;
    if (_tFiringLast != nil)
        _tFiringLast->_tFiringNext = &t;
    _tFiringLast = &t;
}

// _timer_loop_fire_queued fires all timers that are currently on the "firing"
// list, in the order they were queued, and then empties the list.
//
// For every timer it also drops the reference that the wheel was holding.
static void _timer_loop_fire_queued() {
    for (_TimerImpl* t = _tFiring; t != nil;) {
        // remember the next entry first: t might be gone after decref below
        _TimerImpl* fnext = t->_tFiringNext;
        t->_tFiringNext = nil;
        t->_fire();

        t->decref(); // wheel was holding a reference to the timer
        t = fnext;
    }
    _tFiring     = nil;
    _tFiringLast = nil;
}
464

Kirill Smelkov's avatar
Kirill Smelkov committed
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
// _fire really fires the timer. It is called by the timer loop for timers
// taken from the "firing" list.
//
// If the timer is still in firing state it becomes disarmed and then either
// current time is sent to .c (timer without callback), or the callback ._f is
// invoked. If stop disarmed the timer in the meantime, _fire does nothing.
void _TimerImpl::_fire() {
    _TimerImpl& t = *this;

    bool fire = false;
    t._mu.lock();
    if (t._state == _TimerFiring) {  // stop could disarm the timer in the meantime
        t._state = _TimerDisarmed;
        fire = true;

        debugf("LOOP: firing @ %lu ...\n", t._tWheelEntry.scheduled_at());

        // send under ._mu so that .stop can be sure that if it sees
        // ._state = _TimerDisarmed, there is no ongoing .c send.
        //
        // NOTE(review): this send is blocking. It looks like it would block
        // the timer loop if .c buffer is still full, e.g. after reset of an
        // expired timer whose channel was not drained - TODO confirm.
        if (t._f == nil)
            t.c.send(now());
    }
    t._mu.unlock();

    // call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer.
    if (fire && t._f != nil)
        t._f();
}

488
}}  // golang::time::