Commit 1135ef15 authored by Volker Dobler's avatar Volker Dobler Committed by Russ Cox

sort: implement stable sorting

This CL provides stable in-place sorting by use of
bottom up merge sort with in-place merging done by
the SymMerge algorithm from P.-S. Kim and A. Kutzner.

The additional space needed for stable sorting (in the form of
stack space) is logarithmic in the inputs size n.
Number of calls to Less and Swap grow like O(n * log n) and
O(n * log n * log n):
Stable sorting random data uses significantly more calls
to Swap than the unstable quicksort implementation (5 times more
on n=100, 10 times more on n=1e4 and 23 times more on n=1e8).
The number of calls to Less is practically the same for Sort and
Stable.

Stable sorting 1 million random integers takes 5 times longer
than using Sort.

BenchmarkSortString1K      50000       328662 ns/op
BenchmarkStableString1K    50000       380231 ns/op  1.15 slower
BenchmarkSortInt1K         50000       157336 ns/op
BenchmarkStableInt1K       50000       191167 ns/op  1.22 slower
BenchmarkSortInt64K         1000     14466297 ns/op
BenchmarkStableInt64K        500     16190266 ns/op  1.12 slower

BenchmarkSort1e2          200000        64923 ns/op
BenchmarkStable1e2         50000       167128 ns/op  2.57 slower
BenchmarkSort1e4            1000     14540613 ns/op
BenchmarkStable1e4           100     58117289 ns/op  4.00 slower
BenchmarkSort1e6               5   2429631508 ns/op
BenchmarkStable1e6             1  12077036952 ns/op  4.97 slower

R=golang-dev, bradfitz, iant, 0xjnml, rsc
CC=golang-dev
https://golang.org/cl/9612044
parent 4d8aefde
...@@ -283,3 +283,192 @@ func Float64sAreSorted(a []float64) bool { return IsSorted(Float64Slice(a)) } ...@@ -283,3 +283,192 @@ func Float64sAreSorted(a []float64) bool { return IsSorted(Float64Slice(a)) }
// StringsAreSorted tests whether a slice of strings is sorted in increasing order. // StringsAreSorted tests whether a slice of strings is sorted in increasing order.
func StringsAreSorted(a []string) bool { return IsSorted(StringSlice(a)) } func StringsAreSorted(a []string) bool { return IsSorted(StringSlice(a)) }
// Notes on stable sorting:
// The used algorithms are simple and provable correct on all input and use
// only logarithmic additional stack space. They perform well if compared
// experimentaly to other stable in-place sorting algorithms.
//
// Remarks on other algoritms evaluated:
// - GCC's 4.6.3 stable_sort with merge_without_buffer from libstdc++:
// Not faster.
// - GCC's __rotate for block rotations: Not faster.
// - "Practical in-place mergesort" from Jyrki Katajainen, Tomi A. Pasanen
// and Jukka Teuhola; Nordic Journal of Computing 3,1 (1996), 27-40:
// The given algorithms are in-place, number of Swap and Assignments
// grow as n log n but the algorithm is not stable.
// - "Fast Stable In-Plcae Sorting with O(n) Data Moves" J.I. Munro and
// V. Raman in Algorithmica (1996) 16, 115-160:
// This algorithm either needs additional 2n bits or works only if there
// are enough different elements available to encode some permutations
// which have to be undone later (so not stable an any input).
// - All the optimal in-place sorting/merging algorithms I found are either
// unstable or rely on enough different elements in each step to encode the
// performed block rearrangements. See also "In-Place Merging Algorithms",
// Denham Coates-Evely, Department of Computer Science, Kings College,
// January 2004 and the reverences in there.
// - Often "optimal" algorithms are optimal in the number of assignments
// but Interface has only Swap as operation.
// Stable sorts data while keeping the original order of equal elements.
//
// It makes one call to data.Len to determine n, O(n*log(n)) calls to
// data.Less and O(n*log(n)*log(n)) calls to data.Swap.
func Stable(data Interface) {
n := data.Len()
blockSize := 20
a, b := 0, blockSize
for b <= n {
insertionSort(data, a, b)
a = b
b += blockSize
}
insertionSort(data, a, n)
for blockSize < n {
a, b = 0, 2*blockSize
for b <= n {
symMerge(data, a, a+blockSize, b)
a = b
b += 2 * blockSize
}
symMerge(data, a, a+blockSize, n)
blockSize *= 2
}
}
// SymMerge merges the two sorted subsequences data[a:m] and data[m:b] using
// the SymMerge algorithm from Pok-Son Kim and Arne Kutzner, "Stable Minimum
// Storage Merging by Symmetric Comparisons", in Susanne Albers and Tomasz
// Radzik, editors, Algorithms - ESA 2004, volume 3221 of Lecture Notes in
// Computer Science, pages 714-723. Springer, 2004.
//
// Let M = m-a and N = b-n. Wolog M < N.
// The recursion depth is bound by ceil(log(N+M)).
// The algorithm needs O(M*log(N/M + 1)) calls to data.Less.
// The algorithm needs O((M+N)*log(M)) calls to data.Swap.
//
// The paper gives O((M+N)*log(M)) as the number of assignments assuming a
// rotation algorithm wich uses O(M+N+gcd(M+N)) assignments. The argumentation
// in the paper carries through for Swap operations, especially as the block
// swapping rotate uses only O(M+N) Swaps.
func symMerge(data Interface, a, m, b int) {
if a >= m || m >= b {
return
}
mid := a + (b-a)/2
n := mid + m
start := 0
if m > mid {
start = n - b
r, p := mid, n-1
for start < r {
c := start + (r-start)/2
if !data.Less(p-c, c) {
start = c + 1
} else {
r = c
}
}
} else {
start = a
r, p := m, n-1
for start < r {
c := start + (r-start)/2
if !data.Less(p-c, c) {
start = c + 1
} else {
r = c
}
}
}
end := n - start
rotate(data, start, m, end)
symMerge(data, a, start, mid)
symMerge(data, mid, end, b)
}
// Rotate two consecutives blocks u = data[a:m] and v = data[m:b] in data:
// Data of the form 'x u v y' is changed to 'x v u y'.
// Rotate performs at most b-a many calls to data.Swap.
func rotate(data Interface, a, m, b int) {
i := m - a
if i == 0 {
return
}
j := b - m
if j == 0 {
return
}
if i == j {
swapRange(data, a, m, i)
return
}
p := a + i
for i != j {
if i > j {
swapRange(data, p-i, p, j)
i -= j
} else {
swapRange(data, p-i, p+j-i, i)
j -= i
}
}
swapRange(data, p-i, p, i)
}
/*
Complexity of Stable Sorting
Complexity of block swapping rotation
Each Swap puts one new element into its correct, final position.
Elements which reach their final position are no longer moved.
Thus block swapping rotation needs |u|+|v| calls to Swaps.
This is best possible as each element might need a move.
Pay attention when comparing to other optimal algorithms which
typically count the number of assignments instead of swaps:
E.g. the optimal algorithm of Dudzinski and Dydek for in-place
rotations uses O(u + v + gcd(u,v)) assignments which is
better than our O(3 * (u+v)) as gcd(u,v) <= u.
Stable sorting by SymMerge and BlockSwap rotations
SymMerg complexity for same size input M = N:
Calls to Less: O(M*log(N/M+1)) = O(N*log(2)) = O(N)
Calls to Swap: O((M+N)*log(M)) = O(2*N*log(N)) = O(N*log(N))
(The following argument does not fuzz over a missing -1 or
other stuff which does not impact the final result).
Let n = data.Len(). Assume n = 2^k.
Plain merge sort performs log(n) = k iterations.
On iteration i the algorithm merges 2^(k-i) blocks, each of size 2^i.
Thus iteration i of merge sort performs:
Calls to Less O(2^(k-i) * 2^i) = O(2^k) = O(2^log(n)) = O(n)
Calls to Swap O(2^(k-i) * 2^i * log(2^i)) = O(2^k * i) = O(n*i)
In total k = log(n) iterations are performed; so in total:
Calls to Less O(log(n) * n)
Calls to Swap O(n + 2*n + 3*n + ... + (k-1)*n + k*n)
= O((k/2) * k * n) = O(n * k^2) = O(n * log^2(n))
Above results should generalize to arbitrary n = 2^k + p
and should not be influenced by the initial insertion sort phase:
Insertion sort is O(n^2) on Swap and Less, thus O(bs^2) per block of
size bs at n/bs blocks: O(bs*n) Swaps and Less during insertion sort.
Merge sort iterations start at i = log(bs). With t = log(bs) constant:
Calls to Less O((log(n)-t) * n + bs*n) = O(log(n)*n + (bs-t)*n)
= O(n * log(n))
Calls to Swap O(n * log^2(n) - (t^2+t)/2*n) = O(n * log^2(n))
*/
...@@ -122,6 +122,19 @@ func BenchmarkSortString1K(b *testing.B) { ...@@ -122,6 +122,19 @@ func BenchmarkSortString1K(b *testing.B) {
} }
} }
func BenchmarkStableString1K(b *testing.B) {
b.StopTimer()
for i := 0; i < b.N; i++ {
data := make([]string, 1<<10)
for i := 0; i < len(data); i++ {
data[i] = strconv.Itoa(i ^ 0x2cc)
}
b.StartTimer()
Stable(StringSlice(data))
b.StopTimer()
}
}
func BenchmarkSortInt1K(b *testing.B) { func BenchmarkSortInt1K(b *testing.B) {
b.StopTimer() b.StopTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
...@@ -135,6 +148,19 @@ func BenchmarkSortInt1K(b *testing.B) { ...@@ -135,6 +148,19 @@ func BenchmarkSortInt1K(b *testing.B) {
} }
} }
func BenchmarkStableInt1K(b *testing.B) {
b.StopTimer()
for i := 0; i < b.N; i++ {
data := make([]int, 1<<10)
for i := 0; i < len(data); i++ {
data[i] = i ^ 0x2cc
}
b.StartTimer()
Stable(IntSlice(data))
b.StopTimer()
}
}
func BenchmarkSortInt64K(b *testing.B) { func BenchmarkSortInt64K(b *testing.B) {
b.StopTimer() b.StopTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
...@@ -148,6 +174,19 @@ func BenchmarkSortInt64K(b *testing.B) { ...@@ -148,6 +174,19 @@ func BenchmarkSortInt64K(b *testing.B) {
} }
} }
func BenchmarkStableInt64K(b *testing.B) {
b.StopTimer()
for i := 0; i < b.N; i++ {
data := make([]int, 1<<16)
for i := 0; i < len(data); i++ {
data[i] = i ^ 0xcccc
}
b.StartTimer()
Stable(IntSlice(data))
b.StopTimer()
}
}
const ( const (
_Sawtooth = iota _Sawtooth = iota
_Rand _Rand
...@@ -204,7 +243,7 @@ func lg(n int) int { ...@@ -204,7 +243,7 @@ func lg(n int) int {
return i return i
} }
func testBentleyMcIlroy(t *testing.T, sort func(Interface)) { func testBentleyMcIlroy(t *testing.T, sort func(Interface), maxswap func(int) int) {
sizes := []int{100, 1023, 1024, 1025} sizes := []int{100, 1023, 1024, 1025}
if testing.Short() { if testing.Short() {
sizes = []int{100, 127, 128, 129} sizes = []int{100, 127, 128, 129}
...@@ -278,7 +317,7 @@ func testBentleyMcIlroy(t *testing.T, sort func(Interface)) { ...@@ -278,7 +317,7 @@ func testBentleyMcIlroy(t *testing.T, sort func(Interface)) {
} }
desc := fmt.Sprintf("n=%d m=%d dist=%s mode=%s", n, m, dists[dist], modes[mode]) desc := fmt.Sprintf("n=%d m=%d dist=%s mode=%s", n, m, dists[dist], modes[mode])
d := &testingData{desc: desc, t: t, data: mdata[0:n], maxswap: n * lg(n) * 12 / 10} d := &testingData{desc: desc, t: t, data: mdata[0:n], maxswap: maxswap(n)}
sort(d) sort(d)
// Uncomment if you are trying to improve the number of compares/swaps. // Uncomment if you are trying to improve the number of compares/swaps.
//t.Logf("%s: ncmp=%d, nswp=%d", desc, d.ncmp, d.nswap) //t.Logf("%s: ncmp=%d, nswp=%d", desc, d.ncmp, d.nswap)
...@@ -303,11 +342,15 @@ func testBentleyMcIlroy(t *testing.T, sort func(Interface)) { ...@@ -303,11 +342,15 @@ func testBentleyMcIlroy(t *testing.T, sort func(Interface)) {
} }
func TestSortBM(t *testing.T) { func TestSortBM(t *testing.T) {
testBentleyMcIlroy(t, Sort) testBentleyMcIlroy(t, Sort, func(n int) int { return n * lg(n) * 12 / 10 })
} }
func TestHeapsortBM(t *testing.T) { func TestHeapsortBM(t *testing.T) {
testBentleyMcIlroy(t, Heapsort) testBentleyMcIlroy(t, Heapsort, func(n int) int { return n * lg(n) * 12 / 10 })
}
func TestStableBM(t *testing.T) {
testBentleyMcIlroy(t, Stable, func(n int) int { return n * lg(n) * lg(n) })
} }
// This is based on the "antiquicksort" implementation by M. Douglas McIlroy. // This is based on the "antiquicksort" implementation by M. Douglas McIlroy.
...@@ -357,3 +400,148 @@ func TestAdversary(t *testing.T) { ...@@ -357,3 +400,148 @@ func TestAdversary(t *testing.T) {
d := &adversaryTestingData{data, make(map[int]int), 0} d := &adversaryTestingData{data, make(map[int]int), 0}
Sort(d) // This should degenerate to heapsort. Sort(d) // This should degenerate to heapsort.
} }
func TestStableInts(t *testing.T) {
data := ints
Stable(IntSlice(data[0:]))
if !IntsAreSorted(data[0:]) {
t.Errorf("nsorted %v\n got %v", ints, data)
}
}
type intPairs []struct {
a, b int
}
// IntPairs compare on a only.
func (d intPairs) Len() int { return len(d) }
func (d intPairs) Less(i, j int) bool { return d[i].a < d[j].a }
func (d intPairs) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
// Record initial order in B.
func (d intPairs) initB() {
for i := range d {
d[i].b = i
}
}
// InOrder checks if a-equal elements were not reordered.
func (d intPairs) inOrder() bool {
lastA, lastB := -1, 0
for i := 0; i < len(d); i++ {
if lastA != d[i].a {
lastA = d[i].a
lastB = d[i].b
continue
}
if d[i].b <= lastB {
return false
}
lastB = d[i].b
}
return true
}
func TestStability(t *testing.T) {
n, m := 100000, 1000
if testing.Short() {
n, m = 1000, 100
}
data := make(intPairs, n)
// random distribution
for i := 0; i < len(data); i++ {
data[i].a = rand.Intn(m)
}
if IsSorted(data) {
t.Fatalf("terrible rand.rand")
}
data.initB()
Stable(data)
if !IsSorted(data) {
t.Errorf("Stable didn't sort %d ints", n)
}
if !data.inOrder() {
t.Errorf("Stable wasn't stable on %d ints", n)
}
// already sorted
data.initB()
Stable(data)
if !IsSorted(data) {
t.Errorf("Stable shuffeled sorted %d ints (order)", n)
}
if !data.inOrder() {
t.Errorf("Stable shuffeled sorted %d ints (stability)", n)
}
// sorted reversed
for i := 0; i < len(data); i++ {
data[i].a = len(data) - i
}
data.initB()
Stable(data)
if !IsSorted(data) {
t.Errorf("Stable didn't sort %d ints", n)
}
if !data.inOrder() {
t.Errorf("Stable wasn't stable on %d ints", n)
}
}
var countOpsSizes = []int{1e2, 3e2, 1e3, 3e3, 1e4, 3e4, 1e5, 3e5, 1e6}
func countOps(t *testing.T, algo func(Interface), name string) {
sizes := countOpsSizes
if testing.Short() {
sizes = sizes[:5]
}
if !testing.Verbose() {
t.Skip("Counting skipped as non-verbose mode.")
}
for _, n := range sizes {
td := testingData{
desc: name,
t: t,
data: make([]int, n),
maxswap: 1 << 31,
}
for i := 0; i < n; i++ {
td.data[i] = rand.Intn(n / 5)
}
algo(&td)
t.Logf("%s %8d elements: %11d Swap, %10d Less", name, n, td.nswap, td.ncmp)
}
}
func TestCountStableOps(t *testing.T) { countOps(t, Stable, "Stable") }
func TestCountSortOps(t *testing.T) { countOps(t, Sort, "Sort ") }
func bench(b *testing.B, size int, algo func(Interface), name string) {
b.StopTimer()
data := make(intPairs, size)
for i := 0; i < b.N; i++ {
for n := size - 3; n <= size+3; n++ {
for i := 0; i < len(data); i++ {
data[i].a = rand.Intn(n / 5)
}
data.initB()
b.StartTimer()
algo(data)
b.StopTimer()
if !IsSorted(data) {
b.Errorf("%s did not sort %d ints", name, n)
}
if name == "Stable" && !data.inOrder() {
b.Errorf("%s unstable on %d ints", name, n)
}
}
}
}
func BenchmarkSort1e2(b *testing.B) { bench(b, 1e2, Sort, "Sort") }
func BenchmarkStable1e2(b *testing.B) { bench(b, 1e2, Stable, "Stable") }
func BenchmarkSort1e4(b *testing.B) { bench(b, 1e4, Sort, "Sort") }
func BenchmarkStable1e4(b *testing.B) { bench(b, 1e4, Stable, "Stable") }
func BenchmarkSort1e6(b *testing.B) { bench(b, 1e6, Sort, "Sort") }
func BenchmarkStable1e6(b *testing.B) { bench(b, 1e6, Stable, "Stable") }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment