// Copyright (c) 2001, 2002 Zope Foundation and Contributors.
// All Rights Reserved.
//
// Copyright (C) 2018-2021  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This software is subject to the provisions of the Zope Public License,
// Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
// THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
// WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
// FOR A PARTICULAR PURPOSE

package btree

// See https://github.com/zopefoundation/BTrees/blob/4.5.0-1-gc8bf24e/BTrees/Development.txt#L198
// for BTree & Bucket organization details.

import (
	"context"
	"fmt"
	"math"
	"reflect"
	"sort"

	pickle "github.com/kisielk/og-rek"
	"lab.nexedi.com/kirr/go123/xerr"
	"lab.nexedi.com/kirr/neo/go/zodb"
	"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
)

// Node represents a tree node - either BTree or Bucket.
//
// Besides being a persistent object (zodb.IPersistent), a node carries the
// unexported node() marker method, so only types declared in this package
// can implement Node.
type Node interface {
	zodb.IPersistent
	node() // marker; implemented by *BTree and *Bucket
}

// node marks *BTree as a Node.
func (t *BTree) node() {}

// node marks *Bucket as a Node.
func (b *Bucket) node() {}

// BTree is a non-leaf node of a B⁺ tree.
//
// It contains []Entry in ↑ key order.
//
// It mimics BTree from btree/py.
//
// The in-memory state (.firstbucket and .data) is filled in by
// btreeState.PySetState and discarded by btreeState.DropState.
type BTree struct {
	zodb.Persistent

	// https://github.com/zopefoundation/BTrees/blob/4.5.0-1-gc8bf24e/BTrees/BTreeModuleTemplate.c#L205:
	//
	// firstbucket points to the bucket containing the smallest key in
	// the BTree.  This is found by traversing leftmost child pointers
	// (data[0].child) until reaching a Bucket.
	//
	// It is nil for an empty tree.
	firstbucket *Bucket

	// https://github.com/zopefoundation/BTrees/blob/4.5.0-1-gc8bf24e/BTrees/BTreeModuleTemplate.c#L211:
	//
	// The BTree points to 'len' children, via the "child" fields of the data
	// array.  There are len-1 keys in the 'key' fields, stored in increasing
	// order.  data[0].key is unused.  For i in 0 .. len-1, all keys reachable
	// from data[i].child are >= data[i].key and < data[i+1].key, at the
	// endpoints pretending that data[0].key is -∞ and data[len].key is +∞.
	//
	// An empty tree has len(data) == 0.
	data []Entry
}

// Entry is one BTree node entry.
//
// It contains key and child, which is either BTree or Bucket.
//
// Key limits child's keys - see BTree.Entryv for details.
//
// The fields are unexported; use the Key and Child accessors to read them.
type Entry struct {
	key   KEY  // lower bound (inclusive) for all keys reachable from child
	child Node // BTree or Bucket
}

// Bucket is a leaf node of a B⁺ tree.
//
// It contains []BucketEntry in ↑ key order.
//
// It mimics Bucket from btree/py.
//
// Keys and values are kept in two parallel slices: values[i] is the value
// associated with keys[i].
type Bucket struct {
	zodb.Persistent

	// https://github.com/zopefoundation/BTrees/blob/4.5.0-1-gc8bf24e/BTrees/BTreeModuleTemplate.c#L179:
	//
	// A Bucket wraps contiguous vectors of keys and values.  Keys are unique,
	// and stored in sorted order.  The 'values' pointer may be NULL if the
	// Bucket is used to implement a set.  Buckets serving as leafs of BTrees
	// are chained together via 'next', so that the entire BTree contents
	// can be traversed in sorted order quickly and easily.

	next   *Bucket       // the bucket with the next-larger keys; nil if there is none
	keys   []KEY         // 'len' keys, in increasing order
	values []interface{} // 'len' corresponding values
}

// BucketEntry is one Bucket node entry.
//
// It contains key and value.
//
// Entries are produced by Bucket.Entryv; the fields are read through the Key
// and Value accessors.
type BucketEntry struct {
	key   KEY         // entry key
	value interface{} // value stored under key
}

// _KeyMin and _KeyMax are the KEY values used to stand for -∞ and +∞.
//
// NOTE(review): `KEY` and `<Key>` look like placeholders substituted when
// concrete packages are generated from this file - confirm against the
// generator before touching these lines.
const _KeyMin KEY = math.Min<Key>
const _KeyMax KEY = math.Max<Key>

// ---- access []entry ----

// Key returns the key of a BTree entry.
func (e *Entry) Key() KEY {
	return e.key
}

// Child returns the child node of a BTree entry.
func (e *Entry) Child() Node {
	return e.child
}

// Entryv returns the entries of a BTree node.
//
// Keys of the entries bound the keys of everything reachable from them:
//
//	[i].Key ≤ [i].Child.*.Key < [i+1].Key		i ∈ [0, len([]))
//
//	[0].Key       = -∞	; always returned so
//	[len(ev)].Key = +∞	; should be assumed so
//
// All children are guaranteed to be of the same kind - either all BTree, or
// all Bucket.
//
// The returned slice is the node's own storage: the caller must not modify it.
func (t *BTree) Entryv() /*readonly*/ []Entry {
	entries := t.data
	return entries
}

// Key returns the key of a Bucket entry.
func (e *BucketEntry) Key() KEY {
	return e.key
}

// Value returns the value of a Bucket entry.
func (e *BucketEntry) Value() interface{} {
	return e.value
}

// Entryv returns the entries of a Bucket node.
//
// A fresh slice is built on every call by pairing keys[i] with values[i].
func (b *Bucket) Entryv() /*readonly*/ []BucketEntry {
	n := len(b.keys)
	entries := make([]BucketEntry, 0, n)
	for i := 0; i < n; i++ {
		entries = append(entries, BucketEntry{key: b.keys[i], value: b.values[i]})
	}
	return entries
}

// ---- leaf nodes iteration ----

// FirstBucket returns the bucket that holds the smallest key of the tree.
func (t *BTree) FirstBucket() *Bucket {
	first := t.firstbucket
	return first
}

// Next returns the tree bucket whose keys follow the keys of the current bucket.
func (b *Bucket) Next() *Bucket {
	following := b.next
	return following
}

// ---- point query ----

// Get looks key up in the BTree.
//
// Intermediate BTree nodes are loaded from the database on demand.
//
// t does not have to be activated before calling Get.
func (t *BTree) Get(ctx context.Context, key KEY) (_ interface{}, _ bool, err error) {
	value, found, err := t.VGet(ctx, key, nil)
	return value, found, err
}

// VGet is like Get, but additionally reports every traversed node to visit.
//
// visit, if not nil, is called with each node right after that node has been
// activated.
func (t *BTree) VGet(ctx context.Context, key KEY, visit func(node Node)) (_ interface{}, _ bool, err error) {
	defer xerr.Contextf(&err, "btree(%s): get %v", t.POid(), key)

	if err = t.PActivate(ctx); err != nil {
		return nil, false, err
	}
	if visit != nil {
		visit(t)
	}

	// empty btree
	if len(t.data) == 0 {
		t.PDeactivate()
		return nil, false, nil
	}

	cur := t
	for {
		// find idx: K(idx) ≤ key < K(idx+1)	; K(0) = -∞, K(len) = +∞
		entries := cur.data
		idx := sort.Search(len(entries), func(i int) bool {
			if i+1 == len(entries) {
				return true // [len].key = +∞
			}
			return key < entries[i+1].key
		})

		// descend: release the current node, then activate the chosen child
		next := entries[idx].child
		cur.PDeactivate()
		if err = next.PActivate(ctx); err != nil {
			return nil, false, err
		}
		if visit != nil {
			visit(next)
		}

		switch node := next.(type) {
		case *BTree:
			cur = node

		case *Bucket:
			value, found := node.Get(key)
			node.PDeactivate()
			return value, found, nil
		}
	}
}

// Get looks key up in the Bucket.
//
// Nothing is loaded from the database: the bucket must be already activated.
func (b *Bucket) Get(key KEY) (interface{}, bool) {
	keys := b.keys

	// pos is the first position with keys[pos] ≥ key, or len(keys) if there is none
	pos := sort.Search(len(keys), func(j int) bool {
		return keys[j] >= key
	})

	if pos < len(keys) && keys[pos] == key {
		return b.values[pos], true
	}
	return nil, false // not found
}

// ---- min/max key ----

// MinKey returns the minimum key of the BTree.
//
// ok=false is returned if the tree is empty.
// t does not have to be activated before calling MinKey.
func (t *BTree) MinKey(ctx context.Context) (_ KEY, ok bool, err error) {
	kmin, ok, err := t.VMinKey(ctx, nil)
	return kmin, ok, err
}

// VMinKey is like MinKey, but additionally reports every traversed node to visit.
//
// visit, if not nil, is called with each node right after that node has been
// activated.
func (t *BTree) VMinKey(ctx context.Context, visit func(node Node)) (_ KEY, ok bool, err error) {
	defer xerr.Contextf(&err, "btree(%s): minkey", t.POid())

	if err = t.PActivate(ctx); err != nil {
		return 0, false, err
	}
	if visit != nil {
		visit(t)
	}

	// empty btree
	if len(t.data) == 0 {
		t.PDeactivate()
		return 0, false, nil
	}

	// follow leftmost children down to a bucket
	// NOTE -> can also use t.firstbucket
	cur := t
	for {
		leftmost := cur.data[0].child
		cur.PDeactivate()
		if err = leftmost.PActivate(ctx); err != nil {
			return 0, false, err
		}
		if visit != nil {
			visit(leftmost)
		}

		switch node := leftmost.(type) {
		case *BTree:
			cur = node

		case *Bucket:
			kmin, found := node.MinKey()
			node.PDeactivate()
			return kmin, found, nil
		}
	}
}

// MaxKey returns the maximum key of the BTree.
//
// ok=false is returned if the tree is empty.
// t does not have to be activated before calling MaxKey.
func (t *BTree) MaxKey(ctx context.Context) (_ KEY, _ bool, err error) {
	kmax, found, err := t.VMaxKey(ctx, nil)
	return kmax, found, err
}

// VMaxKey is like MaxKey but also calls visit while traversing the tree.
//
// Visit is called with node being activated.
//
// The tree is descended through the rightmost child of every BTree node until
// a Bucket is reached; the maximum key of that bucket is returned.
// If the tree is empty, ok=false is returned.
func (t *BTree) VMaxKey(ctx context.Context, visit func(node Node)) (_ KEY, _ bool, err error) {
	defer xerr.Contextf(&err, "btree(%s): maxkey", t.POid())
	err = t.PActivate(ctx)
	if err != nil {
		return 0, false, err
	}

	if visit != nil {
		visit(t)
	}

	if len(t.data) == 0 {
		// empty btree
		t.PDeactivate()
		return 0, false, nil
	}

	for {
		// The rightmost child must be selected with the length of the
		// *current* node: nodes on different levels generally have a
		// different number of entries, so the root's length must not be
		// reused further down.
		child := t.data[len(t.data)-1].child
		t.PDeactivate()
		err = child.PActivate(ctx)
		if err != nil {
			return 0, false, err
		}

		if visit != nil {
			visit(child)
		}

		switch child := child.(type) {
		case *BTree:
			t = child

		case *Bucket:
			k, ok := child.MaxKey()
			child.PDeactivate()
			return k, ok, nil
		}
	}
}

// MinKey returns the minimum key of the Bucket.
//
// ok=false is returned if the bucket is empty.
func (b *Bucket) MinKey() (_ KEY, ok bool) {
	if len(b.keys) > 0 {
		// keys are kept in ↑ order, so the first one is the smallest
		return b.keys[0], true
	}
	return 0, false
}

// MaxKey returns the maximum key of the Bucket.
//
// ok=false is returned if the bucket is empty.
func (b *Bucket) MaxKey() (_ KEY, ok bool) {
	if last := len(b.keys) - 1; last >= 0 {
		// keys are kept in ↑ order, so the last one is the largest
		return b.keys[last], true
	}
	return 0, false
}

// ---- serialization ----

// https://github.com/zopefoundation/BTrees/blob/4.5.0-1-gc8bf24e/BTrees/BucketTemplate.c#L1195:
//
// For a mapping bucket (self->values is not NULL), a one-tuple or two-tuple.
// The first element is a tuple interleaving keys and values, of length
// 2 * self->len.  The second element is the next bucket, present iff next is
// non-NULL:
//
//	(
//	     (keys[0], values[0], keys[1], values[1], ...,
//	                          keys[len-1], values[len-1]),
//	     <self->next iff non-NULL>
//	)

// bucketState is Bucket under a different name: it carries the state
// handling methods (DropState, PyGetState, PySetState) so that they do not
// appear in Bucket's method set.
type bucketState Bucket // hide state methods from public API

// DropState implements zodb.Stateful: it discards the bucket state by
// resetting the next link, the keys and the values.
func (b *bucketState) DropState() {
	b.next, b.keys, b.values = nil, nil, nil
}

// PyGetState implements zodb.PyStateful to get bucket data as pystate.
//
// The result is a tuple whose first element interleaves keys and values:
//
//	((k0, v0, k1, v1, ...), next)
//
// next is included only when the bucket has a next bucket.
func (b *bucketState) PyGetState() interface{} {
	// XXX assert len(b.keys) == len(b.values) ?
	items := make(pickle.Tuple, 0, 2*len(b.keys))
	for i, k := range b.keys {
		items = append(items, k, b.values[i])
	}

	state := pickle.Tuple{items}
	if b.next != nil {
		state = append(state, b.next)
	}
	return state
}

// PySetState implements zodb.PyStateful to set bucket data from pystate.
//
// pystate must be a one- or two-tuple:
//
//	((k0, v0, k1, v1, ...), [next])
//
// where keys are integers that fit into KEY and come in strictly ↑ order,
// and the optional next is the next Bucket. If next is absent, .next is
// reset to nil.
//
// An error is returned if pystate does not have this form.
func (b *bucketState) PySetState(pystate interface{}) (err error) {
	top, ok := pystate.(pickle.Tuple)
	if !ok {
		return fmt.Errorf("top: expect (...); got %T", pystate)
	}
	if !(1 <= len(top) && len(top) <= 2) {
		return fmt.Errorf("top: expect [1..2](); got [%d]()", len(top))
	}

	// .next is present only in the two-tuple form; without it there is no
	// next bucket, so a previously set link must not survive.
	var next *Bucket
	if len(top) == 2 {
		next, ok = top[1].(*Bucket)
		if !ok {
			return fmt.Errorf(".next must be Bucket; got %T", top[1])
		}
	}
	b.next = next

	// main part
	//
	// NOTE the result of the type assertion goes into a separate variable:
	// on failure it is nil and must not be indexed when building the error.
	data, ok := top[0].(pickle.Tuple)
	if !ok {
		return fmt.Errorf("data: expect (...); got %T", top[0])
	}
	if len(data)%2 != 0 {
		return fmt.Errorf("data: expect [%%2](); got [%d]()", len(data))
	}

	// reset arrays just in case
	n := len(data) / 2
	b.keys = make([]KEY, 0, n)
	b.values = make([]interface{}, 0, n)

	var kprev int64
	for i := 0; i < n; i++ {
		xk := data[2*i]
		v := data[2*i+1]

		k, ok := pickletools.Xint64(xk)
		if !ok {
			return fmt.Errorf("data: [%d]: key must be integer; got %T", i, xk)
		}

		// the key must survive the round-trip through KEY unchanged
		kk := KEY(k)
		if int64(kk) != k {
			return fmt.Errorf("data: [%d]: key overflows %T", i, kk)
		}

		// keys must be strictly increasing
		if i > 0 && !(k > kprev) {
			return fmt.Errorf("data: [%d]: key not ↑", i)
		}
		kprev = k

		b.keys = append(b.keys, kk)
		b.values = append(b.values, v)
	}

	return nil
}


// https://github.com/zopefoundation/BTrees/blob/4.5.0-1-gc8bf24e/BTrees/BTreeTemplate.c#L1087:
//
// For an empty BTree (self->len == 0), None.
//
// For a BTree with one child (self->len == 1), and that child is a bucket,
// and that bucket has a NULL oid, a one-tuple containing a one-tuple
// containing the bucket's state:
//
//     (
//         (
//              child[0].__getstate__(),
//         ),
//     )
//
// Else a two-tuple.  The first element is a tuple interleaving the BTree's
// keys and direct children, of size 2*self->len - 1 (key[0] is unused and
// is not saved).  The second element is the firstbucket:
//
//     (
//          (child[0], key[1], child[1], key[2], child[2], ...,
//                                       key[len-1], child[len-1]),
//          self->firstbucket
//     )
//
// In the above, key[i] means self->data[i].key, and similarly for child[i].

// btreeState is BTree under a different name: it carries the state handling
// methods (DropState, PyGetState, PySetState) so that they do not appear in
// BTree's method set.
type btreeState BTree // hide state methods from public API

// DropState implements zodb.Stateful: it discards the btree state by
// resetting the first bucket link and the entries.
func (t *btreeState) DropState() {
	t.firstbucket, t.data = nil, nil
}

// PyGetState implements zodb.PyStateful to get btree data as pystate.
//
// Depending on the tree it returns:
//
//   - None for an empty btree;
//   - ((bucket-state,),) for a btree with a single child bucket that has no oid;
//   - ((child0, key1, child1, ...), firstbucket) otherwise.
func (bt *btreeState) PyGetState() interface{} {
	n := len(bt.data)
	switch n {
	case 0:
		// empty btree
		// XXX assert .firstbucket = nil ?
		return pickle.None{}

	case 1:
		// btree with 1 child bucket without oid
		if bucket, ok := bt.data[0].child.(*Bucket); ok && bucket.POid() == zodb.InvalidOid {
			return pickle.Tuple{pickle.Tuple{(*bucketState)(bucket).PyGetState()}}
		}
	}

	// regular btree: key[0] is unused and not saved
	children := make(pickle.Tuple, 0, 2*n-1)
	children = append(children, bt.data[0].child)
	for _, e := range bt.data[1:] {
		children = append(children, e.key, e.child)
	}

	return pickle.Tuple{children, bt.firstbucket}
}

// PySetState implements zodb.PyStateful to set btree data from pystate.
//
// pystate must be in one of the forms produced by PyGetState:
//
//   - None - empty btree;
//   - ((bucket-state,),) - btree with a single child bucket that has no oid;
//     the bucket is created in place and loaded from bucket-state;
//   - ((child0, key1, child1, ...), firstbucket) - regular btree.
//
// In the regular form keys must be integers that fit into KEY and come in
// strictly ↑ order, and all children must be of the same kind - either all
// BTree or all Bucket. Entry [0] always gets key = _KeyMin, standing for -∞.
//
// An error is returned if pystate does not have one of these forms.
func (bt *btreeState) PySetState(pystate interface{}) (err error) {
	// empty btree
	if _, ok := pystate.(pickle.None); ok {
		bt.firstbucket = nil
		bt.data = nil
		return nil
	}

	top, ok := pystate.(pickle.Tuple)
	if !ok {
		return fmt.Errorf("top: expect (...); got %T", pystate)
	}
	if !(1 <= len(top) && len(top) <= 2) {
		return fmt.Errorf("top: expect [1..2](); got [%d]()", len(top))
	}

	// btree with 1 child bucket without oid
	if len(top) == 1 {
		// NOTE the asserted tuple is kept in its own variable: on failure
		// it is nil and must not be indexed when building the error.
		t1, ok := top[0].(pickle.Tuple)
		if !ok {
			return fmt.Errorf("bucket1: expect [1](); got %T", top[0])
		}
		if len(t1) != 1 {
			return fmt.Errorf("bucket1: expect [1](); got [%d]()", len(t1))
		}
		bucket := zodb.NewPersistent(reflect.TypeOf(Bucket{}), bt.PJar()).(*Bucket)
		err := (*bucketState)(bucket).PySetState(t1[0])
		if err != nil {
			return fmt.Errorf("bucket1: %s", err)
		}

		bt.firstbucket = bucket
		// [0].key is -∞, the same as for a regular btree
		bt.data = []Entry{{key: _KeyMin, child: bucket}}
		return nil
	}

	// regular btree
	bt.firstbucket, ok = top[1].(*Bucket)
	if !ok {
		return fmt.Errorf("first bucket: must be Bucket; got %T", top[1])
	}

	data, ok := top[0].(pickle.Tuple)
	if !ok {
		return fmt.Errorf("data: expect (...); got %T", top[0])
	}
	if len(data)%2 == 0 {
		return fmt.Errorf("data: expect [!%%2](); got [%d]()", len(data))
	}

	n := (len(data) + 1) / 2
	bt.data = make([]Entry, 0, n)
	var kprev int64
	var childrenKind int // 1 - BTree, 2 - Bucket
	for i, idx := 0, 0; i < n; i++ {
		key := int64(_KeyMin) // KEY(-∞)   (qualifies for ≤)
		if i > 0 {
			// key[0] is unused and not saved
			//
			// keys are decoded the same way as bucket keys are
			key, ok = pickletools.Xint64(data[idx])
			if !ok {
				return fmt.Errorf("data: [%d]: key must be integer; got %T", i, data[idx])
			}
			idx++
		}
		child := data[idx]
		idx++

		// the key must survive the round-trip through KEY unchanged
		kkey := KEY(key)
		if int64(kkey) != key {
			return fmt.Errorf("data: [%d]: key overflows %T", i, kkey)
		}

		// keys must be strictly increasing; key[1] is exempt because it is
		// only compared against key[0] = -∞
		if i > 1 && !(key > kprev) {
			return fmt.Errorf("data: [%d]: key not ↑", i)
		}
		kprev = key

		// check all children are of the same type
		var kind int // see childrenKind ^^^
		switch child.(type) {
		default:
			return fmt.Errorf("data: [%d]: child must be BTree|Bucket; got %T", i, child)

		case *BTree:
			kind = 1
		case *Bucket:
			kind = 2
		}

		if i == 0 {
			childrenKind = kind
		}
		if kind != childrenKind {
			return fmt.Errorf("data: [%d]: children must be of the same type", i)
		}

		bt.data = append(bt.data, Entry{key: kkey, child: child.(Node)})
	}

	return nil
}


// ---- register classes to ZODB ----

// init registers BTree and Bucket, together with their state types, as the
// Go counterparts of the corresponding ZODB classes.
func init() {
	zodb.RegisterClass("BTrees.BTree.BTree", reflect.TypeOf(BTree{}), reflect.TypeOf(btreeState{}))
	zodb.RegisterClass("BTrees.BTree.Bucket", reflect.TypeOf(Bucket{}), reflect.TypeOf(bucketState{}))
}