Commit 5224c417 authored by Rafael Monnerat's avatar Rafael Monnerat

Update from upstream/master

parents a4c33bde dc9ffa12
......@@ -808,7 +808,7 @@ class TestAuthenticationPolicy(ERP5TypeTestCase):
preference = self.portal.portal_catalog.getResultValue(
portal_type='System Preference',
title='Authentication',)
# Here we activate the "password should contain usename" policy
# Here we activate the "password should contain username" policy
# as a way to check that password reset checks are done in the
# context of the login
preference.setPrefferedForceUsernameCheckInPassword(1)
......@@ -856,8 +856,11 @@ class TestAuthenticationPolicy(ERP5TypeTestCase):
# now with a password complying to the policy
ret = submit_reset_password_dialog('ok')
self.assertEqual(httplib.FOUND, ret.getStatus())
self.assertTrue(ret.getHeader('Location').endswith(
'/login_form?portal_status_message=Password+changed.'))
redirect_url = urlparse.urlparse(ret.getHeader("Location"))
self.assertEqual(redirect_url.path, '{}/login_form'.format(self.portal.absolute_url_path()))
redirect_url_params = urlparse.parse_qsl(redirect_url.query)
self.assertIn(('portal_status_message', 'Password changed.'), redirect_url_params)
self.assertIn(('portal_status_level', 'success'), redirect_url_params)
def test_PreferenceTool_changePassword_checks_policy(self):
person = self.createUser(self.id(), password='current')
......@@ -918,7 +921,7 @@ class TestAuthenticationPolicy(ERP5TypeTestCase):
# long enough password is accepted
ret = submit_change_password_dialog('long_enough_password')
# When password reset is succesful, user is logged out
# When password reset is successful, user is logged out
self.assertEqual(httplib.FOUND, ret.getStatus())
self.assertEqual(self.portal.portal_preferences.absolute_url(),
ret.getHeader("Location"))
......
......@@ -27,12 +27,13 @@
#
##############################################################################
import unittest
import six
from six.moves.urllib_parse import urlparse, parse_qsl
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList
from DateTime import DateTime
class TestPasswordTool(ERP5TypeTestCase):
"""
Test reset of password
......@@ -47,6 +48,22 @@ class TestPasswordTool(ERP5TypeTestCase):
self.portal.email_from_address = 'site@example.invalid'
self.portal.MailHost.reset()
self.portal.portal_caches.clearAllCache()
self._createUser("userA")
def _createUser(self, base_name):
person = self.portal.person_module.newContent(
portal_type="Person",
reference=base_name,
default_email_text="{base_name}@example.invalid".format(base_name=base_name))
assignment = person.newContent(portal_type='Assignment')
assignment.open()
login = person.newContent(
portal_type='ERP5 Login',
reference='{base_name}-login'.format(base_name=base_name),
password='{base_name}-password'.format(base_name=base_name),
)
login.validate()
self.tic()
def beforeTearDown(self):
self.abort()
......@@ -92,275 +109,175 @@ class TestPasswordTool(ERP5TypeTestCase):
"Plugin %s should not have authenticated '%s' with password '%s'" %
(plugin_name, login, password))
def stepAddUser(self, sequence=None, sequence_list=None, **kw):
"""
Create a user
"""
person = self.portal.person_module.newContent(portal_type="Person",
reference="userA",
default_email_text="userA@example.invalid")
assignment = person.newContent(portal_type='Assignment')
assignment.open()
login = person.newContent(
portal_type='ERP5 Login',
reference='userA-login',
password='passwordA',
)
login.validate()
def stepCheckPasswordToolExists(self, sequence=None, sequence_list=None, **kw):
"""
Check existence of password tool
"""
self.assertTrue(self.getPasswordTool() is not None)
def stepCheckUserLogin(self, sequence=None, sequence_list=None, **kw):
"""
Check existence of password tool
"""
self._assertUserExists('userA-login', 'passwordA')
def stepCheckUserLoginWithNewPassword(self, sequence=None, sequence_list=None, **kw):
"""
Check existence of password tool
"""
self._assertUserExists('userA-login', 'secret')
def stepCheckUserNotLoginWithBadPassword(self, sequence=None, sequence_list=None, **kw):
"""
Check existence of password tool
"""
self._assertUserDoesNotExists('userA', 'secret')
def stepCheckUserNotLoginWithFormerPassword(self, sequence=None, sequence_list=None, **kw):
"""
Check existence of password tool
"""
self._assertUserDoesNotExists('userA', 'passwordA')
def stepLostPassword(self, sequence=None, sequence_list=None, **kw):
"""
Required a new password
"""
self.portal.portal_password.mailPasswordResetRequest(user_login="userA-login")
def stepTryLostPasswordWithBadUser(self, sequence=None, sequence_list=None, **kw):
"""
Required a new password
"""
self.portal.portal_password.mailPasswordResetRequest(user_login="userZ-login")
def test_password_reset(self):
self._assertUserExists('userA-login', 'userA-password')
self._assertUserDoesNotExists('userA-login', 'bad')
ret = self.portal.portal_password.mailPasswordResetRequest(
user_login='userA-login', REQUEST=self.portal.REQUEST)
def stepCheckNoMailSent(self, sequence=None, sequence_list=None, **kw):
"""
Check mail has not been sent after fill in wrong the form password
"""
last_message = self.portal.MailHost._last_message
self.assertEqual((), last_message)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "An email has been sent to you."), query_string_param)
self.assertIn(("portal_status_level", "success"), query_string_param)
self.tic()
def stepCheckMailSent(self, sequence=None, sequence_list=None, **kw):
"""
Check mail has been sent after fill in the form password
"""
last_message = self.portal.MailHost._last_message
self.assertNotEqual((), last_message)
mfrom, mto, _ = last_message
(mfrom, mto, mbody), = self.portal.MailHost.getMessageList()
self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
self.assertEqual(['userA@example.invalid'], mto)
reset_key, = list(six.iterkeys(self.portal.portal_password._password_request_dict))
self.assertIn(
('PasswordTool_viewResetPassword?reset_key=' + reset_key).encode(),
mbody)
ret = self.portal.portal_password.changeUserPassword(
user_login="userA-login",
password="new-password",
password_confirm="new-password",
password_key=reset_key)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "Password changed."), query_string_param)
self.assertIn(("portal_status_level", "success"), query_string_param)
self.tic()
self._assertUserExists('userA-login', 'new-password')
self._assertUserDoesNotExists('userA-login', 'userA-password')
# key no longer work
ret = self.portal.portal_password.changeUserPassword(
user_login="userA-login",
password="new-password",
password_confirm="new-password",
password_key=reset_key)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "Key not known. Please ask reset password."), query_string_param)
self.assertIn(("portal_status_level", "error"), query_string_param)
self.tic()
self._assertUserExists('userA-login', 'new-password')
self._assertUserDoesNotExists('userA-login', 'userA-password')
def stepGoToRandomAddress(self, sequence=None, sequence_list=None, **kw):
"""
Call method that change the password
We don't check use of random url in mail here as we have on request
But random is also check by changeUserPassword, so it's the same
"""
key = self.portal.portal_password._password_request_dict.keys()[0]
self.portal.portal_password.changeUserPassword(user_login="userA-login",
password="secret",
password_confirmation="secret",
password_key=key)
# reset cache
self.portal.portal_caches.clearAllCache()
def stepGoToRandomAddressWithBadUserName(self, sequence=None, sequence_list=None, **kw):
"""
Call method that change the password with a bad user name
This must not work
"""
key = self.portal.portal_password._password_request_dict.keys()[0]
sequence.edit(key=key)
self.portal.portal_password.changeUserPassword(user_login="userZ-login",
password="secret",
password_confirmation="secret",
password_key=key)
# reset cache
self.portal.portal_caches.clearAllCache()
def test_password_reset_request_for_non_existing_user(self):
ret = self.portal.portal_password.mailPasswordResetRequest(
user_login='not exist', REQUEST=self.portal.REQUEST)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "An email has been sent to you."), query_string_param)
self.assertIn(("portal_status_level", "success"), query_string_param)
self.tic()
self.assertFalse(self.portal.MailHost.getMessageList())
def stepGoToRandomAddressTwice(self, sequence=None, sequence_list=None, **kw):
"""
As we already change password, this must npot work anylonger
"""
key = sequence.get('key')
self.portal.portal_password.changeUserPassword(user_login="userA-login",
password="passwordA",
password_confirmation="passwordA",
password_key=key)
# reset cache
self.portal.portal_caches.clearAllCache()
def test_password_reset_request_for_wildcard_username(self):
ret = self.portal.portal_password.mailPasswordResetRequest(
user_login='%', REQUEST=self.portal.REQUEST)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "An email has been sent to you."), query_string_param)
self.assertIn(("portal_status_level", "success"), query_string_param)
self.tic()
self.assertFalse(self.portal.MailHost.getMessageList())
def test_password_reset_request_for_different_user(self):
self._createUser('userB')
self.portal.portal_password.mailPasswordResetRequest(
user_login='userA-login', REQUEST=self.portal.REQUEST)
reset_key, = list(six.iterkeys(
self.portal.portal_password._password_request_dict))
ret = self.portal.portal_password.changeUserPassword(
user_login="userB-login",
password="new-password",
password_confirm="new-password",
password_key=reset_key)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "Bad login provided."), query_string_param)
self.assertIn(("portal_status_level", "error"), query_string_param)
self.tic()
self._assertUserExists('userA-login', 'userA-password')
self._assertUserExists('userB-login', 'userB-password')
self._assertUserDoesNotExists('userB-login', 'new-password')
def test_password_reset_unknown_key(self):
self.portal.portal_password.mailPasswordResetRequest(
user_login='userA-login', REQUEST=self.portal.REQUEST)
ret = self.portal.portal_password.changeUserPassword(
user_login="userA-login",
password="new-password",
password_confirm="new-password",
password_key='wrong key')
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "Key not known. Please ask reset password."), query_string_param)
self.assertIn(("portal_status_level", "error"), query_string_param)
self.tic()
def stepGoToBadRandomAddress(self, sequence=None, sequence_list=None, **kw):
"""
Try to reset a password with bad random part
"""
self.portal.portal_password.changeUserPassword(user_login="userA-login",
password="secret",
password_confirmation="secret",
password_key="toto")
# reset cache
self.portal.portal_caches.clearAllCache()
def test_password_reset_date_expired(self):
self.portal.portal_password.mailPasswordResetRequest(user_login='userA-login')
(reset_key, (login, date)), = list(six.iteritems(
self.portal.portal_password._password_request_dict))
self.assertTrue(date.isFuture())
self.portal.portal_password._password_request_dict[reset_key] = (
login,
DateTime() - 1
)
ret = self.portal.portal_password.changeUserPassword(
user_login="userA-login",
password="new-password",
password_confirm="new-password",
password_key=reset_key)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "Date has expired."), query_string_param)
self.assertIn(("portal_status_level", "error"), query_string_param)
self.tic()
self._assertUserExists('userA-login', 'userA-password')
self._assertUserDoesNotExists('userA-login', 'new-password')
def stepModifyExpirationDate(self, sequence=None, sequence_list=None, **kw):
"""
Change expiration date so that reset of password is not available
"""
# save key for url
key = self.portal.portal_password._password_request_dict.keys()[0]
sequence.edit(key=key)
# modify date
for k, v in self.portal.portal_password._password_request_dict.items():
login, date = v
date = DateTime() - 1
self.portal.portal_password._password_request_dict[k] = (login, date)
def stepSimulateExpirationAlarm(self, sequence=None, sequence_list=None, **kw):
"""
Simulate alarm wich remove expired request
"""
self.portal.portal_password.removeExpiredRequests()
def stepCheckNoRequestRemains(self, sequence=None, sequence_list=None, **kw):
"""
after alarm all expired request must have been removed
"""
self.assertEqual(len(self.portal.portal_password._password_request_dict), 0)
def stepLogout(self, sequence=None, sequence_list=None, **kw):
"""
Logout
"""
self.logout()
# tests
def test_01_checkPasswordTool(self):
sequence_list = SequenceList()
sequence_string = 'CheckPasswordToolExists ' \
'AddUser Tic ' \
'Logout ' \
'CheckUserLogin CheckUserNotLoginWithBadPassword ' \
'TryLostPasswordWithBadUser Tic ' \
'CheckNoMailSent ' \
'GoToBadRandomAddress Tic ' \
'CheckUserLogin CheckUserNotLoginWithBadPassword ' \
'LostPassword Tic ' \
'CheckMailSent GoToRandomAddress Tic ' \
'CheckUserLoginWithNewPassword ' \
'CheckUserNotLoginWithFormerPassword ' \
'GoToRandomAddressTwice Tic ' \
'CheckUserLoginWithNewPassword ' \
'CheckUserNotLoginWithFormerPassword ' \
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_02_checkPasswordToolDateExpired(self):
sequence_list = SequenceList()
sequence_string = 'CheckPasswordToolExists ' \
'AddUser Tic ' \
'Logout ' \
'CheckUserLogin CheckUserNotLoginWithBadPassword ' \
'LostPassword Tic ' \
'CheckMailSent ' \
'ModifyExpirationDate ' \
'GoToRandomAddress Tic ' \
'CheckUserLogin CheckUserNotLoginWithBadPassword ' \
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_03_checkPasswordToolAlarm(self):
sequence_list = SequenceList()
sequence_string = 'CheckPasswordToolExists ' \
'AddUser Tic ' \
'Logout ' \
'CheckUserLogin CheckUserNotLoginWithBadPassword ' \
'LostPassword Tic ' \
'CheckMailSent ' \
'ModifyExpirationDate ' \
'SimulateExpirationAlarm ' \
'CheckNoRequestRemains ' \
'GoToRandomAddressTwice Tic ' \
'CheckUserLogin CheckUserNotLoginWithBadPassword ' \
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
self.assertFalse(list(six.iterkeys(
self.portal.portal_password._password_request_dict)))
def test_password_reset_password_and_confirmation_do_not_match(self):
self.portal.portal_password.mailPasswordResetRequest(
user_login='userA-login', REQUEST=self.portal.REQUEST)
reset_key, = list(six.iterkeys(
self.portal.portal_password._password_request_dict))
ret = self.portal.portal_password.changeUserPassword(
user_login="userA-login",
password="new-password",
password_confirm="wrong-password",
password_key=reset_key)
query_string_param = parse_qsl(urlparse(str(ret)).query)
self.assertIn(("portal_status_message", "Password does not match the confirm password."), query_string_param)
self.assertIn(("portal_status_level", "error"), query_string_param)
def test_two_concurrent_password_reset(self):
personA = self.portal.person_module.newContent(portal_type="Person",
reference="userA",
default_email_text="userA@example.invalid")
assignment = personA.newContent(portal_type='Assignment')
assignment.open()
login = personA.newContent(
portal_type='ERP5 Login',
reference='userA-login',
password='passwordA',
)
login.validate()
personB = self.portal.person_module.newContent(portal_type="Person",
reference="userB",
default_email_text="userB@example.invalid")
assignment = personB.newContent(portal_type='Assignment')
assignment.open()
login = personB.newContent(
portal_type='ERP5 Login',
reference='userB-login',
password='passwordB',
)
login.validate()
self.tic()
self._assertUserExists('userA-login', 'passwordA')
self._assertUserExists('userB-login', 'passwordB')
self._createUser('userB')
self._assertUserExists('userA-login', 'userA-password')
self._assertUserExists('userB-login', 'userB-password')
self.assertEqual(0, len(self.portal.portal_password._password_request_dict))
self.portal.portal_password.mailPasswordResetRequest(user_login="userA-login")
self.assertEqual(1, len(self.portal.portal_password._password_request_dict))
key_a = self.portal.portal_password._password_request_dict.keys()[0]
key_a = list(six.iterkeys(self.portal.portal_password._password_request_dict))[0]
self.tic()
self.portal.portal_password.mailPasswordResetRequest(user_login="userB-login")
possible_key_list =\
self.portal.portal_password._password_request_dict.keys()
possible_key_list = \
list(six.iterkeys(self.portal.portal_password._password_request_dict))
self.assertEqual(2, len(possible_key_list))
key_b = [k for k in possible_key_list if k != key_a][0]
self.tic()
self._assertUserExists('userA-login', 'passwordA')
self._assertUserExists('userB-login', 'passwordB')
self._assertUserExists('userA-login', 'userA-password')
self._assertUserExists('userB-login', 'userB-password')
self.portal.portal_password.changeUserPassword(user_login="userA-login",
password="newA",
password_confirmation="newA",
password_confirm="newA",
password_key=key_a)
self.tic()
self._assertUserExists('userA-login', 'newA')
self._assertUserExists('userB-login', 'passwordB')
self._assertUserExists('userB-login', 'userB-password')
self.portal.portal_password.changeUserPassword(user_login="userB-login",
password="newB",
password_confirmation="newB",
password_confirm="newB",
password_key=key_b)
self.tic()
......@@ -390,8 +307,7 @@ class TestPasswordTool(ERP5TypeTestCase):
self.assertEqual(0, len(self.portal.portal_password._password_request_dict))
self.portal.portal_password.mailPasswordResetRequest(user_login="userZ-login ")
self.assertEqual(1, len(self.portal.portal_password._password_request_dict))
key_a = self.portal.portal_password._password_request_dict.keys()[0]
key_a, = list(six.iterkeys(self.portal.portal_password._password_request_dict))
self.tic()
self._assertUserExists('userZ-login ', 'passwordZ')
......@@ -399,7 +315,7 @@ class TestPasswordTool(ERP5TypeTestCase):
# Check that password is not changed if trailing space is not entered
self.portal.portal_password.changeUserPassword(user_login="userZ-login",
password="newZ",
password_confirmation="newZ",
password_confirm="newZ",
password_key=key_a)
self.tic()
self._assertUserExists('userZ-login ', 'passwordZ')
......@@ -407,7 +323,7 @@ class TestPasswordTool(ERP5TypeTestCase):
# Check that password is changed if trailing space is entered
self.portal.portal_password.changeUserPassword(user_login="userZ-login ",
password="newZ2",
password_confirmation="newZ2",
password_confirm="newZ2",
password_key=key_a)
self.tic()
self._assertUserExists('userZ-login ', 'newZ2')
......@@ -429,10 +345,13 @@ class TestPasswordTool(ERP5TypeTestCase):
ret = self.portal.portal_password.mailPasswordResetRequest(
user_login='user-login', REQUEST=self.portal.REQUEST)
query_string_param = parse_qsl(urlparse(str(ret)).query)
# For security reasons, the message should always be the same
self.assertIn("portal_status_message=An+email+has+been+sent+to+you.", str(ret))
self.assertIn(("portal_status_message", "An email has been sent to you."), query_string_param)
self.assertIn(("portal_status_level", "success"), query_string_param)
# But no mail has been sent
self.stepCheckNoMailSent()
self.assertFalse(self.portal.MailHost.getMessageList())
def test_unreachable_email_on_person(self):
person = self.portal.person_module.newContent(
......@@ -455,10 +374,13 @@ class TestPasswordTool(ERP5TypeTestCase):
ret = self.portal.portal_password.mailPasswordResetRequest(
user_login='user-login', REQUEST=self.portal.REQUEST)
query_string_param = parse_qsl(urlparse(str(ret)).query)
# For security reasons, the message should always be the same
self.assertIn("portal_status_message=An+email+has+been+sent+to+you.", str(ret))
self.assertIn(("portal_status_message", "An email has been sent to you."), query_string_param)
self.assertIn(("portal_status_level", "success"), query_string_param)
# But no mail has been sent
self.stepCheckNoMailSent()
self.assertFalse(self.portal.MailHost.getMessageList())
def test_acquired_email_on_person(self):
organisation = self.portal.organisation_module.newContent(
......@@ -482,12 +404,10 @@ class TestPasswordTool(ERP5TypeTestCase):
ret = self.portal.portal_password.mailPasswordResetRequest(
user_login='user-login', REQUEST=self.portal.REQUEST)
query_string_param = parse_qsl(urlparse(str(ret)).query)
# For security reasons, the message should always be the same
self.assertIn("portal_status_message=An+email+has+been+sent+to+you.", str(ret))
self.assertIn(("portal_status_message", "An email has been sent to you."), query_string_param)
self.assertIn(("portal_status_level", "success"), query_string_param)
# But no mail has been sent
self.stepCheckNoMailSent()
self.assertFalse(self.portal.MailHost.getMessageList())
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestPasswordTool))
return suite
......@@ -26,7 +26,7 @@
</item>
<item>
<key> <string>cache_duration</string> </key>
<value> <int>86400</int> </value>
<value> <int>864000</int> </value>
</item>
<item>
<key> <string>description</string> </key>
......
......@@ -513,6 +513,8 @@ var GameManager = /** @class */ (function () {
function GameManager(canvas, game_parameters_json) {
var drone, header_list;
this._canvas = canvas;
this._canvas_width = canvas.width;
this._canvas_height = canvas.height;
this._scene = null;
this._engine = null;
this._droneList = [];
......@@ -575,7 +577,7 @@ var GameManager = /** @class */ (function () {
});
};
GameManager.prototype.update = function () {
GameManager.prototype.update = function (fullscreen) {
// time delta means that drone are updated every virtual second
// This is fixed and must not be modified
// otherwise, it will lead to different scenario results
......@@ -587,10 +589,8 @@ var GameManager = /** @class */ (function () {
function triggerUpdateIfPossible() {
if ((_this._canUpdate) && (_this.ongoing_update_promise === null) &&
(0 < _this.waiting_update_count)) {
_this.ongoing_update_promise = _this._update(
TIME_DELTA,
(_this.waiting_update_count === 1)
).push(function () {
_this.ongoing_update_promise = _this._update(TIME_DELTA, fullscreen)
.push(function () {
_this.waiting_update_count -= 1;
_this.ongoing_update_promise = null;
triggerUpdateIfPossible();
......@@ -626,7 +626,7 @@ var GameManager = /** @class */ (function () {
return false;
};
GameManager.prototype._update = function (delta_time) {
GameManager.prototype._update = function (delta_time, fullscreen) {
var _this = this,
queue = new RSVP.Queue(),
i;
......@@ -642,6 +642,20 @@ var GameManager = /** @class */ (function () {
}
}
if (fullscreen) {
//Only resize if size changes
if (this._canvas.width !== GAMEPARAMETERS.fullscreen.width) {
this._canvas.width = GAMEPARAMETERS.fullscreen.width;
this._canvas.height = GAMEPARAMETERS.fullscreen.height;
}
} else {
if (this._canvas.width !== this._canvas_width) {
this._canvas.width = this._canvas_width;
this._canvas.height = this._canvas_height;
this._engine.resize(true);
}
}
this._droneList.forEach(function (drone) {
queue.push(function () {
drone._tick += 1;
......@@ -1043,9 +1057,9 @@ var runGame, updateGame;
return game_manager_instance.run();
};
updateGame = function () {
updateGame = function (fullscreen) {
if (game_manager_instance) {
return game_manager_instance.update();
return game_manager_instance.update(fullscreen);
}
};
......
......@@ -226,7 +226,7 @@
</item>
<item>
<key> <string>actor</string> </key>
<value> <string>zope</string> </value>
<value> <unicode>zope</unicode> </value>
</item>
<item>
<key> <string>comment</string> </key>
......@@ -240,7 +240,7 @@
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>1006.43905.28804.11980</string> </value>
<value> <string>1009.7345.31305.44339</string> </value>
</item>
<item>
<key> <string>state</string> </key>
......@@ -260,7 +260,7 @@
</tuple>
<state>
<tuple>
<float>1677600104.11</float>
<float>1687455790.77</float>
<string>UTC</string>
</tuple>
</state>
......
......@@ -26,7 +26,7 @@
</item>
<item>
<key> <string>cache_duration</string> </key>
<value> <int>86400</int> </value>
<value> <int>864000</int> </value>
</item>
<item>
<key> <string>description</string> </key>
......
history_list = context.getMovementHistoryList(**kw)
reverse_list = []
for x in history_list:
reverse_list.insert(0, x)
return reverse_list
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>**kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Resource_getReversedMovementHistoryList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -561,7 +561,7 @@
<dictionary>
<item>
<key> <string>method_name</string> </key>
<value> <string>getMovementHistoryList</string> </value>
<value> <string>Resource_getReversedMovementHistoryList</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -16,14 +16,40 @@
<key> <string>content_icon</string> </key>
<value> <string>folder_icon.gif</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>factory</string> </key>
<value> <string>addFolder</string> </value>
</item>
<item>
<key> <string>group_list</string> </key>
<value>
<tuple>
<string>module</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Delivery Node Module</string> </value>
</item>
<item>
<key> <string>init_script</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>permission</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Base Type</string> </value>
......@@ -44,6 +70,18 @@
<key> <string>type_group</string> </key>
<value> <string>module</string> </value>
</item>
<item>
<key> <string>type_interface</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>type_mixin</string> </key>
<value>
<tuple/>
</value>
</item>
</dictionary>
</pickle>
</record>
......
......@@ -3,7 +3,7 @@
"""
REQUEST = context.REQUEST
next_url = context.portal_password.changeUserPassword(password=REQUEST['password'],
password_confirmation=REQUEST['password_confirm'],
password_confirm=REQUEST['password_confirm'],
password_key=REQUEST['password_key'],
user_login=REQUEST.get('user_login', None),
REQUEST=REQUEST)
......
......@@ -117,6 +117,7 @@ class TestStaticWebSiteRedirection(ERP5TypeTestCase):
connection = httplib.HTTPSConnection(netloc_to_check, context=ssl._create_unverified_context(), timeout=10)
else:
connection = httplib.HTTPConnection(netloc_to_check, timeout=10)
self.addCleanup(connection.close)
connection.request(
method="GET",
url=url_to_check
......
......@@ -2603,7 +2603,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(activity_node, current_node)
def test_getServerAddress(self):
host, port = self.startZServer()
host, port = self.startHTTPServer()
ip = socket.gethostbyname(host)
server_address = '%s:%s' % (ip, port)
address = getServerAddress()
......
......@@ -164,7 +164,7 @@ class Alarm(XMLObject, PeriodicityMixin):
activate_kw['tag'] = '%s_%x' % (self.getRelativeUrl(), getrandbits(32))
tag = activate_kw['tag']
method = getattr(self, method_id)
func_code = method.__code__
func_code = getattr(method, '__code__', None)
if func_code is None: # BBB Zope2
func_code = method.func_code
try:
......
......@@ -31,6 +31,7 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet
from App.special_dtml import HTMLFile
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type import IS_ZOPE2
from Products.PythonScripts.PythonScript import \
PythonScript as ZopePythonScript
from Products.ERP5Type.mixin.expression import ExpressionMixin
......@@ -71,6 +72,8 @@ class PythonScript(XMLObject, ZopePythonScript, ExpressionMixin('expression')):
meta_type = 'ERP5 Python Script'
portal_type = 'Python Script'
add_permission = Permissions.AddPortalContent
if not IS_ZOPE2:
zmi_icon = ZopePythonScript.zmi_icon
# Declarative security
security = ClassSecurityInfo()
......
REQUEST = context.REQUEST
return context.portal_password.changeUserPassword(password=REQUEST['password'],
password_confirmation=REQUEST['password_confirm'],
password_confirm=REQUEST['password_confirm'],
password_key=REQUEST['password_key'],
user_login=REQUEST.get('user_login', None),
REQUEST=REQUEST)
......@@ -41,9 +41,9 @@ from BTrees.OOBTree import OOBTree
from six.moves.urllib.parse import urlencode
redirect_path = '/login_form'
def redirect(REQUEST, site_url, message):
def redirect(REQUEST, site_url, message, level):
if REQUEST is not None and getattr(REQUEST.RESPONSE, 'redirect', None) is not None:
parameter = urlencode({'portal_status_message': message})
parameter = urlencode({'portal_status_message': message, 'portal_status_level': level})
ret_url = '%s%s?%s' % (site_url, redirect_path, parameter)
return REQUEST.RESPONSE.redirect( ret_url )
else:
......@@ -171,10 +171,13 @@ class PasswordTool(BaseTool):
"User {user} does not have a valid email address".format(user=user_login)
)
if error_encountered:
# note that we intentionally use the same msg here regardless of whether the
# email was successfully sent or not in order not to leak information about user
# existence.
if batch:
raise RuntimeError(msg)
else:
return redirect(REQUEST, site_url, msg)
return redirect(REQUEST, site_url, msg, 'success')
key = self.getResetPasswordKey(user_login=user_login,
expiration_date=expiration_date)
......@@ -222,8 +225,7 @@ class PasswordTool(BaseTool):
message_text_format=message_text_format,
event_keyword_argument_dict=event_keyword_argument_dict)
if not batch:
return redirect(REQUEST, site_url,
translateString("An email has been sent to you."))
return redirect(REQUEST, site_url, msg, 'success')
security.declareProtected(Permissions.ModifyPortalContent, 'removeExpiredRequests')
def removeExpiredRequests(self):
......@@ -266,13 +268,12 @@ class PasswordTool(BaseTool):
"""
Reset the password for a given login
"""
# BBB: password_confirm: unused argument
def error(message):
# BBB: should "raise Redirect" instead of just returning, simplifying
# calling code and making mistakes more difficult
# BBB: should probably not translate message when REQUEST is None
message = translateString(message)
return redirect(REQUEST, site_url, message)
return redirect(REQUEST, site_url, message, 'error')
if REQUEST is None:
REQUEST = get_request()
......@@ -291,6 +292,8 @@ class PasswordTool(BaseTool):
if user_login is not None and register_user_login != user_login:
# XXX: not descriptive enough
return error("Bad login provided.")
if password_confirm is not None and password_confirm != password:
return error("Password does not match the confirm password.")
if DateTime() > expiration_date:
return error("Date has expired.")
del self._password_request_dict[password_key]
......@@ -303,6 +306,6 @@ class PasswordTool(BaseTool):
login = portal.unrestrictedTraverse(login_dict['path'])
login.setPassword(password) # this will raise if password does not match policy
return redirect(REQUEST, site_url,
translateString("Password changed."))
translateString("Password changed."), 'success')
InitializeClass(PasswordTool)
......@@ -151,7 +151,7 @@ def addERP5KeyAuthPlugin(dispatcher, id, title=None,
class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
"""
Key authentification PAS plugin which support key authentication in URL.
Key authentication PAS plugin which support key authentication in URL.
<ERP5_Root>/web_page_module/1?__ac_key=207221200213146153166
......@@ -309,7 +309,7 @@ class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
################################
security.declarePrivate('resetCredentials')
def resetCredentials(self, request, response):
"""Expire cookies of authentification """
"""Expire cookies of authentication """
response.expireCookie(self.cookie_name, path='/')
response.expireCookie(self.default_cookie_name, path='/')
......@@ -319,7 +319,7 @@ class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
################################
security.declarePrivate('authenticateCredentials')
def authenticateCredentials( self, credentials ):
"""Authentificate with credentials"""
"""Authenticate with credentials"""
key = credentials.get('key', None)
if key != None:
login = self.decrypt(key)
......@@ -377,9 +377,9 @@ class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
LOG('ERP5KeyAuthPlugin.authenticateCredentials', PROBLEM, str(e))
return None
################################
# Properties for ZMI managment #
################################
#################################
# Properties for ZMI management #
#################################
#'Edit' option form
manage_editERP5KeyAuthPluginForm = PageTemplateFile(
......@@ -393,7 +393,7 @@ class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
"""Edit the object"""
error_message = ''
#Test paramaeters
#Test parameters
if "__ac_key" in [cookie_name, default_cookie_name]:
raise ValueError("Cookie name must be different of __ac_key")
......
......@@ -96,6 +96,8 @@ class Message(Persistent):
def __init__(self, domain=None, message='',
mapping=None, default=None):
self.message = message
if mapping is not None:
assert isinstance(mapping, dict)
self.mapping = mapping
self.domain = domain
if default is None:
......
......@@ -35,6 +35,7 @@ import sys
import imp
import collections
from six import reraise
import traceback
import coverage
from Products.ERP5Type.Utils import ensure_list
......@@ -60,6 +61,13 @@ except NameError: # < 3.6
class ModuleNotFoundError(ImportError):
pass
class ComponentImportError(ImportError):
"""Error when importing an existing, but invalid component, typically
because it contains syntax errors or import errors.
"""
class ComponentDynamicPackage(ModuleType):
"""
A top-level component is a package as it contains modules, this is required
......@@ -355,15 +363,17 @@ class ComponentDynamicPackage(ModuleType):
# in a deadlock
source_code_obj = compile(source_code_str, module.__file__, 'exec')
exec(source_code_obj, module.__dict__)
except Exception as error:
except Exception:
del sys.modules[module_fullname]
if module_fullname_alias:
del sys.modules[module_fullname_alias]
if module_fullname_filesystem:
del sys.modules[module_fullname_filesystem]
reraise(ImportError,
"%s: cannot load Component %s (%s)" % (fullname, name, error),
reraise(
ComponentImportError,
"%s: cannot load Component %s :\n%s" % (
fullname, name, traceback.format_exc()),
sys.exc_info()[2])
# Add the newly created module to the Version package and add it as an
......
......@@ -22,6 +22,7 @@ from OFS.misc_ import p_
from App.ImageFile import ImageFile
from Acquisition import aq_base, aq_parent
from zExceptions import Forbidden
from Products.ERP5Type import IS_ZOPE2
### Guards
......@@ -153,6 +154,7 @@ class _(PatchClass(PythonScript)):
# Add proxy role icon in ZMI
if IS_ZOPE2:
def om_icons(self):
"""Return a list of icon URLs to be displayed by an ObjectManager"""
if self._proxy_roles:
......@@ -164,7 +166,13 @@ class _(PatchClass(PythonScript)):
p_.PythonScript_ProxyRole_icon = \
ImageFile('pyscript_proxyrole.gif', globals())
else:
@property
def zmi_icon(self):
if self._proxy_roles:
return 'fa fa-terminal fa-spin'
else:
return 'fa fa-terminal'
# Guards
......
......@@ -428,7 +428,7 @@ class ERP5TypeFunctionalTestCase(ERP5TypeTestCase):
# non-recursive results clean of portal_tests/ or portal_tests/``run_only``
self.portal.portal_tests.TestTool_cleanUpTestResults(self.run_only or None)
self.tic()
host, port = self.startZServer()
host, port = self.startHTTPServer()
self.runner = FunctionalTestRunner(host, port, self)
def setSystemPreference(self):
......
......@@ -185,7 +185,7 @@ class ERP5TypeLiveTestCase(ERP5TypeTestCaseMixin):
finally:
restoreInteraction()
from Products.ERP5Type.dynamic.component_package import ComponentDynamicPackage
from Products.ERP5Type.dynamic.component_package import ComponentDynamicPackage, ComponentImportError
from Products.ERP5Type.tests.runUnitTest import ERP5TypeTestLoader
class ERP5TypeTestReLoader(ERP5TypeTestLoader):
......@@ -221,6 +221,8 @@ class ERP5TypeTestReLoader(ERP5TypeTestLoader):
if module is None:
try:
self._importZodbTestComponent(name.split('.')[0])
except ComponentImportError:
raise
except ImportError:
pass
else:
......
......@@ -1284,7 +1284,7 @@ class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin):
if len(setup_done) == 1: # make sure it is run only once
self._setUpDummyMailHost()
self.startZServer(verbose=True)
self.startHTTPServer(verbose=True)
self._registerNode(distributing=1, processing=1)
self.loadPromise()
......
......@@ -151,9 +151,10 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
pass
Lifetime.graceful_shutdown_loop()
def startZServer(self, verbose=False):
"""Start HTTP ZServer in background"""
if self._server_address is None:
@staticmethod
def startHTTPServer(verbose=False):
"""Start HTTP Server in background"""
if ProcessingNodeTestCase._server_address is None:
from Products.ERP5Type.tests.runUnitTest import log_directory
log = os.path.join(log_directory, "Z2.log")
message = "Running %s server at %s:%s\n"
......@@ -199,8 +200,11 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
webdav_ports=webdav_ports),
logger,
sockets=sockets)
ProcessingNodeTestCase._server = hs
ProcessingNodeTestCase._server_address = hs.addr
t = Thread(target=hs.run)
ProcessingNodeTestCase._server_thread = t = Thread(
target=hs.run,
name='ProcessingNodeTestCase.startHTTPServer')
t.setDaemon(1)
t.start()
from Products.CMFActivity import ActivityTool
......@@ -210,7 +214,15 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
if ActivityTool.currentNode == ActivityTool._server_address:
ActivityTool.currentNode = None
ActivityTool._server_address = None
return self._server_address
return ProcessingNodeTestCase._server_address
startZServer = startHTTPServer # BBB
@staticmethod
def stopHTTPServer():
if ProcessingNodeTestCase._server_address is not None:
ProcessingNodeTestCase._server_address = None
ProcessingNodeTestCase._server.close()
ProcessingNodeTestCase._server_thread.join(5)
def _registerNode(self, distributing, processing):
"""Register node to process and/or distribute activities"""
......@@ -338,7 +350,7 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
def afterSetUp(self):
"""Initialize a node that will only process activities"""
self.startZServer()
self.startHTTPServer()
# Make sure to still have possibilities to edit components
addUserToDeveloperRole('ERP5TypeTestCase')
from Zope2.custom_zodb import cluster
......
......@@ -40,10 +40,11 @@ if save_mysql:
# The output of mysqldump needs to merge many lines at a time
# for performance reasons (merging lines is at most 10 times
# faster, so this produce somewhat not nice to read sql
command = 'mysqldump %s > %s' % (getMySQLArguments(), dump_sql_path,)
command = 'mysqldump %s > %s.tmp' % (getMySQLArguments(), dump_sql_path,)
if verbosity:
_print('Dumping MySQL database with %s ...' % command)
subprocess.check_call(command, shell=True)
os.rename(dump_sql_path + '.tmp', dump_sql_path)
if load:
if save_mysql:
......
......@@ -693,6 +693,7 @@ def runUnitTestList(test_list, verbosity=1, debug=0, run_only=None):
raise
finally:
ProcessingNodeTestCase.unregisterNode()
ProcessingNodeTestCase.stopHTTPServer()
db_factory.close()
Storage.close()
if node_pid_list is not None:
......
......@@ -3296,6 +3296,67 @@ class Test(ERP5TypeTestCase):
expected_msg_re = re.compile('Ran 3 test.*OK', re.DOTALL)
self.assertRegex(output, expected_msg_re)
def testRunLiveTestImportError(self):
source_code = '''
def break_at_import():
import non.existing.module # pylint:disable=import-error
break_at_import()
'''
component = self._newComponent('testRunLiveTestImportError', source_code)
component.validate()
self.tic()
from Products.ERP5Type.tests.runUnitTest import ERP5TypeTestLoader
ERP5TypeTestLoader_loadTestsFromNames = ERP5TypeTestLoader.loadTestsFromNames
def loadTestsFromNames(self, *args, **kwargs):
"""
Monkey patched to simulate a reset right after importing the ZODB Test
Component whose Unit Tests are going to be executed
"""
ret = ERP5TypeTestLoader_loadTestsFromNames(self, *args, **kwargs)
from Products.ERP5.ERP5Site import getSite
getSite().portal_components.reset(force=True)
# Simulate a new REQUEST while the old one has been GC'ed
import erp5.component
erp5.component.ref_manager.clear()
import gc
gc.collect()
return ret
self.assertEqual(component.getValidationState(), 'validated')
self._component_tool.reset(force=True,
reset_portal_type_at_transaction_boundary=True)
def runLiveTest(test_name):
# ERP5TypeLiveTestCase.runLiveTest patches ERP5TypeTestCase bases, thus it
# needs to be restored after calling runLiveTest
base_tuple = ERP5TypeTestCase.__bases__
ERP5TypeTestLoader.loadTestsFromNames = loadTestsFromNames
try:
self._component_tool.runLiveTest(test_name)
finally:
ERP5TypeTestCase.__bases__ = base_tuple
ERP5TypeTestLoader.loadTestsFromNames = ERP5TypeTestLoader_loadTestsFromNames
return self._component_tool.readTestOutput()
output = runLiveTest('testRunLiveTestImportError')
self.assertIn('''
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 4, in <module>
break_at_import()
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 3, in break_at_import
import non.existing.module # pylint:disable=import-error
ImportError: No module named non.existing.module
''', output)
output = runLiveTest('testDoesNotExist_import_error_because_module_does_not_exist')
self.assertIn(
"ImportError: No module named testDoesNotExist_import_error_because_module_does_not_exist",
output)
def testERP5Broken(self):
# Create a broken ghost object
import erp5.portal_type
......
......@@ -56,7 +56,7 @@
<dd>
The connection string used for Z MySQL Database Connection is of the form:
<br />
<code>[*lock] [+/-][database][@host[:port]] [user [password [unix_socket]]]</code>
<code>[%ssl_name] [*lock] [+/-][database][@host[:port]] [user [password [unix_socket]]]</code>
<br />
or typically:
<br />
......@@ -73,6 +73,16 @@
If the UNIX socket is in a non-standard location, you can specify
the full path to it after the password.
</dd>
<dd>
%<em>ssl_name</em> at the begining of the connection string means to use
a ssl client certificate for authentication.
This will use a CA certificate located at
<code>$INSTANCEHOME/etc/zmysqlda/[%ssl_name]-ca.pem</code>, a client certificate
at <code>$INSTANCEHOME/etc/zmysqlda/[%ssl_name]-cert.pem</code> with a key
at <code>$INSTANCEHOME/etc/zmysqlda/[%ssl_name]-key.pem</code>.
This will also verify that the connection is using ssl and cause an error
when an encrypted connection can not be established.
</dd>
<dd>
A '-' in front of the database tells ZMySQLDA to not use Zope's
Transaction Manager, even if the server supports transactions. A
......
......@@ -107,6 +107,7 @@ if _v < MySQLdb_version_required:
from MySQLdb.converters import conversions
from MySQLdb.constants import FIELD_TYPE, CR, ER, CLIENT
from App.config import getConfiguration
from Shared.DC.ZRDB.TM import TM
from DateTime import DateTime
from zLOG import LOG, ERROR, WARNING
......@@ -115,7 +116,8 @@ from ZODB.POSException import ConflictError
hosed_connection = (
CR.SERVER_GONE_ERROR,
CR.SERVER_LOST,
CR.COMMANDS_OUT_OF_SYNC
CR.COMMANDS_OUT_OF_SYNC,
1927, # ER_CONNECTION_KILLED "Connection was killed" in MariaDB
)
query_syntax_error = (
......@@ -245,6 +247,14 @@ class DB(TM):
items = self._connection.split()
if not items:
return
if items[0][0] == "%":
cert_base_name = items.pop(0)[1:]
instancehome = getConfiguration().instancehome
kwargs['ssl'] = {
'ca': os.path.join(instancehome, 'etc', 'zmysqlda', cert_base_name + '-ca.pem'),
'cert': os.path.join(instancehome, 'etc', 'zmysqlda', cert_base_name + '-cert.pem'),
'key': os.path.join(instancehome, 'etc', 'zmysqlda', cert_base_name + '-key.pem'),
}
if items[0] == "~":
kwargs['compress'] = True
del items[0]
......@@ -319,7 +329,12 @@ class DB(TM):
error=True,
)
self.db = MySQLdb.connect(**self._kw_args)
self._query("SET time_zone='+00:00'")
self._query(b"SET time_zone='+00:00'")
# BBB mysqlclient on python2 does not support sql_mode, check that
# the connection is actually encrypted.
if self._kw_args.get('ssl') and \
not self._query(b"SHOW STATUS LIKE 'Ssl_version'").fetch_row()[0][1]:
raise NotSupportedError("Connection established without SSL")
def tables(self, rdb=0,
_care=('TABLE', 'VIEW')):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment