Commit addb7413 authored by Xavier Thompson's avatar Xavier Thompson

simplehttpserver: Fix forbidden writes

parent 4c304fcf
...@@ -32,6 +32,13 @@ from six.moves import range ...@@ -32,6 +32,13 @@ from six.moves import range
from zc.buildout.buildout import bool_option from zc.buildout.buildout import bool_option
def issubpathof(subpath, path):
subpath = os.path.abspath(subpath)
path = os.path.abspath(path)
relpath = os.path.relpath(subpath, start=path)
return not relpath.startswith(os.pardir)
class Recipe(GenericBaseRecipe): class Recipe(GenericBaseRecipe):
def __init__(self, buildout, name, options): def __init__(self, buildout, name, options):
......
...@@ -10,6 +10,8 @@ import cgi, errno ...@@ -10,6 +10,8 @@ import cgi, errno
from slapos.util import str2bytes from slapos.util import str2bytes
from . import issubpathof
class ServerHandler(SimpleHTTPRequestHandler): class ServerHandler(SimpleHTTPRequestHandler):
...@@ -73,30 +75,32 @@ class ServerHandler(SimpleHTTPRequestHandler): ...@@ -73,30 +75,32 @@ class ServerHandler(SimpleHTTPRequestHandler):
file_open_mode = 'wb' if ('clear' in form and form['clear'].value in ('1', b'1')) else 'ab' file_open_mode = 'wb' if ('clear' in form and form['clear'].value in ('1', b'1')) else 'ab'
self.writeFile(file_path, file_content, file_open_mode) self.writeFile(file_path, file_content, file_open_mode)
self.respond(200, type=self.headers['Content-Type'])
self.wfile.write(b"Content written to %s" % str2bytes(file_path))
def writeFile(self, filename, content, method='ab'): def writeFile(self, filename, content, method='ab'):
file_path = os.path.abspath(os.path.join(self.document_path, filename)) file_path = os.path.abspath(os.path.join(self.document_path, filename))
if not file_path.startswith(self.document_path): # Check writing there is allowed
if not issubpathof(file_path, self.document_path):
self.respond(403, 'text/plain') self.respond(403, 'text/plain')
self.wfile.write(b"Forbidden") self.wfile.write(b"Forbidden")
return
# Create missing directories if needed
try: try:
os.makedirs(os.path.dirname(file_path)) os.makedirs(os.path.dirname(file_path))
except OSError as exception: except OSError as exception:
if exception.errno != errno.EEXIST: if exception.errno != errno.EEXIST:
logging.error('Failed to create file in %s. The error is \n%s' % ( logging.error('Failed to create file in %s. The error is \n%s',
file_path, str(exception))) file_path, str(exception))
# Write content to file
logging.info('Writing received content to file %s' % file_path) logging.info('Writing received content to file %s', file_path)
try: try:
with open(file_path, method) as myfile: with open(file_path, method) as myfile:
myfile.write(content) myfile.write(content)
logging.info('Done.') logging.info('Done.')
except IOError as e: except IOError as e:
logging.error('Something happened while processing \'writeFile\'. The message is %s' % logging.error('Something happened while processing \'writeFile\'. The message is %s',
str(e)) str(e))
self.respond(200, type=self.headers['Content-Type'])
self.wfile.write(b"Content written to %s" % str2bytes(filename))
class HTTPServerV6(HTTPServer): class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6 address_family = socket.AF_INET6
......
import errno
import os import os
import shutil import shutil
import tempfile import tempfile
...@@ -88,6 +89,37 @@ class SimpleHTTPServerTest(unittest.TestCase): ...@@ -88,6 +89,37 @@ class SimpleHTTPServerTest(unittest.TestCase):
self.base_path self.base_path
) )
def test_write_outside_root_dir_should_fail(self):
self.setUpRecipe({'allow-write': 'true'})
server_base_url = self.startServer()
# A file outside the server's root directory
hack_path = os.path.join(self.install_dir, 'forbidden', 'hack.txt')
hack_content = "You should not be able to write to hack.txt"
# post with multipart/form-data encoding
resp = requests.post(
server_base_url,
files={
'path': hack_path,
'content': hack_content,
},
)
# First check for actual access to forbidden files
try:
with open(hack_path) as f:
content = f.read()
if content == hack_content:
self.fail(content)
self.fail("%s should not have been created" % hack_path)
except IOError as e:
if e.errno != errno.ENOENT:
raise
self.assertFalse(os.path.exists(os.path.dirname(hack_path)))
# Now check for proper response
self.assertEqual(resp.status_code, requests.codes.forbidden)
self.assertEqual(resp.text, 'Forbidden')
def test_write(self): def test_write(self):
self.setUpRecipe({'allow-write': 'true'}) self.setUpRecipe({'allow-write': 'true'})
server_base_url = self.startServer() server_base_url = self.startServer()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment