##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

import unittest
import os
import email

from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5OOo.tests.testIngestion import conversion_server_host
from Products.ERP5OOo.tests.testIngestion import FILE_NAME_REGULAR_EXPRESSION
from Products.ERP5OOo.tests.testIngestion import REFERENCE_REGULAR_EXPRESSION

TEST_HOME = os.path.dirname(__file__)

def openTestFile(filename):
  return file(os.path.join(TEST_HOME, 'test_data', 'crm_emails', filename))


class TestCRM(ERP5TypeTestCase):
  def getBusinessTemplateList(self):
    return ('erp5_base', 'erp5_crm',)

  def test_Event_CreateRelatedEvent(self):
    # test action to create a related event from an event
    event_module = self.portal.event_module
    event_module_url = event_module.absolute_url()
    for ptype in self.portal.getPortalEventTypeList():
      event = event_module.newContent(portal_type=ptype)
      redirect = event.Event_createRelatedEvent(
                                     portal_type=ptype,
                                     title='New Title',
                                     description='New Desc')
      self.assert_(redirect.startswith(event_module_url), redirect)
      # event_module_url does not contain trailing slash, so add +1 to its len
      new_id = redirect[len(event_module_url)+1:].split('/', 1)[0]
      new_event = self.portal.event_module._getOb(new_id)
      self.assertEquals(event, new_event.getCausalityValue())
 
  def test_Event_CreateRelatedEventUnauthorized(self):
    # test that we don't get Unauthorized error when invoking the "Create
    # Related Event" without add permission on the module
    event = self.portal.event_module.newContent(portal_type='Letter')
    self.portal.event_module.manage_permission('Add portal content', [], 0)
    redirect = event.Event_createRelatedEvent(
                                     portal_type='Letter',
                                     title='New Title',
                                     description='New Desc')
    
  def test_Ticket_CreateRelatedEvent(self):
    # test action to create a related event from a ticket
    event_module_url = self.portal.event_module.absolute_url()
    ticket = self.portal.meeting_module.newContent(portal_type='Meeting')
    for ptype in self.portal.getPortalEventTypeList():
      # incoming
      redirect = ticket.Ticket_newEvent(direction='incoming',
                                        portal_type=ptype,
                                        title='New Title',
                                        description='New Desc')
      self.assert_(redirect.startswith(event_module_url), redirect)
      new_id = redirect[len(event_module_url)+1:].split('/', 1)[0]
      new_event = self.portal.event_module._getOb(new_id)
      self.assertEquals(ticket, new_event.getFollowUpValue())
      self.assertEquals('new', new_event.getSimulationState())

      # outgoing
      redirect = ticket.Ticket_newEvent(direction='outgoing',
                                        portal_type=ptype,
                                        title='New Title',
                                        description='New Desc')
      self.assert_(redirect.startswith(event_module_url), redirect)
      new_id = redirect[len(event_module_url)+1:].split('/', 1)[0]
      new_event = self.portal.event_module._getOb(new_id)
      self.assertEquals(ticket, new_event.getFollowUpValue())
      self.assertEquals('planned', new_event.getSimulationState())

  def test_Ticket_CreateRelatedEventUnauthorized(self):
    # test that we don't get Unauthorized error when invoking the "Create
    # New Event" without add permission on the module
    ticket = self.portal.meeting_module.newContent(portal_type='Meeting')
    self.portal.event_module.manage_permission('Add portal content', [], 0)
    ticket.Ticket_newEvent(portal_type='Letter',
                           title='New Title',
                           description='New Desc',
                           direction='incoming')
   

class TestCRMMailIngestion(ERP5TypeTestCase):
  """Test Mail Ingestion for CRM.
  """

  def getBusinessTemplateList(self):
    # In this test, We will attach some document portal types in event.
    # So we add DMS and Web.
    return ('erp5_base', 'erp5_crm', 'erp5_web',
            'erp5_dms_mysql_innodb_catalog', 'erp5_dms')

  def afterSetUp(self):
    portal = self.portal

    # XXX this should not be necessary either 
    # set prefered file name regular expression
    pref = portal.portal_preferences.default_site_preference
    pref.setPreferredDocumentFileNameRegularExpression('.*')
    pref.setPreferredDocumentReferenceRegularExpression('.*')
    pref.enable()

    # XXX do this in ERP5Site.py ?
    # sets up content type registry
    ctr = self.portal.content_type_registry
    if ctr.getPredicate('mail_message') is None:
      ctr.addPredicate('mail_message', 'extension')
    ctr.getPredicate('mail_message').edit(extensions='eml')
    ctr.assignTypeName('mail_message', 'Mail Message')
    ctr.reorderPredicate('mail_message', 0)

    # create customer organisation and person
    if 'customer' not in portal.organisation_module.objectIds():
      customer_organisation = portal.organisation_module.newContent(
              id='customer',
              portal_type='Organisation',
              title='Customer')
      portal.person_module.newContent(
              id='sender',
              title='Sender',
              subordination_value=customer_organisation,
              default_email_text='sender@customer.com')
      # also create the recipient
      portal.person_module.newContent(
              id='me',
              title='Me',
              default_email_text='me@erp5.org')

      # make sure customers are available to catalog
      get_transaction().commit()
      self.tic()

  def beforeTearDown(self):
    get_transaction().abort()
    # clear modules if necessary
    for module in (self.portal.event_module,):
      module.manage_delObjects(list(module.objectIds()))
    get_transaction().commit()
    self.tic()

  def _ingestMail(self, filename):
    """ingest an email from the mail in data dir named `filename`"""
    data = file(os.path.join(os.path.dirname(__file__),
                  'test_data', 'crm_emails', filename)).read()
    return self.portal.portal_contributions.newContent(
                    portal_type='Mail Message',
                    container_path='event_module',
                    file_name='postfix_mail.eml',
                    data=data)

  def test_findTypeByName_MailMessage(self):
    # without this, ingestion will not work
    registry = self.portal.content_type_registry
    self.assertEquals('Mail Message',
        registry.findTypeName('postfix_mail.eml', 'message/rfc822', ''))

  def test_document_creation(self):
    # CRM email ingestion creates a Mail Message in event_module
    event = self._ingestMail('simple')
    self.assertEquals(len(self.portal.event_module), 1)
    self.assertEquals(event, self.portal.event_module.contentValues()[0])
    self.assertEquals('Mail Message', event.getPortalType())
    self.assertEquals('message/rfc822', event.getContentType())
  
  def test_title(self):
    # tite is found automatically, based on the Subject: header in the mail
    event = self._ingestMail('simple')
    self.assertEquals('Simple Mail Test', event.getTitle())

  def test_asText(self):
    # asText requires portal_transforms
    event = self._ingestMail('simple')
    self.assertEquals('Hello,\nContent of the mail.\n', str(event.asText()))
 
  def test_sender(self):
    # source is found automatically, based on the From: header in the mail
    event = self._ingestMail('simple')
    # metadata discovery is done in an activity
    get_transaction().commit()
    self.tic()
    self.assertEquals('person_module/sender', event.getSource())

  def test_recipient(self):
    # destination is found automatically, based on the To: header in the mail
    event = self._ingestMail('simple')
    get_transaction().commit()
    self.tic()
    self.assertEquals('person_module/me', event.getDestination())

  def test_follow_up(self):
    # follow up is found automatically, based on the content of the mail, and
    # what you defined in preference regexpr.
    # But, we don't want it to associate with the first campaign simply
    # because we searched against nothing
    self.portal.campaign_module.newContent(portal_type='Campaign')
    get_transaction().commit()
    self.tic()
    event = self._ingestMail('simple')
    get_transaction().commit()
    self.tic()
    self.assertEquals(None, event.getFollowUp())
 
 
## TODO:
##  def test_forwarder_mail(self):
##    # if there is a forwarded email, import the forwarded email
##    event = self._ingestMail('forwarded')
##
##  def test_attachements(self):
##    event = self._ingestMail('with_attachements')
##
##  def test_encoding(self):
##    event = self._ingestMail('utf8')
##

class TestCRMMailSend(ERP5TypeTestCase):
  """Test Mail Sending for CRM
  """

  def getBusinessTemplateList(self):
    # In this test, We will attach some document portal types in event.
    # So we add DMS and Web.
    return ('erp5_base', 'erp5_crm', 'erp5_web',
            'erp5_dms_mysql_innodb_catalog', 'erp5_dms')

  def afterSetUp(self):
    portal = self.portal

    # create customer organisation and person
    if 'customer' not in portal.organisation_module.objectIds():
      customer_organisation = portal.organisation_module.newContent(
              id='customer',
              portal_type='Organisation',
              title='Customer')
      portal.person_module.newContent(
              id='sender',
              title='Sender',
              subordination_value=customer_organisation,
              default_email_text='sender@customer.com')
      # also create the recipient
      portal.person_module.newContent(
              id='me',
              title='Me',
              default_email_text='me@erp5.org')

    # set preference
    default_pref = self.portal.portal_preferences.default_site_preference
    default_pref.setPreferredOoodocServerAddress(conversion_server_host[0])
    default_pref.setPreferredOoodocServerPortNumber(conversion_server_host[1])
    default_pref.setPreferredDocumentFileNameRegularExpression(FILE_NAME_REGULAR_EXPRESSION)
    default_pref.setPreferredDocumentReferenceRegularExpression(REFERENCE_REGULAR_EXPRESSION)
    default_pref.enable()

    # make sure customers are available to catalog
    get_transaction().commit()
    self.tic()

  def beforeTearDown(self):
    get_transaction().abort()
    # clear modules if necessary
    for module in (self.portal.event_module,):
      module.manage_delObjects(list(module.objectIds()))
    get_transaction().commit()
    self.tic()

  def test_MailAttachmentPdf(self):
    """
    Make sure that pdf document is correctly attached in email
    """
    # Add a document which will be attached.

    def add_document(filename, id, container, portal_type):
      f = openTestFile(filename)
      document = container.newContent(id=id, portal_type=portal_type)
      document.edit(file=f, reference=filename)
      return document

    # pdf
    document_pdf = add_document('sample_attachment.pdf', '1',
                                self.portal.document_module, 'PDF')

    get_transaction().commit()
    self.tic()
    get_transaction().commit()

    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='outgoing')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               aggregate=document_pdf.getRelativeUrl(),
               text_content='This is an advertisement mail.')

    mail_text = event.send(download=True)

    # Check mail text.
    message = email.message_from_string(mail_text)
    part = None
    for i in message.get_payload():
      if i.get_content_type()=='text/plain':
        part = i
    self.assertEqual(part.get_payload(decode=True), event.getTextContent())

    # Check attachment
    # pdf
    self.assert_('sample_attachment.pdf' in 
                 [i.get_filename() for i in message.get_payload()])
    part = None
    for i in message.get_payload():
      if i.get_filename()=='sample_attachment.pdf':
        part = i
    self.assertEqual(part.get_payload(decode=True), str(document_pdf.getData()))

  def test_MailAttachmentText(self):
    """
    Make sure that text document is correctly attached in email
    """
    # Add a document which will be attached.

    def add_document(filename, id, container, portal_type):
      f = openTestFile(filename)
      document = container.newContent(id=id, portal_type=portal_type)
      document.edit(file=f, reference=filename)
      return document

    # odt
    document_odt = add_document('sample_attachment.odt', '2',
                                self.portal.document_module, 'Text')
    
    get_transaction().commit()
    self.tic()
    get_transaction().commit()

    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='outgoing')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               aggregate=document_odt.getRelativeUrl(),
               text_content='This is an advertisement mail.')

    mail_text = event.send(download=True)

    # Check mail text.
    message = email.message_from_string(mail_text)
    part = None
    for i in message.get_payload():
      if i.get_content_type()=='text/plain':
        part = i
    self.assertEqual(part.get_payload(decode=True), event.getTextContent())

    # Check attachment
    # odt
    self.assert_('sample_attachment.odt' in 
                 [i.get_filename() for i in message.get_payload()])
    part = None
    for i in message.get_payload():
      if i.get_filename()=='sample_attachment.odt':
        part = i
    self.assert_(len(part.get_payload(decode=True))>0)

  def test_MailAttachmentFile(self):
    """
    Make sure that file document is correctly attached in email
    """
    # Add a document which will be attached.

    def add_document(filename, id, container, portal_type):
      f = openTestFile(filename)
      document = container.newContent(id=id, portal_type=portal_type)
      document.edit(file=f, reference=filename)
      return document

    # zip
    document_zip = add_document('sample_attachment.zip', '3',
                                self.portal.document_module, 'File')

    get_transaction().commit()
    self.tic()
    get_transaction().commit()

    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='outgoing')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               aggregate=document_zip.getRelativeUrl(),
               text_content='This is an advertisement mail.')

    mail_text = event.send(download=True)

    # Check mail text.
    message = email.message_from_string(mail_text)
    part = None
    for i in message.get_payload():
      if i.get_content_type()=='text/plain':
        part = i
    self.assertEqual(part.get_payload(decode=True), event.getTextContent())

    # Check attachment
    # zip
    self.assert_('sample_attachment.zip' in 
                 [i.get_filename() for i in message.get_payload()])
    part = None
    for i in message.get_payload():
      if i.get_filename()=='sample_attachment.zip':
        part = i
    self.assert_(len(part.get_payload(decode=True))>0)

  def test_MailAttachmentImage(self):
    """
    Make sure that image document is correctly attached in email
    """
    # Add a document which will be attached.

    def add_document(filename, id, container, portal_type):
      f = openTestFile(filename)
      document = container.newContent(id=id, portal_type=portal_type)
      document.edit(file=f, reference=filename)
      return document

    # gif
    document_gif = add_document('sample_attachment.gif', '4',
                                self.portal.image_module, 'Image')

    get_transaction().commit()
    self.tic()
    get_transaction().commit()

    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='outgoing')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               aggregate=document_gif.getRelativeUrl(),
               text_content='This is an advertisement mail.')

    mail_text = event.send(download=True)

    # Check mail text.
    message = email.message_from_string(mail_text)
    part = None
    for i in message.get_payload():
      if i.get_content_type()=='text/plain':
        part = i
    self.assertEqual(part.get_payload(decode=True), event.getTextContent())

    # Check attachment
    # gif
    self.assert_('sample_attachment.gif' in 
                 [i.get_filename() for i in message.get_payload()])
    part = None
    for i in message.get_payload():
      if i.get_filename()=='sample_attachment.gif':
        part = i
    self.assertEqual(part.get_payload(decode=True), str(document_gif.getData()))

  def test_MailAttachmentWebPage(self):
    """
    Make sure that webpage document is correctly attached in email
    """
    # Add a document which will be attached.

    document_html = self.portal.web_page_module.newContent(id='5',
                                                           portal_type='Web Page')
    document_html.edit(text_content='<html><body>Hello world!</body></html>',
                       reference='sample_attachment.html')

    get_transaction().commit()
    self.tic()
    get_transaction().commit()

    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='outgoing')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               aggregate=document_html.getRelativeUrl(),
               text_content='This is an advertisement mail.')

    mail_text = event.send(download=True)

    # Check mail text.
    message = email.message_from_string(mail_text)
    part = None
    for i in message.get_payload():
      if i.get_content_type()=='text/plain':
        part = i
    self.assertEqual(part.get_payload(decode=True), event.getTextContent())

    # Check attachment
    # html
    self.assert_('sample_attachment.html' in 
                 [i.get_filename() for i in message.get_payload()])
    part = None
    for i in message.get_payload():
      if i.get_filename()=='sample_attachment.html':
        part = i
    self.assertEqual(part.get_payload(decode=True),
                     str(document_html.getTextContent()))
    self.assertEqual(part.get_content_type(), 'text/html')

  def test_MailRespond(self):
    """
    Test we can answer an incoming event and quote it
    """
    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='incoming')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               text_content='This is an advertisement mail.')
    first_event_id = event.getId()
    self.getWorkflowTool().doActionFor(event, 'respond_action', 
                                       wf_id='event_workflow',
                                       respond_event_quotation = 1,
                                       respond_event_portal_type = "Mail Message",
                                       respond_event_title = "Answer",
                                       respond_event_description = "Answer Advertissement Mail",
                                       )

    self.assertEqual(event.getSimulationState(), "responded")
    # answer event must have been created
    self.assertEqual(len(self.portal.event_module), 2)
    for ev in self.portal.event_module.objectValues():
      if ev.getId() != first_event_id:
        answer_event = ev

    # check properties of answer event
    self.assertEqual(answer_event.getSimulationState(), "planned")
    self.assertEqual(answer_event.getCausality(), event.getRelativeUrl())
    self.assertEqual(answer_event.getDestination(), 'person_module/me')
    self.assertEqual(answer_event.getSource(), 'person_module/sender')
    self.assertEqual(answer_event.getTextContent(), '> This is an advertisement mail.')


  def test_MailAttachmentFileWithoutDMS(self):
    """
    Make sure that file document is correctly attached in email
    """
    # Add a document on a person which will be attached.

    def add_document(filename, id, container, portal_type):
      f = openTestFile(filename)
      document = container.newContent(id=id, portal_type=portal_type)
      document.edit(file=f, reference=filename)
      return document

    # txt
    document_txt = add_document('sample_attachment.txt', '2',
                                self.portal.person_module['me'], 'File')

    get_transaction().commit()
    self.tic()
    get_transaction().commit()

    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='outgoing')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               aggregate=document_txt.getRelativeUrl(),
               text_content='This is an advertisement mail.')

    mail_text = event.send(download=True)

    # Check mail text.
    message = email.message_from_string(mail_text)
    part = None
    for i in message.get_payload():
      if i.get_content_type()=='text/plain':
        part = i
    self.assertEqual(part.get_payload(decode=True), event.getTextContent())

    # Check attachment
    # txt
    self.assert_('sample_attachment.txt' in 
                 [i.get_filename() for i in message.get_payload()])
    part = None
    for i in message.get_payload():
      if i.get_filename()=='sample_attachment.txt':
        part = i
    self.assert_(len(part.get_payload(decode=True))>0)



  def test_MailAttachmentImageWithoutDMS(self):
    """
    Make sure that image document is correctly attached in email without dms
    """
    # Add a document on a person which will be attached.

    def add_document(filename, id, container, portal_type):
      f = openTestFile(filename)
      document = container.newContent(id=id, portal_type=portal_type)
      document.edit(file=f, reference=filename)
      return document

    # gif
    document_gif = add_document('sample_attachment.gif', '1',
                                self.portal.person_module['me'], 'Image')

    get_transaction().commit()
    self.tic()
    get_transaction().commit()

    # Add a ticket
    ticket = self.portal.campaign_module.newContent(id='1',
                                                    portal_type='Campaign',
                                                    title='Advertisement')
    # Create a event
    ticket.Ticket_newEvent(portal_type='Mail Message',
                           title='Our new product',
                           description='Buy this now!',
                           direction='outgoing')

    # Set sender and attach a document to the event.
    event = self.portal.event_module.objectValues()[0]
    event.edit(source='person_module/me',
               destination='person_module/sender',
               aggregate=document_gif.getRelativeUrl(),
               text_content='This is an advertisement mail.')

    mail_text = event.send(download=True)

    # Check mail text.
    message = email.message_from_string(mail_text)
    part = None
    for i in message.get_payload():
      if i.get_content_type()=='text/plain':
        part = i
    self.assertEqual(part.get_payload(decode=True), event.getTextContent())

    # Check attachment
    # gif
    self.assert_('sample_attachment.gif' in 
                 [i.get_filename() for i in message.get_payload()])
    part = None
    for i in message.get_payload():
      if i.get_filename()=='sample_attachment.gif':
        part = i
    self.assertEqual(part.get_payload(decode=True), str(document_gif.getData()))


def test_suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(TestCRM))
  suite.addTest(unittest.makeSuite(TestCRMMailIngestion))
  suite.addTest(unittest.makeSuite(TestCRMMailSend))
  return suite