Commit c3b644da authored by bescoto's avatar bescoto

Added user_group module and associated tests


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@416 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 6a98bce8
...@@ -124,6 +124,12 @@ backup_writer = None ...@@ -124,6 +124,12 @@ backup_writer = None
# Connection of the client # Connection of the client
client_conn = None client_conn = None
# When backing up, issource should be true on the reader and isdest on
# the writer. When restoring, issource should be true on the mirror
# and isdest should be true on the target.
issource = None
isdest = None
# This list is used by the set function below. When a new # This list is used by the set function below. When a new
# connection is created with init_connection, its Globals class # connection is created with init_connection, its Globals class
# will match this one for all the variables mentioned in this # will match this one for all the variables mentioned in this
......
# Copyright 2002, 2003 Ben Escoto
#
# This file is part of rdiff-backup.
#
# rdiff-backup is free software; you can redistribute it and/or modify
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# rdiff-backup is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with rdiff-backup; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
"""This module deal with users and groups
On each connection we may need to map unames and gnames to uids and
gids, and possibly vice-versa. So maintain a separate dictionary for
this.
On the destination connection only, if necessary have a separate
dictionary of mappings, which specify how to map users/groups on one
connection to the users/groups on the other.
"""
import grp, pwd
import log, Globals
# This should be set to the user UserMap class object if using
# user-defined user mapping, and a Map class object otherwise.
UserMap = None
# This should be set to the group UserMap class object if using
# user-defined group mapping, and a Map class object otherwise.
GroupMap = None
uid2uname_dict = {}; gid2gname_dict = {}
def uid2uname(uid):
"""Given uid, return uname or None if cannot find"""
try: return uid2uname_dict[uid]
except KeyError:
try: uname = pwd.getpwuid(uid)[0]
except KeyError: uname = None
uid2uname_dict[uid] = uname
return uname
def gid2gname(gid):
"""Given gid, return group name or None if cannot find"""
try: return gid2gname_dict[gid]
except KeyError:
try: gname = grp.getgrgid(gid)[0]
except KeyError: gname = None
gid2gname_dict[gid] = gname
return gname
class Map:
"""Used for mapping names and id on source side to dest side"""
def __init__(self, name2id_func):
"""Map initializer, set dictionaries"""
assert Globals.isdest, "Should run on destination connection"
self.name2id_dict = {}
self.name2id_func = name2id_func
def get_id(self, id, name = None):
"""Return mapped id from id and, if available, name"""
if not name: return self.get_id_from_id(id)
try: return self.name2id_dict[name]
except KeyError:
out_id = self.find_id(id, name)
self.name2id_dict[name] = out_id
return out_id
def get_id_from_id(self, id): return id
def find_id(self, id, name):
"""Find the proper id to use with given id and name"""
try: return self.name2id_func(name)
except KeyError: return id
class DefinedMap(Map):
"""Map names and ids on source side to appropriate ids on dest side
Like map, but initialize with user-defined mapping string, which
supersedes Map.
"""
def __init__(self, name2id_func, mapping_string):
"""Initialize object with given mapping string
The mapping_string should consist of a number of lines, each which
should have the form "source_id_or_name:dest_id_or_name". Do user
mapping unless user is false, then do group.
"""
Map.__init__(self, name2id_func)
self.name_mapping_dict = {}; self.id_mapping_dict = {}
for line in mapping_string.split('\n'):
line = line.strip()
if not line: continue
comps = line.split(':')
if not len(comps) == 2:
log.Log.FatalError("Error parsing mapping file, bad line: "
+ line)
old, new = comps
try: self.id_mapping_dict[int(old)] = self.init_get_new_id(new)
except ValueError:
self.name_mapping_dict[old] = self.init_get_new_id(new)
def init_get_new_id(self, id_or_name):
"""Return id of id_or_name, failing if cannot. Used in __init__"""
try: return int(id_or_name)
except ValueError:
try: id = self.name2id_func(id_or_name)
except KeyError:
log.Log.FatalError("Cannot get id for user or group name"
+ id_or_name)
return id
def get_id_from_id(self, id): return self.id_mapping_dict.get(id, id)
def find_id(self, id, name):
"""Find proper id to use when source file has give id and name"""
try: return self.name_mapping_dict[name]
except KeyError:
try: return self.id_mapping_dict[id]
except KeyError: return Map.find_id(self, id, name)
def init_user_mapping(mapping_string = None):
"""Initialize user mapping with given mapping string or None"""
global UserMap
name2id_func = lambda name: pwd.getpwnam(name)[2]
if mapping_string: UserMap = DefinedMap(name2id_func, mapping_string)
else: UserMap = Map(name2id_func)
def init_group_mapping(mapping_string):
"""Initialize the group mapping dictionary with given mapping string"""
global GroupMap
name2id_func = lambda name: grp.getgrnam(name)[2]
if mapping_string: GroupMap = DefinedMap(name2id_func, mapping_string)
else: GroupMap = Map(name2id_func)
"""commontest - Some functions and constants common to several test cases""" """commontest - Some functions and constants common to several test cases"""
import os, sys import os, sys, code
from rdiff_backup.log import Log from rdiff_backup.log import Log
from rdiff_backup.rpath import RPath from rdiff_backup.rpath import RPath
from rdiff_backup import Globals, Hardlink, SetConnections, Main, \ from rdiff_backup import Globals, Hardlink, SetConnections, Main, \
...@@ -363,3 +363,9 @@ def MirrorTest(source_local, dest_local, list_of_dirnames, ...@@ -363,3 +363,9 @@ def MirrorTest(source_local, dest_local, list_of_dirnames,
_reset_connections(src_rp, dest_rp) _reset_connections(src_rp, dest_rp)
assert CompareRecursive(src_rp, dest_rp, compare_hardlinks) assert CompareRecursive(src_rp, dest_rp, compare_hardlinks)
Main.force = old_force_val Main.force = old_force_val
def raise_interpreter(use_locals = None):
"""Start python interpreter, with local variables if locals is true"""
if use_locals: local_dict = locals()
else: local_dict = globals()
code.InteractiveConsole(local_dict).interact()
import unittest, pwd, grp, code
from commontest import *
from rdiff_backup import user_group
class UserGroupTest(unittest.TestCase):
"""Test user and group functionality"""
def test_basic_conversion(self):
"""Test basic id2name. May need to modify for different systems"""
user_group.uid2uname_dict = {}; user_group.gid2gname_dict = {}
assert user_group.uid2uname(0) == "root"
assert user_group.uid2uname(0) == "root"
assert user_group.gid2gname(0) == "root"
assert user_group.gid2gname(0) == "root"
def test_default_mapping(self):
"""Test the default user mapping"""
Globals.isdest = 1
rootid = 0
binid = pwd.getpwnam('bin')[2]
syncid = pwd.getpwnam('sync')[2]
user_group.init_user_mapping()
assert user_group.UserMap.get_id(0) == 0
assert user_group.UserMap.get_id(0, 'bin') == binid
assert user_group.UserMap.get_id(binid, 'sync') == syncid
def test_user_mapping(self):
"""Test the user mapping file through the DefinedMap class"""
mapping_string = """
root:bin
bin:root
500:501
0:sync
sync:0"""
Globals.isdest = 1
rootid = 0
binid = pwd.getpwnam('bin')[2]
syncid = pwd.getpwnam('sync')[2]
daemonid = pwd.getpwnam('daemon')[2]
user_group.init_user_mapping(mapping_string)
assert user_group.UserMap.get_id(rootid, 'root') == binid
assert user_group.UserMap.get_id(binid, 'bin') == rootid
assert user_group.UserMap.get_id(0) == syncid
assert user_group.UserMap.get_id(syncid, 'sync') == 0
assert user_group.UserMap.get_id(500) == 501
assert user_group.UserMap.get_id(501) == 501
assert user_group.UserMap.get_id(123, 'daemon') == daemonid
if 0: code.InteractiveConsole(globals()).interact()
if __name__ == "__main__": unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment