Commit 47a30bcf authored by bescoto's avatar bescoto

Added mapping by name (not id) for ACLs.


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@440 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 322197c3
......@@ -30,7 +30,8 @@ from __future__ import generators
import base64, errno, re
try: import posix1e
except ImportError: pass
import static, Globals, metadata, connection, rorpiter, log, C, rpath
import static, Globals, metadata, connection, rorpiter, log, C, \
rpath, user_group
class ExtendedAttributes:
"""Hold a file's extended attribute information"""
......@@ -172,32 +173,111 @@ class ExtendedAttributesFile(metadata.FlatFile):
static.MakeClass(ExtendedAttributesFile)
class AccessControlList:
class AccessControlLists:
"""Hold a file's access control list information
Since ACL objects cannot be picked, store everything as text, in
self.acl_text and self.def_acl_text.
Since posix1e.ACL objects cannot be picked, and because they lack
user/group name information, store everything in self.entry_list
and self.default_entry_list.
"""
def __init__(self, index, acl_text = None, def_acl_text = None):
def __init__(self, index, acl_text = None):
"""Initialize object with index and possibly acl_text"""
self.index = index
if acl_text: # Check validity of ACL, reorder if necessary
ACL = posix1e.ACL(text=acl_text)
assert ACL.valid(), "Bad ACL: "+acl_text
self.acl_text = str(ACL)
else: self.acl_text = None
if def_acl_text:
def_ACL = posix1e.ACL(text=def_acl_text)
assert def_ACL.valid(), "Bad default ACL: "+def_acl_text
self.def_acl_text = str(def_ACL)
else: self.def_acl_text = None
if acl_text: self.set_from_text(acl_text)
else: self.entry_list = self.default_entry_list = None
def set_from_text(self, text):
"""Set self.entry_list and self.default_entry_list from text"""
self.entry_list, self.default_entry_list = [], []
for line in text.split('\n'):
comment_pos = text.find('#')
if comment_pos >= 0: line = line[:comment_pos]
line = line.strip()
if not line: continue
if line.startswith('default:'):
entrytuple = self.text_to_entrytuple(line[8:])
self.default_entry_list.append(entrytuple)
else: self.entry_list.append(self.text_to_entrytuple(line))
def __str__(self):
"""Return human-readable string"""
return ("acl_text: %s\ndef_acl_text: %s" %
(self.acl_text, self.def_acl_text))
"""Return text version of acls"""
slist = map(self.entrytuple_to_text, self.entry_list)
if self.default_entry_list:
slist.extend(map(lambda e: "default:" + self.entrytuple_to_text(e),
self.default_entry_list))
return "\n".join(slist)
def entrytuple_to_text(self, entrytuple):
"""Return text version of entrytuple, as in getfacl"""
type, name_pair, perms = entrytuple
if type == posix1e.ACL_USER_OBJ:
text = 'user::'
elif type == posix1e.ACL_USER:
uid, uname = name_pair
text = 'user:%s:' % (uname or uid)
elif type == posix1e.ACL_GROUP_OBJ:
text = 'group::'
elif type == posix1e.ACL_GROUP:
gid, gname = name_pair
text = 'group:%s:' % (gname or gid)
elif type == posix1e.ACL_MASK:
text = 'mask::'
else:
assert type == posix1e.ACL_OTHER, type
text = 'other::'
permstring = '%s%s%s' % (perms & 4 and 'r' or '-',
perms & 2 and 'w' or '-',
perms & 1 and 'x' or '-')
return text+permstring
def text_to_entrytuple(self, text):
"""Return entrytuple given text like 'user:foo:r--'"""
typetext, qualifier, permtext = text.split(':')
if qualifier:
try: id = int(qualifier)
except ValueError: namepair = (None, qualifier)
else: namepair = (uid, None)
if typetext == 'user': type = posix1e.ACL_USER
else:
assert typetext == 'group', (typetext, text)
type = posix1e.ACL_GROUP
else:
namepair = None
if typetext == 'user': type = posix1e.ACL_USER_OBJ
elif typetext == 'group': type = posix1e.ACL_GROUP_OBJ
elif typetext == 'mask': type = posix1e.ACL_MASK
else:
assert typetext == 'other', (typetext, text)
type = posix1e.ACL_OTHER
assert len(permtext) == 3, (permtext, text)
read, write, execute = permtext
perms = ((read == 'r') << 2 |
(write == 'w') << 1 |
(execute == 'x'))
return (type, namepair, perms)
def cmp_entry_list(self, l1, l2):
"""True if the lists have same entries. Assume preordered"""
if not l1: return l1 == l2
if not l2 or len(l1) != len(l2): return 0
for i in range(len(l1)):
type1, namepair1, perms1 = l1[i]
type2, namepair2, perms2 = l2[i]
if type1 != type2 or perms1 != perms2: return 0
if namepair1 == namepair2: continue
if not namepair1 or not namepair2: return 0
(id1, name1), (id2, name2) = namepair1, namepair2
if name1:
if name1 == name2: continue
else: return 0
if name2: return 0
if id1 != id2: return 0
return 1
def __eq__(self, acl):
"""Compare self and other access control list
......@@ -208,11 +288,10 @@ class AccessControlList:
"""
assert isinstance(acl, self.__class__)
if self.index != acl.index: return 0
if self.is_basic(): return acl.is_basic()
if acl.is_basic(): return self.is_basic()
if self.acl_text != acl.acl_text: return 0
if not self.def_acl_text and not acl.def_acl_text: return 1
return self.def_acl_text == acl.def_acl_text
return (self.cmp_entry_list(self.entry_list, acl.entry_list) and
self.cmp_entry_list(self.default_entry_list,
acl.default_entry_list))
def __ne__(self, acl): return not self.__eq__(acl)
def eq_verbose(self, acl):
......@@ -220,16 +299,12 @@ class AccessControlList:
if self.index != acl.index:
print "index %s not equal to index %s" % (self.index, acl.index)
return 0
if self.acl_text != acl.acl_text:
print "ACL texts not equal:"
print self.acl_text
print acl.acl_text
if not self.cmp_entry_list(self.entry_list, acl.entry_list):
print "ACL entries for %s compare differently" % (self.index,)
return 0
if (self.def_acl_text != acl.def_acl_text and
(self.def_acl_text or acl.def_acl_text)):
print "Unequal default acl texts:"
print self.def_acl_text
print acl.def_acl_text
if not self.cmp_entry_list(self.default_entry_list,
acl.default_entry_list):
print "Default ACL entries for %s do not compare" % (self.index,)
return 0
return 1
......@@ -243,133 +318,119 @@ class AccessControlList:
features.
"""
if not self.acl_text and not self.def_acl_text: return 1
lines = self.acl_text.strip().split('\n')
assert len(lines) >= 3, lines
return len(lines) == 3 and not self.def_acl_text
if not self.entry_list and not self.default_entry_list: return 1
assert len(self.entry_list) >= 3, self.entry_list
return len(self.entry_list) == 3 and not self.default_entry_list
def read_from_rp(self, rp):
"""Set self.ACL from an rpath, or None if not supported"""
self.acl_text, self.def_acl_text = \
rp.conn.eas_acls.get_acl_text_from_rp(rp)
self.entry_list, self.default_entry_list = \
rp.conn.eas_acls.get_acl_lists_from_rp(rp)
def write_to_rp(self, rp):
"""Write current access control list to RPath rp"""
rp.conn.eas_acls.set_rp_acl(rp, self.acl_text, self.def_acl_text)
def acl_to_list(self, acl):
"""Return list representation of posix1e.ACL object
rp.conn.eas_acls.set_rp_acl(rp, self.entry_list,
self.default_entry_list)
ACL objects cannot be pickled, so this representation keeps
the structure while adding that option. Also we insert the
username along with the id, because that information will be
lost when moved to another system.
The result will be a list of tuples. Each tuple will have the
form (acltype, (uid or gid, uname or gname) or None,
permissions as an int).
"""
def entry_to_tuple(entry):
if entry.tag_type == posix1e.ACL_USER:
uid = entry.qualifier
owner_pair = (uid, user_group.uid2uname(uid))
elif entry.tag_type == posix1e.ACL_GROUP:
gid = entry.qualifier
owner_pair = (gid, user_group.gid2gname(gid))
else: owner_pair = None
perms = (entry.permset.read << 2 |
entry.permset.write << 1 |
entry.permset.execute)
return (entry.tag_type, owner_pair, perms)
return map(entry_to_tuple, acl)
def list_to_acl(self, listacl):
"""Return posix1e.ACL object from list representation"""
acl = posix1e.ACL()
for tag, owner_pair, perms in listacl:
entry = posix1e.Entry(acl)
entry.tag_type = tag
if owner_pair:
if tag == posix1e.ACL_USER:
entry.qualifier = user_group.UserMap.get_id(*owner_pair)
else:
assert tag == posix1e.ACL_GROUP
entry.qualifier = user_group.GroupMap.get_id(*owner_pair)
entry.read = perms >> 2
entry.write = perms >> 1 & 1
entry.execute = perms & 1
return acl
def set_rp_acl(rp, acl_text = None, def_acl_text = None):
def set_rp_acl(rp, entry_list = None, default_entry_list = None):
"""Set given rp with ACL that acl_text defines. rp should be local"""
assert rp.conn is Globals.local_connection
if acl_text:
acl = posix1e.ACL(text=acl_text)
assert acl.valid()
if entry_list: acl = list_to_acl(entry_list)
else: acl = posix1e.ACL()
acl.applyto(rp.path)
if rp.isdir():
if def_acl_text:
def_acl = posix1e.ACL(text=def_acl_text)
assert def_acl.valid()
if default_entry_list: def_acl = list_to_acl(default_entry_list)
else: def_acl = posix1e.ACL()
def_acl.applyto(rp.path, posix1e.ACL_TYPE_DEFAULT)
def get_acl_text_from_rp(rp):
"""Returns (acl_text, def_acl_text) from an rpath. Call locally"""
def get_acl_lists_from_rp(rp):
"""Returns (acl_list, def_acl_list) from an rpath. Call locally"""
assert rp.conn is Globals.local_connection
try: acl_text = str(posix1e.ACL(file=rp.path))
try: acl = posix1e.ACL(file=rp.path)
except IOError, exc:
if exc[0] == errno.EOPNOTSUPP: acl_text = None
if exc[0] == errno.EOPNOTSUPP: acl = None
else: raise
if rp.isdir():
try: def_acl_text = str(posix1e.ACL(filedef=rp.path))
try: def_acl = posix1e.ACL(filedef=rp.path)
except IOError, exc:
if exc[0] == errno.EOPNOTSUPP: def_acl_text = None
if exc[0] == errno.EOPNOTSUPP: def_acl = None
else: raise
else: def_acl_text = None
return (acl_text, def_acl_text)
else: def_acl = None
return (acl and acl_to_list(acl), def_acl and acl_to_list(def_acl))
def acl_to_list(acl):
"""Return list representation of posix1e.ACL object
ACL objects cannot be pickled, so this representation keeps
the structure while adding that option. Also we insert the
username along with the id, because that information will be
lost when moved to another system.
The result will be a list of tuples. Each tuple will have the
form (acltype, (uid or gid, uname or gname) or None,
permissions as an int).
"""
def entry_to_tuple(entry):
if entry.tag_type == posix1e.ACL_USER:
uid = entry.qualifier
owner_pair = (uid, user_group.uid2uname(uid))
elif entry.tag_type == posix1e.ACL_GROUP:
gid = entry.qualifier
owner_pair = (gid, user_group.gid2gname(gid))
else: owner_pair = None
perms = (entry.permset.read << 2 |
entry.permset.write << 1 |
entry.permset.execute)
return (entry.tag_type, owner_pair, perms)
return map(entry_to_tuple, acl)
def list_to_acl(entry_list):
"""Return posix1e.ACL object from list representation"""
acl = posix1e.ACL()
for tag, owner_pair, perms in entry_list:
entry = posix1e.Entry(acl)
entry.tag_type = tag
if owner_pair:
if tag == posix1e.ACL_USER:
entry.qualifier = user_group.UserMap.get_id(*owner_pair)
else:
assert tag == posix1e.ACL_GROUP, (tag, owner_pair, perms)
entry.qualifier = user_group.GroupMap.get_id(*owner_pair)
entry.permset.read = perms >> 2
entry.permset.write = perms >> 1 & 1
entry.permset.execute = perms & 1
return acl
def acl_compare_rps(rp1, rp2):
"""Return true if rp1 and rp2 have same acl information"""
acl1 = AccessControlList(rp1.index)
acl1 = AccessControlLists(rp1.index)
acl1.read_from_rp(rp1)
acl2 = AccessControlList(rp2.index)
acl2 = AccessControlLists(rp2.index)
acl2.read_from_rp(rp2)
return acl1 == acl2
def ACL2Record(acl):
"""Convert an AccessControlList object into a text record"""
start = "# file: %s\n%s" % (C.acl_quote(acl.get_indexpath()), acl.acl_text)
if not acl.def_acl_text: return start
default_lines = acl.def_acl_text.strip().split('\n')
default_text = '\ndefault:'.join(default_lines)
return "%sdefault:%s\n" % (start, default_text)
"""Convert an AccessControlLists object into a text record"""
return '# file: %s\n%s\n' % (C.acl_quote(acl.get_indexpath()), str(acl))
def Record2ACL(record):
"""Convert text record to an AccessControlList object"""
lines = record.split('\n')
first_line = lines.pop(0)
"""Convert text record to an AccessControlLists object"""
newline_pos = record.find('\n')
first_line = record[:newline_pos]
if not first_line.startswith('# file: '):
raise metadata.ParsingError("Bad record beginning: "+ first_line)
filename = first_line[8:]
if filename == '.': index = ()
else: index = tuple(C.acl_unquote(filename).split('/'))
normal_entries = []; default_entries = []
for line in lines:
if line.startswith('default:'): default_entries.append(line[8:])
else: normal_entries.append(line)
return AccessControlList(index, acl_text='\n'.join(normal_entries),
def_acl_text='\n'.join(default_entries))
return AccessControlLists(index, record[newline_pos:])
class ACLExtractor(EAExtractor):
"""Iterate AccessControlList objects from the ACL information file
"""Iterate AccessControlLists objects from the ACL information file
Except for the record_to_object method, we can reuse everything in
the EAExtractor class because the file formats are so similar.
......@@ -390,7 +451,7 @@ class AccessControlListFile(metadata.FlatFile):
collated = rorpiter.CollateIterators(rorp_iter, acl_iter)
for rorp, acl in collated:
assert rorp, "Missing rorp for index %s" % (acl.index,)
if not acl: acl = AccessControlList(rorp.index)
if not acl: acl = AccessControlLists(rorp.index)
rorp.set_acl(acl)
yield rorp
......@@ -433,7 +494,7 @@ def rpath_acl_get(rp):
This overrides a function in the rpath module.
"""
acl = AccessControlList(rp.index)
acl = AccessControlLists(rp.index)
if not rp.issym(): acl.read_from_rp(rp)
return acl
rpath.acl_get = rpath_acl_get
......
......@@ -141,7 +141,7 @@ def init_user_mapping(mapping_string = None):
if mapping_string: UserMap = DefinedMap(name2id_func, mapping_string)
else: UserMap = Map(name2id_func)
def init_group_mapping(mapping_string):
def init_group_mapping(mapping_string = None):
"""Initialize the group mapping dictionary with given mapping string"""
global GroupMap
name2id_func = lambda name: grp.getgrnam(name)[2]
......
import unittest, os, time, cStringIO
from commontest import *
from rdiff_backup.eas_acls import *
from rdiff_backup import Globals, rpath, Time
from rdiff_backup import Globals, rpath, Time, user_group
user_group.init_user_mapping()
user_group.init_group_mapping()
tempdir = rpath.RPath(Globals.local_connection, "testfiles/output")
restore_dir = rpath.RPath(Globals.local_connection,
"testfiles/restore_out")
......@@ -194,40 +196,40 @@ user.empty
class ACLTest(unittest.TestCase):
"""Test access control lists"""
sample_acl = AccessControlList((),"""user::rwx
sample_acl = AccessControlLists((),"""user::rwx
user:root:rwx
group::r-x
group:root:r-x
mask::r-x
other::---""")
dir_acl = AccessControlList((), """user::rwx
dir_acl = AccessControlLists((), """user::rwx
user:root:rwx
group::r-x
group:root:r-x
mask::r-x
other::---""",
"""user::rwx
user:root:---
group::r-x
mask::r-x
other::---""")
acl1 = AccessControlList(('1',), """user::r--
other::---
default:user::rwx
default:user:root:---
default:group::r-x
default:mask::r-x
default:other::---""")
acl1 = AccessControlLists(('1',), """user::r--
user:ben:---
group::---
group:root:---
mask::---
other::---""")
acl2 = AccessControlList(('2',), """user::rwx
acl2 = AccessControlLists(('2',), """user::rwx
group::r-x
group:ben:rwx
mask::---
other::---""")
acl3 = AccessControlList(('3',), """user::rwx
acl3 = AccessControlLists(('3',), """user::rwx
user:root:---
group::r-x
mask::---
other::---""")
empty_acl = AccessControlList((), "user::rwx\ngroup::---\nother::---")
empty_acl = AccessControlLists((), "user::rwx\ngroup::---\nother::---")
acl_testdir1 = rpath.RPath(Globals.local_connection, 'testfiles/acl_test1')
acl_testdir2 = rpath.RPath(Globals.local_connection, 'testfiles/acl_test2')
def make_temp(self):
......@@ -239,25 +241,25 @@ other::---""")
def testBasic(self):
"""Test basic writing and reading of ACLs"""
self.make_temp()
new_acl = AccessControlList(())
new_acl = AccessControlLists(())
tempdir.chmod(0700)
new_acl.read_from_rp(tempdir)
assert new_acl.is_basic(), new_acl.acl_text
assert new_acl.is_basic(), str(new_acl)
assert not new_acl == self.sample_acl
assert new_acl != self.sample_acl
assert new_acl == self.empty_acl, \
(new_acl.acl_text, self.empty_acl.acl_text)
(str(new_acl), str(self.empty_acl))
self.sample_acl.write_to_rp(tempdir)
new_acl.read_from_rp(tempdir)
assert new_acl.acl_text == self.sample_acl.acl_text, \
(new_acl.acl_text, self.sample_acl.acl_text)
assert str(new_acl) == str(self.sample_acl), \
(str(new_acl), str(self.sample_acl))
assert new_acl == self.sample_acl
def testBasicDir(self):
"""Test reading and writing of ACL w/ defaults to directory"""
self.make_temp()
new_acl = AccessControlList(())
new_acl = AccessControlLists(())
new_acl.read_from_rp(tempdir)
assert new_acl.is_basic()
assert new_acl != self.dir_acl
......@@ -273,7 +275,12 @@ other::---""")
"""Test writing a record and reading it back"""
record = ACL2Record(self.sample_acl)
new_acl = Record2ACL(record)
assert new_acl == self.sample_acl
if new_acl != self.sample_acl:
print "New_acl", new_acl.entry_list
print "sample_acl", self.sample_acl.entry_list
print "New_acl text", str(new_acl)
print "sample acl text", str(self.sample_acl)
assert 0
record2 = ACL2Record(self.dir_acl)
new_acl2 = Record2ACL(record2)
......@@ -358,7 +365,7 @@ other::---
Time.setcurtime(10000)
AccessControlListFile.open_file()
for rp in [self.acl_testdir1, rp1, rp2, rp3]:
acl = AccessControlList(rp.index)
acl = AccessControlLists(rp.index)
acl.read_from_rp(rp)
AccessControlListFile.write_object(acl)
AccessControlListFile.close_file()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment