Commit d5073ae4 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:3315] improving sort code by bottoming out in a quicksort

git-svn-id: file:///svn/toku/tokudb@34892 c7de825b-a66e-492c-adef-691d508d4ae1
parent 432ee683
...@@ -22,13 +22,12 @@ merge_c(void *vdest, void *va, int an, void *vb, int bn, int width, ...@@ -22,13 +22,12 @@ merge_c(void *vdest, void *va, int an, void *vb, int bn, int width,
{ {
char *dest = vdest, *a = va, *b = vb; char *dest = vdest, *a = va, *b = vb;
while (an > 0 && bn > 0) { while (an > 0 && bn > 0) {
int c = cmp(extra, a, b); if (cmp(extra, a, b) < 0) {
if (c < 0) {
memcpy(dest, a, width); memcpy(dest, a, width);
dest+=width; a+=width; an--; dest += width; a += width; an--;
} else { } else {
memcpy(dest, b, width); memcpy(dest, b, width);
dest+=width; b+=width; bn--; dest += width; b += width; bn--;
} }
} }
if (an > 0) { if (an > 0) {
...@@ -49,23 +48,25 @@ binsearch(void *key, void *va, int n, int abefore, int width, ...@@ -49,23 +48,25 @@ binsearch(void *key, void *va, int n, int abefore, int width,
} }
char *a = va; char *a = va;
int mid = n / 2; int mid = n / 2;
void *akey = a + mid * width; void *akey = &a[mid * width];
int c = cmp(extra, key, akey); int c = cmp(extra, key, akey);
if (c == 0) { if (c < 0) {
// this won't happen because msns are unique, but is here for completeness
return abefore + mid;
} else if (c < 0) {
if (n == 1) { if (n == 1) {
return abefore; return abefore;
} else { } else {
return binsearch(key, a, mid, abefore, width, extra, cmp); return binsearch(key, a, mid, abefore, width, extra, cmp);
} }
} else { } else if (c > 0) {
if (n == 1) { if (n == 1) {
return abefore + 1; return abefore + 1;
} else { } else {
return binsearch(key, a+mid*width, n-mid, abefore+mid, width, extra, cmp); return binsearch(key, akey, n - mid, abefore + mid, width,
extra, cmp);
} }
} else {
// this won't happen because msns are unique, but is here for
// completeness
return abefore + mid;
} }
} }
...@@ -82,48 +83,119 @@ merge(void *vdest, void *va, int an, void *vb, int bn, int width, ...@@ -82,48 +83,119 @@ merge(void *vdest, void *va, int an, void *vb, int bn, int width,
char *tmp1 = a; a = b; b = tmp1; char *tmp1 = a; a = b; b = tmp1;
int tmp2 = an; an = bn; bn = tmp2; int tmp2 = an; an = bn; bn = tmp2;
} }
int a2 = an/2; int a2 = an / 2;
void *akey = a + a2 * width; void *akey = &a[a2 * width];
int b2 = binsearch(akey, b, bn, 0, width, extra, cmp); int b2 = binsearch(akey, b, bn, 0, width, extra, cmp);
int ra, rb; int ra, rb;
ra = cilk_spawn merge(dest, a, a2, b, b2, width, extra, cmp); ra = cilk_spawn merge(dest, a, a2, b, b2, width, extra, cmp);
rb = merge(dest+(a2+b2)*width, a+a2*width, an-a2, b+b2*width, bn-b2, width, extra, cmp); rb = merge(&dest[(a2 + b2) * width], akey, an - a2, &b[b2 * width], bn - b2,
width, extra, cmp);
cilk_sync; cilk_sync;
if (ra != 0) return ra; if (ra != 0) return ra;
return rb; return rb;
} }
static inline void
swap(void *va, void *vb, int width)
{
u_int64_t *ia = va, *ib = vb, it;
while ((unsigned int) width >= sizeof(u_int64_t)) {
it = *ia;
*ia = *ib;
*ib = it;
width -= sizeof(u_int64_t);
ia++;
ib++;
}
if (width == 0) { return; }
unsigned char *ca = (void *) ia, *cb = (void *) ib, ct;
while (width > 0) {
ct = *ca;
*ca = *cb;
*cb = ct;
width--;
ca++;
cb++;
}
}
static int
quicksort_r(void *va, int n, int width,
void *extra, int (*cmp)(void *, const void *, const void *))
{
if (n <= 1) { return 0; }
unsigned char *a = va;
unsigned char *pivot = &a[(n - 1) * width];
unsigned char *mid = &a[(n / 2) * width];
// The pivot is the last position in the array, but is the median of
// three elements (first, middle, last).
if (cmp(extra, a, pivot) > 0) {
swap(a, pivot, width);
}
if (cmp(extra, pivot, mid) > 0) {
swap(pivot, mid, width);
if (cmp(extra, a, pivot) > 0) {
swap(a, pivot, width);
}
}
unsigned char *lp = a, *rp = &a[(n - 2) * width];
while (lp < rp) {
// In the case where we have a lot of duplicate elements, this is
// kind of horrible (it's O(n^2)). It could be fixed by
// partitioning into less, equal, and greater, but since the only
// place we're using it right now has no duplicates (the MSNs are
// guaranteed unique), it's fine to do it this way, and probably
// better because it's simpler.
while (cmp(extra, lp, pivot) < 0) {
lp += width;
}
while (cmp(extra, pivot, rp) <= 0) {
rp -= width;
}
if (lp < rp) {
swap(lp, rp, width);
lp += width;
rp -= width;
}
}
if (lp == rp && cmp(extra, lp, pivot) < 0) {
// A weird case where lp and rp are both pointing to the rightmost
// element less than the pivot, we want lp to point to the first
// element greater than or equal to the pivot.
lp += width;
}
// Swap the pivot back into place.
swap(pivot, lp, width);
int r = quicksort_r(a, (lp - a) / width, width, extra, cmp);
if (r != 0) { return r; }
// The pivot is in this spot and we don't need to sort it, so move
// over one space before calling quicksort_r again.
lp += width;
r = quicksort_r(lp, n - (lp - a) / width, width, extra, cmp);
return r;
}
int int
mergesort_r(void *va, int n, int width, mergesort_r(void *va, int n, int width,
void *extra, int (*cmp)(void *, const void *, const void *)) void *extra, int (*cmp)(void *, const void *, const void *))
{ {
const BOOL use_cilk = (n > 10000);
if (n <= 1) { return 0; } if (n <= 1) { return 0; }
unsigned char *a = va; if (n < 10000) {
int mid = n/2; return quicksort_r(va, n, width, extra, cmp);
int r1, r2;
if (use_cilk) {
r1 = cilk_spawn mergesort_r(a, mid, width, extra, cmp);
} else {
r1 = mergesort_r(a, mid, width, extra, cmp);
} }
r2 = mergesort_r(a+mid*width, n-mid, width, extra, cmp); unsigned char *a = va;
int mid = n / 2;
int r1 = cilk_spawn mergesort_r(a, mid, width, extra, cmp);
int r2 = mergesort_r(&a[mid * width], n - mid, width, extra, cmp);
cilk_sync; cilk_sync;
if (r1 != 0) return r1; if (r1 != 0) return r1;
if (r2 != 0) return r2; if (r2 != 0) return r2;
void *tmp = toku_xmalloc(n * width); void *tmp = toku_xmalloc(n * width);
int r; int r = merge(tmp, a, mid, &a[mid * width], n - mid, width, extra, cmp);
if (use_cilk) { if (r == 0) {
r = merge(tmp, a, mid, a+mid*width, n-mid, width, extra, cmp); memcpy(a, tmp, n * width);
} else {
r = merge_c(tmp, a, mid, a+mid*width, n-mid, width, extra, cmp);
}
if (r != 0) {
toku_free(tmp);
return r;
} }
memcpy(a, tmp, n*width);
toku_free(tmp); toku_free(tmp);
return 0; return r;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment