##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#          Romain Courteaud <romain@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

import unittest

from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
from Products.ERP5Type.tests.Sequence import SequenceList
from Products.ERP5Type.tests.utils import DummyMailHost
import email
from email.header import decode_header, make_header
from email.utils import parseaddr

# Copied from ERP5Type/patches/CMFMailIn.py
def decode_email(file):
  # Prepare result
  theMail = {
    'attachment_list': [],
    'body': '',
    # Place all the email header in the headers dictionary in theMail
    'headers': {}
  }
  # Get Message
  msg = email.message_from_string(file)
  # Back up original file
  theMail['__original__'] = file
  # Recode headers to UTF-8 if needed
  for key, value in msg.items():
    decoded_value_list = decode_header(value)
    unicode_value = make_header(decoded_value_list)
    new_value = unicode_value.__unicode__().encode('utf-8')
    theMail['headers'][key.lower()] = new_value
  # Filter mail addresses
  for header in ('resent-to', 'resent-from', 'resent-cc', 'resent-sender',
                 'to', 'from', 'cc', 'sender', 'reply-to'):
    header_field = theMail['headers'].get(header)
    if header_field:
        theMail['headers'][header] = parseaddr(header_field)[1]
  # Get attachments
  body_found = 0
  for part in msg.walk():
    content_type = part.get_content_type()
    file_name = part.get_filename()
    # multipart/* are just containers
    # XXX Check if data is None ?
    if content_type.startswith('multipart'):
      continue
    # message/rfc822 contains attached email message
    # next 'part' will be the message itself
    # so we ignore this one to avoid doubling
    elif content_type == 'message/rfc822':
      continue
    elif content_type in ("text/plain", "text/html"):
      charset = part.get_content_charset()
      payload = part.get_payload(decode=True)
      #LOG('CMFMailIn -> ',0,'charset: %s, payload: %s' % (charset,payload))
      if charset:
        payload = unicode(payload, charset).encode('utf-8')
      if body_found:
        # Keep the content type
        theMail['attachment_list'].append((file_name,
                                           content_type, payload))
      else:
        theMail['body'] = payload
        body_found = 1
    else:
      payload = part.get_payload(decode=True)
      # Keep the content type
      theMail['attachment_list'].append((file_name, content_type,
                                         payload))
  return theMail

class TestNotificationTool(ERP5TypeTestCase):
  """
  Test notification tool
  """

  def getBusinessTemplateList(self):
    return ('erp5_base',)

  def getTitle(self):
    return "Notification Tool"

  def createUser(self, name, role_list):
    user_folder = self.getPortal().acl_users
    user_folder._doAddUser(name, 'password', role_list, [])

  def changeUser(self, name):
    self.old_user = getSecurityManager().getUser()
    self.loginByUserName(name)

  def changeToPreviousUser(self):
    newSecurityManager(None, self.old_user)

  def afterSetUp(self):
    self.createUser('erp5user', ['Associate', 'Auditor', 'Author'])
    self.createUser('manager', ['Manager'])
    portal = self.getPortal()
    if 'MailHost' in portal.objectIds():
      portal.manage_delObjects(['MailHost'])
    portal._setObject('MailHost', DummyMailHost('MailHost'))
    self.portal.MailHost.reset()
    portal.email_from_address = 'site@example.invalid'
    self.portal.portal_caches.clearAllCache()
    self.tic()
    self.loginByUserName('erp5user')

  def beforeTearDown(self):
    self.abort()
    # clear modules if necessary
    self.loginByUserName('manager')
    self.portal.person_module.manage_delObjects(
            list(self.portal.person_module.objectIds()))
    self.tic()

  def stepAddUserA(self, sequence=None, sequence_list=None, **kw):
    """
    Create a user
    """
    person = self.portal.person_module.newContent(portal_type="Person",
                                                  default_email_text="userA@example.invalid")
    self.changeUser('manager')
    person.edit(reference="userA", password="passwordA")
    assignment = person.newContent(portal_type='Assignment')
    assignment.open()
    self.changeToPreviousUser()
    sequence['user_a_id'] = person.Person_getUserId()

  def stepAddUserB(self, sequence=None, sequence_list=None, **kw):
    """
    Create a user
    """
    person = self.portal.person_module.newContent(portal_type="Person",
                                                  default_email_text="userB@example.invalid")
    self.changeUser('manager')
    person.edit(reference="userB", password="passwordA")
    assignment = person.newContent(portal_type='Assignment')
    assignment.open()
    self.changeToPreviousUser()
    sequence['user_b_id'] = person.Person_getUserId()

  def stepAddUserWithoutEmail(self, sequence=None, sequence_list=None, **kw):
    """
    Create a user
    """
    person = self.portal.person_module.newContent(portal_type="Person")

    self.changeUser('manager')
    person.edit(reference="userWithoutEmail", password="passwordA")
    assignment = person.newContent(portal_type='Assignment')
    assignment.open()
    self.changeToPreviousUser()
    sequence['user_without_email_id'] = person.Person_getUserId()

  def test_01_defaultBehaviour(self):
    self.assertRaises(
      TypeError,
      self.portal.portal_notifications.sendMessage,
    )
    self.assertRaises(
      TypeError,
      self.portal.portal_notifications,
    )

  def stepCheckNotificationWithoutSender(self, sequence=None,
                                         sequence_list=None, **kw):
    """
    Check that notification works without sender
    """
    self.portal.portal_notifications.sendMessage(
        recipient=sequence['user_a_id'], subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)

  def test_02_noSender(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationWithoutSender \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckNotificationFailsWithoutSubject(self, sequence=None,
                                               sequence_list=None, **kw):
    """
    Check that notification fails when no subject is given
    """
    self.assertRaises(
      TypeError,
      self.portal.portal_notifications.sendMessage,
        recipient=sequence['user_a_id'], message='Message'
    )

  def test_03_noSubject(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationFailsWithoutSubject \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def test_04_noRecipient(self):
    self.portal.portal_notifications.sendMessage(
        subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['site@example.invalid'], mto)

  def stepCheckNotificationWithoutMessage(self, sequence=None,
                                          sequence_list=None, **kw):
    """
    Check that notification is send when no message is passed
    """
    self.portal.portal_notifications.sendMessage(
        recipient=sequence['user_a_id'], subject='Subject', )
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)

  def test_05_noMessage(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationWithoutMessage \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckSimpleNotification(self, sequence=None,
                                  sequence_list=None, **kw):
    """
    Check that notification is send in standard use case
    """
    self.portal.portal_notifications.sendMessage(
        recipient=sequence['user_a_id'], subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
    # Check Message
    mail_dict = decode_email(messageText)
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], 'Message')
    self.assertSameSet([], mail_dict['attachment_list'])

  def test_06_simpleMessage(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckSimpleNotification \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckNotificationWithAttachment(self, sequence=None,
                                          sequence_list=None, **kw):
    """
    Check attachment
    """
    self.portal.portal_notifications.sendMessage(
        recipient=sequence['user_a_id'], subject='Subject', message='Message',
        attachment_list=[
          {
            'name': 'Attachment 1',
            'content': 'Text 1',
            'mime_type': 'text/plain',
          },
          {
            'name': 'Attachment 2',
            'content': 'Text 2',
            'mime_type': 'application/octet-stream',
          },
        ])
    last_message = self.portal.MailHost._last_message

    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)

    # Check Message
    mail_dict = decode_email(messageText)
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], 'Message')
    self.assertSameSet([('Attachment 1', 'text/plain', 'Text 1'),
                        ('Attachment 2', 'application/octet-stream', 'Text 2')],
                       mail_dict['attachment_list'])

  def test_07_AttachmentMessage(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationWithAttachment \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckMultiRecipientNotification(self, sequence=None,
                                          sequence_list=None, **kw):
    """
    Check that notification can be send to multiple recipient
    """
    self.portal.portal_notifications.sendMessage(
        recipient=[sequence['user_a_id'], sequence['user_b_id']], subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message

    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userB@example.invalid'], mto)

    previous_message = self.portal.MailHost._previous_message
    self.assertNotEquals((), previous_message)
    mfrom, mto, messageText = previous_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)

  def test_08_MultiRecipient(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        AddUserB \
        Tic \
        CheckMultiRecipientNotification \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckPersonWithoutEmail(self, sequence=None,
                                  sequence_list=None, **kw):
    """
    Check that notification fails when the destination hasn't a email adress
    """
    with self.assertRaisesRegexp(ValueError, "email must be set"):
      self.portal.portal_notifications.sendMessage(
          recipient=sequence['user_without_email_id'], subject='Subject', message='Message')

  def test_08_PersonWithoutEmail(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserWithoutEmail \
        Tic \
        CheckPersonWithoutEmail \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def test_09_InvalideRecipient(self):
    with self.assertRaises(ValueError):
      self.portal.portal_notifications.sendMessage(
          recipient='UnknowUser', subject='Subject', message='Message')

  def stepCheckPersonNotification(self, sequence=None,
                                  sequence_list=None, **kw):
    """
    Check that notification is send when recipient is a Person
    """
    person = self.portal.Base_getUserValueByUserId(sequence['user_a_id'])
    self.portal.portal_notifications.sendMessage(
        recipient=person.getObject(), subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
    # Check Message
    mail_dict = decode_email(messageText)
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], 'Message')
    self.assertSameSet([], mail_dict['attachment_list'])

  def test_10_PersonNotification(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckPersonNotification\
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckNotificationPlainTextFormat(self, sequence=None,
                                  sequence_list=None, **kw):
    """
    Check that if notification format is plain text.
    """

    message = """\
> Hello, will you go to the park on sunday?
Yes, I will go."""

    self.portal.portal_notifications.sendMessage(
        recipient=sequence['user_a_id'], subject='Subject',
        message_text_format='text/plain', message=message)
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
    # Check Message
    mail_dict = decode_email(messageText)
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], message)
    self.assertSameSet([], mail_dict['attachment_list'])

  def test_11_TextMessage(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationPlainTextFormat \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckNotificationHtmlFormat(self, sequence=None,
                                  sequence_list=None, **kw):
    """
    Check that if notification format is html.
    """

    message = """<a href="http://www.erp5.com/">Click Here!!</a>"""

    self.portal.portal_notifications.sendMessage(
        recipient=sequence['user_a_id'], subject='Subject',
        message_text_format='text/html', message=message)
    last_message, = self.portal.MailHost._message_list
    mfrom, mto, messageText = last_message
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
    # Check Message
    mail_dict = decode_email(messageText)
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], '<html><body>%s</body></html>' % message)
    self.assertSameSet([], mail_dict['attachment_list'])

  def test_12_HtmlMessage(self):
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationHtmlFormat \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)

  def stepCheckNotificationWithoutPermissionOnRecipient(self, sequence=None):
    """
    Check that notification is send by user who cannot see recipient
    """
    self.logout()
    self.portal.portal_notifications.sendMessage(
        recipient=sequence['user_a_id'], subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)

  def test_permission_on_recipient_not_needed(self):
    """Notification Tool can be used to send Messages even when user does not
    have permission on sender or recipent documents.
    """
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationWithoutPermissionOnRecipient \
        '
    sequence_list.addSequenceString(sequence_string)
    sequence_list.play(self)


class TestNotificationToolWithCRM(TestNotificationTool):
  """Make sure that notification tool works with crm"""

  def getTitle(self):
    return "Notification Tool With CRM"

  def getBusinessTemplateList(self):
    return ('erp5_base', 'erp5_crm')

  def beforeTearDown(self):
    TestNotificationTool.beforeTearDown(self)
    self.portal.event_module.manage_delObjects(
            list(self.portal.event_module.objectIds()))
    self.tic()

  def test_store_as_event(self):
    # passing store_as_event=True to NotificationTool.sendMessage will store
    # the message in an event
    person = self.portal.person_module.newContent(
        portal_type="Person",
        default_email_text="userA@example.invalid")
    self.tic()
    self.portal.portal_notifications.sendMessage(
                                  store_as_event=True,
                                  recipient=person,
                                  subject='Subject',
                                  message='Message')
    self.tic()
    last_message, = self.portal.MailHost._message_list
    mfrom, mto, messageText = last_message
    mail_dict = decode_email(messageText)
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)

    # check that an event has been created
    event_list = self.portal.event_module.contentValues()
    self.assertEqual(1, len(event_list))

    event = event_list[0]
    self.assertEqual('Mail Message', event.getPortalTypeName())
    self.assertEqual('Subject', event.getTitle())
    self.assertEqual('Message', event.getTextContent())
    self.assertNotEquals(None, event.getStartDate())
    self.assertEqual(person, event.getDestinationValue())
    self.assertEqual('started', event.getSimulationState())

  def test_check_consistency(self):
    # This test only applies when erp5_crm is installed and we have
    # a constraint on mail message
    person_without_email = self.portal.person_module.newContent(
      portal_type="Person",)
    self.tic()
    with self.assertRaises(ValueError) as exception_context:
      self.portal.portal_notifications.sendMessage(
        check_consistency=True,
        recipient=person_without_email,
        subject='Subject',
        message='Message')
    consistency_message_list, = exception_context.exception.args
    consistency_message, = consistency_message_list
    from Products.ERP5Type.ConsistencyMessage import ConsistencyMessage
    self.assertIsInstance(consistency_message, ConsistencyMessage)
    self.assertEqual(
      'Recipients email must be defined',
      str(consistency_message.getTranslatedMessage()))


def test_suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(TestNotificationTool))
  suite.addTest(unittest.makeSuite(TestNotificationToolWithCRM))
  return suite