##############################################################################
#
# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved.
#                    Lukasz Nowak <luke@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from AccessControl.SecurityManagement import newSecurityManager
from Products.ERP5.ERP5Site import ERP5Site
from Products.ERP5Security.ERP5UserManager import SUPER_USER
from Products.ERP5Type.Base import Base
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Wizard import addERP5RemoteUserManager
from Products.ERP5Wizard.Tool.WizardTool import GeneratorCall
import socket
import transaction
import unittest

# portal_witch simulation
def proxyMethodHandler(self, kw):
  """Dummy proxyMethodHandler"""
  # login as super user
  newSecurityManager(self, self.getPortalObject().acl_users.getUserById(
      SUPER_USER))
  data = getattr(self, kw['method_id'])(**kw['method_kw'])
  response = GeneratorCall(data=data)
  return response.dump()

Base.proxyMethodHandler = proxyMethodHandler
Base.security.declarePublic('proxyMethodHandler')

def Base_authenticateCredentialsFromExpressInstance(self, **kw):
  person_list = self.portal_catalog(portal_type='Person',
    reference=kw['login'])

  if len(person_list) == 1:
    if person_list[0].getTitle() == kw['password']:
      return 1
  return 0

Base.Base_authenticateCredentialsFromExpressInstance =\
    Base_authenticateCredentialsFromExpressInstance
Base.security.declarePublic('Base_authenticateCredentialsFromExpressInstance')

# portal_wizard simulation
def ERP5Site_getExpressInstanceUid(self, **kw):
  """Dummy site it"""
  return 'dummy_site_id'

ERP5Site.ERP5Site_getExpressInstanceUid =\
    ERP5Site_getExpressInstanceUid
ERP5Site.security.declarePublic('ERP5Site_getExpressInstanceUid')

# portal_wizard patches
def raises_socket_error(self, *args, **kw):
  raise socket.error

def raises_socket_sslerror(self, *args, **kw):
  raise socket.sslerror

def raises_valueerror(self, *args, **kw):
  raise ValueError

def raises_socket_timeout(self, *args, **kw):
  raise socket.timeout

def raises_socket_gaierror(self, *args, **kw):
  raise socket.gaierror

class TestERP5RemoteUserManager(ERP5TypeTestCase):
  """Low level tests of remote logging"""
  def getBusinessTemplateList(self):
    return (
        'erp5_base',
        'erp5_wizard',
        )

  base_type_portal_type = 'Base Type'
  person_portal_type = 'Person'
  system_preference_portal_type = 'System Preference'

  erp5_remote_manager_id = 'erp5_remote_user_manager'
  system_preference_id = 'TestERP5RemoteUserManager'


  def setUpRemoteUserManager(self):
    acl_users = self.portal.acl_users
    addERP5RemoteUserManager(acl_users, self.erp5_remote_manager_id)
    erp5_remote_manager = getattr(acl_users, self.erp5_remote_manager_id)
    erp5_users = getattr(acl_users, 'erp5_users')
    erp5_users.manage_activateInterfaces(['IUserEnumerationPlugin'])
    erp5_remote_manager.manage_activateInterfaces(['IAuthenticationPlugin'])
    transaction.commit()

  def afterSetUp(self):
    self.portal = self.getPortalObject()
    self.createDummyWitchTool()
    self.setUpRemoteUserManager()
    self.person_module = self.portal.person_module
    acl_users = self.portal.acl_users
    self.erp5_remote_manager = getattr(acl_users, self.erp5_remote_manager_id)
    # set preferences before each test, as test suite can have different
    # ip/port after being saved and then loaded
    self.setUpAuthenticationServerPreferences()
    transaction.commit()
    self.tic()

  def beforeTearDown(self):
    """Clear everthing"""
    self.portal.acl_users.manage_delObjects(self.erp5_remote_manager_id)
    self.portal.deleteContent('portal_witch')
    self.removeAuthenticationServerPreferences()
    transaction.commit()
    self.tic()
    self.person_module.deleteContent(list(self.person_module.objectIds()))
    transaction.commit()
    self.tic()

  def removeAuthenticationServerPreferences(self):
    portal_preferences = self.portal.portal_preferences
    if self.system_preference_id in portal_preferences.objectIds():
      portal_preferences.deleteContent(self.system_preference_id)
    self.portal.portal_caches.clearAllCache()

  def setUpAuthenticationServerPreferences(self, server_url=None,
      server_root=None):
    if server_url is None:
      server_url = self.portal.absolute_url() + '/'
    if server_root is None:
      self.getPortalId()
    portal_preferences = self.portal.portal_preferences
    # disable all existing system preferences
    system_preference = portal_preferences.newContent(
        portal_type=self.system_preference_portal_type,
        id=self.system_preference_id,
        preferred_witch_tool_server_url=server_url,
        preferred_witch_tool_server_root=server_root,
    )
    system_preference.enable()
    self.assertEqual('global', system_preference.getPreferenceState())
    # clear cache after setting preferences
    self.portal.portal_caches.clearAllCache()

  def createDummyWitchTool(self):
    if 'portal_witch' not in self.portal.objectIds():
      self.portal.newContent(id='portal_witch',
        portal_type=self.base_type_portal_type)

  def createPerson(self, reference, password):
    """Creates person with reference and password in title to simulate remote
    logging"""
    self.person_module.newContent(
        portal_type=self.person_portal_type,
        reference=reference, title=password)

  def checkLogin(self, expected, sent):
    """Helper to check login, clear cache later and commit transaction"""
    self.assertEqual(expected,
        self.erp5_remote_manager.authenticateCredentials(sent))
    self.portal.portal_caches.clearAllCache()

  ############################################################################
  # TESTS
  ############################################################################
  def test_correct_login(self):
    """Checks typical login scenrio"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.checkLogin(('someone', 'someone'), kw)

  def test_incorrect_login(self):
    """Checks that incorrect login does not work"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': 'another_password'}
    self.checkLogin(None, kw)

  def test_incorrect_login_in_case_of_no_connection(self):
    """Checks that in case if there is no auth server defined it is not possible ot login"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.removeAuthenticationServerPreferences()
    transaction.commit()
    self.tic()
    self.checkLogin(None, kw)

  def test_loggable_in_case_of_server_socket_error(self):
    """Check that in case if socket.error is raised login works from ZODB cache"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.checkLogin(('someone', 'someone'), kw)
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_socket_error
      self.assertRaises(socket.error,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.checkLogin(('someone', 'someone'), kw)
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_loggable_in_case_of_server_socket_sslerror(self):
    """Check that in case if socket.sslerror is raised login works from ZODB cache"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.checkLogin(('someone', 'someone'), kw)
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_socket_sslerror
      self.assertRaises(socket.sslerror,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.checkLogin(('someone', 'someone'), kw)
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_not_loggable_in_case_of_server_raises_anything_else(self):
    """Check that in case if non socket is raised login does not works"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.checkLogin(('someone', 'someone'), kw)
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_valueerror
      self.assertRaises(ValueError,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.checkLogin(None, kw)
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_loggable_in_case_of_server_socket_error_with_failed_login_between(
      self):
    """Check that in case if socket.sslerror is raised login works from ZODB cache, when wrong credentials was passed"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.checkLogin(('someone', 'someone'), kw)
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_socket_error
      self.assertRaises(socket.error,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.checkLogin(('someone', 'someone'), kw)
      self.checkLogin(None, {'login':kw['login'], 'password':'wrong_password'})
      self.checkLogin(('someone', 'someone'), kw)
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_loggable_in_case_of_server_socket_timeout(self):
    """Check that in case if socket.timeout is raised login works from ZODB cache"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.checkLogin(('someone', 'someone'), kw)
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_socket_timeout
      self.assertRaises(socket.timeout,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.checkLogin(('someone', 'someone'), kw)
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_loggable_in_case_of_server_gaierror(self):
    """Check that in case if socket.gaierror is raised login works from ZODB cache"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    self.checkLogin(('someone', 'someone'), kw)
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_socket_gaierror
      self.assertRaises(socket.gaierror,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.checkLogin(('someone', 'someone'), kw)
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_loggable_in_case_of_server_gaierror_normal_cache(self):
    """Check that in case if socket.gaierror is raised login works from usual cache"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    expected = ('someone', 'someone')
    sent = kw
    self.assertEqual(expected,
        self.erp5_remote_manager.authenticateCredentials(sent))
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_socket_gaierror
      self.assertRaises(socket.gaierror,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.assertEqual(expected,
        self.erp5_remote_manager.authenticateCredentials(sent))
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_loggable_in_case_of_server_raises_anythin_else_normal_cache(self):
    """Check that in case if non socket is raised login works from usual cache"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    expected = ('someone', 'someone')
    sent = kw
    self.assertEqual(expected,
        self.erp5_remote_manager.authenticateCredentials(sent))
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_valueerror
      self.assertRaises(ValueError,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.assertEqual(expected,
        self.erp5_remote_manager.authenticateCredentials(sent))
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

  def test_not_loggable_in_case_of_server_gaierror_no_log_before(self):
    """Check that in case if socket.gaierror is raised login does not work in case of empty ZODB cache"""
    login = 'someone'
    password = 'somepass'
    self.createPerson(login, password)
    transaction.commit()
    self.tic()
    kw = {'login':login, 'password': password}
    # patch Wizard Tool to raise in callRemoteProxyMethod
    from Products.ERP5Wizard.Tool.WizardTool import WizardTool
    original_callRemoteProxyMethod=WizardTool.callRemoteProxyMethod
    try:
      WizardTool.callRemoteProxyMethod = raises_socket_gaierror
      self.assertRaises(socket.gaierror,
          self.portal.portal_wizard.callRemoteProxyMethod)
      self.checkLogin(None, kw)
    finally:
      WizardTool.callRemoteProxyMethod = original_callRemoteProxyMethod

def test_suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(TestERP5RemoteUserManager))
  return suite