Commit ff2f0b67 authored by Kirill Smelkov's avatar Kirill Smelkov

restore: Extract packs in multiple workers

This way it allows us to leverage multiple CPUs on a system for pack
extractions, which are computation-heavy operations.

The way to do it is more-or-less classical:

    - main worker prepares requests for pack extraction jobs

    - there are multiple pack-extraction workers, which read requests
      from jobs queue and perform them

    - at the end we wait for everything to stop, collect errors and
      optionally signal the whole thing to cancel if we see an error
      coming. (it is only a signal and we still have to wait for
      everything to stop)

The default number of workers is N(CPU) on the system - because we spawn
separate `git pack-objects ...` for every request.

We also now explicitly limit the N(CPU) each `git pack-objects ...` can
use to 1. This way the control over how many resources to use stays in
git-backup's hands, and git also packs better this way (when using only
1 thread), because when deltifying all objects are considered against
each other, not only the objects inside 1 thread's object pool; and even
when pack.threads is not 1, the first "objects counting" phase of the
pack is serial - wasting all but 1 core.

On lab.nexedi.com we already use pack.threads=1 by default in global
gitconfig, but the above change is for code to be universal.

Time to restore nexedi/ from lab.nexedi.com backup:

2CPU laptop:

    before (pack.threads=1)     10m11s
    before (pack.threads=NCPU)   9m13s
    after  -j1                  10m11s
    after                        6m17s

8CPU system (with other load present, noisy) :

    before (pack.threads=1)     ~5m
    after                       ~1m30s
parent 6c2abbbf
......@@ -251,3 +251,18 @@ func erraddcallingcontext(topfunc string, e *Error) *Error {
return e
}
// Errorv is an error implementation that aggregates several errors,
// e.g. ones collected from multiple parallel workers.
type Errorv []error

// Error renders the aggregate: a lone error is shown as its own message,
// while any other count is shown as a "<N> errors:" header followed by
// one indented line per collected error.
func (ev Errorv) Error() string {
	if n := len(ev); n != 1 {
		msg := fmt.Sprintf("%d errors:\n", n)
		for _, err := range ev {
			msg += fmt.Sprintf("\t- %s\n", err)
		}
		return msg
	}
	return ev[0].Error()
}
......@@ -67,9 +67,11 @@ import (
"os"
pathpkg "path"
"path/filepath"
"runtime"
"runtime/debug"
"sort"
"strings"
"sync"
"syscall"
"time"
......@@ -106,6 +108,9 @@ func debugf(format string, a ...interface{}) {
}
}
// njobs is the maximum number of parallel pack-extraction jobs to spawn
// (overridable via the -j command-line flag). It defaults to the number
// of CPUs because each job runs its own `git pack-objects` process
// limited to 1 thread.
var njobs = runtime.NumCPU()
// -------- create/extract blob --------
// file -> blob_sha1, mode
......@@ -675,6 +680,15 @@ func (br ByRepoPath) Search(prefix string) int {
})
}
// PackExtractReq is a request, handed to a pack-extraction worker, to
// extract one pack from the backup repository.
type PackExtractReq struct {
	refs     RefMap // extract pack with objects reachable from these heads
	repopath string // into repository located here

	// for info only: request was generated restoring from under this backup prefix
	prefix string
}
func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec) {
HEAD := xgitSha1("rev-parse", "--verify", HEAD_)
......@@ -722,7 +736,24 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
// repotab no longer needed
repotab = nil
// walk over specified prefixes restoring files and packs in *.git
packxq := make(chan PackExtractReq, 2*njobs) // requests to extract packs
errch := make(chan error) // errors from workers
stopch := make(chan struct{}) // broadcasts restore has to be cancelled
wg := sync.WaitGroup{}
// main worker: walk over specified prefixes restoring files and
// scheduling pack extraction requests from *.git -> packxq
wg.Add(1)
go func() {
defer wg.Done()
defer close(packxq)
// raised err -> errch
here := myfuncname()
defer errcatch(func(e *Error) {
errch <- erraddcallingcontext(here, e)
})
runloop:
for _, __ := range restorespecv {
prefix, dir := __.prefix, __.dir
......@@ -774,9 +805,6 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
break // repov is sorted - end of repositories with prefix
}
repopath := reprefix(prefix, dir, repo.repopath)
infof("# git %s\t-> %s", prefix, repopath)
// make sure tag/tree/blob objects represented as commits are
// present, before we generate pack for restored repo.
// ( such objects could be lost e.g. after backup repo repack as they
......@@ -787,22 +815,58 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
}
}
select {
case packxq <- PackExtractReq{refs: repo.refs,
repopath: reprefix(prefix, dir, repo.repopath),
prefix: prefix}:
case <-stopch:
break runloop
}
}
}
}()
// pack workers: packxq -> extract packs
for i := 0; i < njobs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// raised err -> errch
here := myfuncname()
defer errcatch(func(e *Error) {
errch <- erraddcallingcontext(here, e)
})
runloop:
for {
select {
case <-stopch:
break runloop
case p, ok := <-packxq:
if !ok {
break runloop
}
infof("# git %s\t-> %s", p.prefix, p.repopath)
// extract pack for that repo from big backup pack + decoded tags
pack_argv := []string{
"-c", "pack.threads=1", // occupy only 1 CPU + it packs better
"pack-objects",
"--revs", // include all objects referencable from input sha1 list
"--reuse-object", "--reuse-delta", "--delta-base-offset"}
if verbose <= 0 {
pack_argv = append(pack_argv, "-q")
}
pack_argv = append(pack_argv, repopath+"/objects/pack/pack")
pack_argv = append(pack_argv, p.repopath+"/objects/pack/pack")
xgit2(pack_argv, RunWith{stdin: repo.refs.Sha1HeadsStr(), stderr: gitprogress()})
xgit2(pack_argv, RunWith{stdin: p.refs.Sha1HeadsStr(), stderr: gitprogress()})
// verify that extracted repo refs match backup.refs index after extraction
x_ref_list := xgit("--git-dir=" + repopath,
x_ref_list := xgit("--git-dir=" + p.repopath,
"for-each-ref", "--format=%(objectname) %(refname)")
repo_refs := repo.refs.Values()
repo_refs := p.refs.Values()
sort.Sort(ByRefname(repo_refs))
repo_ref_listv := make([]string, 0, len(repo_refs))
for _, __ := range repo_refs {
......@@ -810,7 +874,7 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
}
repo_ref_list := strings.Join(repo_ref_listv, "\n")
if x_ref_list != repo_ref_list {
raisef("E: extracted %s refs corrupt", repopath)
raisef("E: extracted %s refs corrupt", p.repopath)
}
// check connectivity in recreated repository.
......@@ -820,8 +884,8 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
//
// Compared to fsck we do not re-compute sha1 sum of objects which
// is significantly faster.
gerr, _, _ := ggit("--git-dir=" + repopath,
"rev-list", "--objects", "--stdin", "--quiet", RunWith{stdin: repo.refs.Sha1HeadsStr()})
gerr, _, _ := ggit("--git-dir=" + p.repopath,
"rev-list", "--objects", "--stdin", "--quiet", RunWith{stdin: p.refs.Sha1HeadsStr()})
if gerr != nil {
fmt.Fprintln(os.Stderr, "E: Problem while checking connectivity of extracted repo:")
raise(gerr)
......@@ -829,7 +893,7 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
// XXX disabled because it is slow
// // NOTE progress goes to stderr, problems go to stdout
// xgit("--git-dir=" + repopath, "fsck",
// xgit("--git-dir=" + p.repopath, "fsck",
// # only check that traversal from refs is ok: this unpacks
// # commits and trees and verifies blob objects are there,
// # but do _not_ unpack blobs =fast.
......@@ -837,6 +901,27 @@ func cmd_restore_(gb *git.Repository, HEAD_ string, restorespecv []RestoreSpec)
// RunWith{stdout: gitprogress(), stderr: gitprogress()})
}
}
}()
}
// wait for workers to finish & collect/reraise their errors
go func() {
wg.Wait()
close(errch)
}()
ev := Errorv{}
for e := range errch {
// tell everything to stop on first error
if len(ev) == 0 {
close(stopch)
}
ev = append(ev, e)
}
if len(ev) != 0 {
raise(ev)
}
}
var commands = map[string]func(*git.Repository, []string){
......@@ -856,7 +941,8 @@ func usage() {
-h --help this help text.
-v increase verbosity.
-q decrease verbosity.
`)
-j N allow max N jobs to spawn; default=NPROC (%d on this system)
`, njobs)
}
func main() {
......@@ -864,6 +950,7 @@ func main() {
quiet := 0
flag.Var((*countFlag)(&verbose), "v", "verbosity level")
flag.Var((*countFlag)(&quiet), "q", "decrease verbosity")
flag.IntVar(&njobs, "j", njobs, "allow max N jobs to spawn")
flag.Parse()
verbose -= quiet
argv := flag.Args()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment