Commit 86eeb759 authored by Jérome Perrin's avatar Jérome Perrin

open_api: support request body and fix py3 compatibility

parent 5e86291a
Pipeline #34495 failed with stage
in 0 seconds
......@@ -25,9 +25,13 @@
#
##############################################################################
import base64
import binascii
import json
import typing
import six
from six.moves.urllib.parse import unquote
if typing.TYPE_CHECKING:
from typing import Any, Callable, Optional
from erp5.component.document.OpenAPITypeInformation import OpenAPIOperation, OpenAPIParameter
......@@ -258,10 +262,8 @@ class OpenAPIService(XMLObject):
parameter,
parameter.getJSONSchema(),
)
requestBody = self.validateParameter(
'request body',
requestBody = self.validateRequestBody(
operation.getRequestBodyValue(request),
{},
operation.getRequestBodyJSONSchema(request),
)
if requestBody:
......@@ -296,6 +298,37 @@ class OpenAPIService(XMLObject):
parameter_name=parameter_name, e=e.message), str(e))
return parameter_value
security.declareProtected(
Permissions.AccessContentsInformation, 'validateRequestBody')
def validateRequestBody(self, parameter_value, schema):
# type: (str, dict) -> Any
"""Validate the request body raising a ParameterValidationError
when the parameter is not valid according to the corresponding schema.
"""
if schema is not None:
if schema.get('type') == 'string':
if schema.get('format') == 'base64':
try:
return base64.b64decode(parameter_value)
except (binascii.Error, TypeError) as e:
if isinstance(e, TypeError):
# BBB on python2 this raises a generic type error
# but we don't want to ignore potential TypeErrors
# on python3 here
if six.PY3:
raise
raise ParameterValidationError(
'Error validating request body: {e}'.format(e=str(e)))
elif schema.get('format') == 'binary':
return parameter_value or b''
return self.validateParameter(
'request body',
parameter_value,
{},
schema,
)
def executeMethod(self, request):
# type: (HTTPRequest) -> Any
operation = self.getMatchingOperation(request)
......
......@@ -90,7 +90,7 @@ ModuleSecurityInfo(__name__).declarePublic(
)
# On python2, make sure we use UTF-8 strings for the json schemas, so that we don't
# have ugly u' prefixs in the reprs. This also transforms the collections.OrderedDict
# have ugly u' prefixes in the reprs. This also transforms the collections.OrderedDict
# to simple dicts, because the former also have an ugly representation.
# http://stackoverflow.com/a/13105359
if six.PY2:
......@@ -105,7 +105,7 @@ if six.PY2:
return [byteify(element) for element in string]
elif isinstance(string, tuple):
return tuple(byteify(element) for element in string)
elif isinstance(string, unicode):
elif isinstance(string, six.text_type):
return string.encode('utf-8')
else:
return string
......@@ -175,7 +175,9 @@ class OpenAPIOperation(dict):
# type: (HTTPRequest) -> Optional[dict]
"""Returns the schema for the request body, or None if no `requestBody` defined
"""
request_content_type = request.getHeader('content-type')
exact_request_content_type = request.getHeader('content-type')
wildcard_request_content_type = '%s/*' % ((exact_request_content_type or '').split('/')[0])
for request_content_type in exact_request_content_type, wildcard_request_content_type, '*/*':
# TODO there might be $ref ?
request_body_definition = self.get(
'requestBody', {'content': {}})['content'].get(request_content_type)
......@@ -340,7 +342,10 @@ class OpenAPITypeInformation(ERP5TypeInformation):
security.declareObjectProtected(Permissions.AccessContentsInformation)
def getSchema(self):
stream = io.BytesIO(self.getTextContent() or b'{}')
text_content = self.getTextContent() or '{}'
if six.PY3:
text_content = text_content.encode()
stream = io.BytesIO(text_content)
if self.getContentType() == 'application/x-yaml':
try:
import yaml # pylint:disable=import-error
......
......@@ -25,6 +25,13 @@
#
##############################################################################
import six
# pylint:disable=no-name-in-module
if six.PY2:
from base64 import encodestring as base64_encodebytes
else:
from base64 import encodebytes as base64_encodebytes
# pylint:enable=no-name-in-module
import io
import json
import unittest
......@@ -39,7 +46,7 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
class OpenAPITestCase(ERP5TypeTestCase):
_type_id = NotImplemented # type: str
_open_api_schema = NotImplemented # type: bytes
_open_api_schema = NotImplemented # type: str
_open_api_schema_content_type = 'application/json'
_public_api = True
......@@ -375,7 +382,7 @@ class TestOpenAPIServicePetController(OpenAPIPetStoreTestCase):
class TestOpenAPIServiceYaml(OpenAPITestCase):
_type_id = 'Test Open API YAML'
_open_api_schema_content_type = 'application/x-yaml'
_open_api_schema = b'''
_open_api_schema = '''
openapi: 3.0.3
info:
title: TestOpenAPIServiceYaml
......@@ -456,7 +463,7 @@ class TestPathParameterSerialization(OpenAPITestCase):
}
}
}
}).encode()
})
def test_primitive_parameter_serialization(self):
self.addPythonScript(
......@@ -532,7 +539,7 @@ class TestQueryParameterSerialization(OpenAPITestCase):
}
}
}
}).encode()
})
def test_array_parameter_serialization(self):
self.addPythonScript(
......@@ -707,7 +714,7 @@ class TestOpenAPINonAsciiParameters(OpenAPIPetStoreTestCase):
class TestOpenAPICommonParameters(OpenAPIPetStoreTestCase):
_type_id = 'Test Open API Common Parameters'
_open_api_schema = (
b'''
'''
{
"openapi": "3.0.3",
"info": {
......@@ -718,7 +725,7 @@ class TestOpenAPICommonParameters(OpenAPIPetStoreTestCase):
'''
# https://swagger.io/docs/specification/describing-parameters/#common-for-path
b'''
'''
"/common-for-path": {
"parameters": [
{
......@@ -749,7 +756,7 @@ class TestOpenAPICommonParameters(OpenAPIPetStoreTestCase):
},'''
# https://swagger.io/docs/specification/describing-parameters/#common-for-various-paths
b'''
'''
"/common-for-various-paths": {
"get": {
"operationId": "testGET2",
......@@ -761,7 +768,7 @@ class TestOpenAPICommonParameters(OpenAPIPetStoreTestCase):
'''
# here we also excercice $refs in parameter schemas
b'''
'''
"$ref": "#/components/schemas/custom-number"
}
},
......@@ -781,7 +788,7 @@ class TestOpenAPICommonParameters(OpenAPIPetStoreTestCase):
# https://spec.openapis.org/oas/v3.1.0#fixed-fields-6
# $refs: Allows for a referenced definition of this path item.
# The referenced structure MUST be in the form of a Path Item Object.
b'''
'''
"/alias": {
"$ref": "#/paths/~1common-for-path"
}
......@@ -895,7 +902,7 @@ class TestOpenAPIMissingParameters(OpenAPIPetStoreTestCase):
}
}
}
}).encode()
})
def test_required_query(self):
self.addPythonScript(
......@@ -980,7 +987,7 @@ class TestOpenAPIErrorHandling(OpenAPIPetStoreTestCase):
self.addPythonScript(
'TestPetStoreOpenAPI_findPetsByStatus',
'status',
'1/0',
'1//0',
)
response = self.publish(
self.connector.getPath() + '/pet/findByStatus?status=available')
......@@ -1097,7 +1104,7 @@ class TestPathParameterAndAcquisition(OpenAPIPetStoreTestCase):
"""
def afterSetUp(self):
super(TestPathParameterAndAcquisition, self).afterSetUp()
if not '789' in self.portal.portal_web_services.objectIds():
if '789' not in self.portal.portal_web_services.objectIds():
self.portal.portal_web_services.newContent(
id='789',
portal_type=self.portal.portal_web_services.allowedContentTypes()
......@@ -1242,3 +1249,72 @@ class TestURLPathWithWebSiteAndVirtualHost(OpenAPIPetStoreTestCase):
self.connector.getRelativeUrl()
))
self.assertEqual(response.getBody(), b'"ok"')
class TestOpenAPIRequestBody(OpenAPITestCase):
_type_id = 'Test Open API Request Body'
_open_api_schema = json.dumps(
{
'openapi': '3.0.3',
'info': {
'title': 'TestOpenAPIRequestBody',
'version': '0.0.0'
},
'paths': {
'/post': {
'post': {
'operationId': 'testPostByContentType',
'requestBody': {
'content': {
'image/*': {
'schema': {
'type': 'string',
'format': 'binary',
}
},
'application/x-base64': {
'schema': {
'type': 'string',
'format': 'base64',
}
}
}
}
}
}
}
})
def test_request_body_content_encoding(self):
self.addPythonScript(
'TestOpenAPIRequestBody_testPostByContentType',
'body=None',
'container.REQUEST.RESPONSE.setHeader("Content-Type", "application/octet-stream")\n'
'return body',
)
response = self.publish(
self.connector.getPath() + '/post',
request_method='POST',
stdin=io.BytesIO(b'png file content'),
env={"CONTENT_TYPE": 'image/png'})
self.assertEqual(response.getBody(), b'png file content')
self.assertEqual(response.getStatus(), 200)
response = self.publish(
self.connector.getPath() + '/post',
request_method='POST',
stdin=io.BytesIO(base64_encodebytes(b'base64 file content')),
env={"CONTENT_TYPE": 'application/x-base64'})
self.assertEqual(response.getBody(), b'base64 file content')
self.assertEqual(response.getStatus(), 200)
response = self.publish(
self.connector.getPath() + '/post',
request_method='POST',
stdin=io.BytesIO(b'not base64'),
env={"CONTENT_TYPE": 'application/x-base64'})
self.assertEqual(response.getStatus(), 400)
body = json.loads(response.getBody())
self.assertEqual(body['type'], 'parameter-validation-error')
self.assertIn('Error validating request body:', body['title'])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment