testNotificationTool.py 19.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#          Romain Courteaud <romain@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

import unittest

from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.SecurityManagement import newSecurityManager
33
from AccessControl.SecurityManagement import getSecurityManager
34 35 36
from Products.ERP5Type.tests.Sequence import SequenceList
from Products.ERP5Type.tests.utils import DummyMailHost
import email
37 38
from email.header import decode_header, make_header
from email.utils import parseaddr
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78

# Copied from ERP5Type/patches/CMFMailIn.py
def decode_email(file):
  # Prepare result
  theMail = {
    'attachment_list': [],
    'body': '',
    # Place all the email header in the headers dictionary in theMail
    'headers': {}
  }
  # Get Message
  msg = email.message_from_string(file)
  # Back up original file
  theMail['__original__'] = file
  # Recode headers to UTF-8 if needed
  for key, value in msg.items():
    decoded_value_list = decode_header(value)
    unicode_value = make_header(decoded_value_list)
    new_value = unicode_value.__unicode__().encode('utf-8')
    theMail['headers'][key.lower()] = new_value
  # Filter mail addresses
  for header in ('resent-to', 'resent-from', 'resent-cc', 'resent-sender', 
                 'to', 'from', 'cc', 'sender', 'reply-to'):
    header_field = theMail['headers'].get(header)
    if header_field:
        theMail['headers'][header] = parseaddr(header_field)[1]
  # Get attachments
  body_found = 0
  for part in msg.walk():
    content_type = part.get_content_type()
    file_name = part.get_filename()
    # multipart/* are just containers
    # XXX Check if data is None ?
    if content_type.startswith('multipart'):
      continue
    # message/rfc822 contains attached email message
    # next 'part' will be the message itself
    # so we ignore this one to avoid doubling
    elif content_type == 'message/rfc822':
      continue
79
    elif content_type in ("text/plain", "text/html"):
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
      charset = part.get_content_charset()
      payload = part.get_payload(decode=True)
      #LOG('CMFMailIn -> ',0,'charset: %s, payload: %s' % (charset,payload))
      if charset:
        payload = unicode(payload, charset).encode('utf-8')
      if body_found:
        # Keep the content type
        theMail['attachment_list'].append((file_name, 
                                           content_type, payload))
      else:
        theMail['body'] = payload
        body_found = 1
    else:
      payload = part.get_payload(decode=True)
      # Keep the content type
      theMail['attachment_list'].append((file_name, content_type, 
                                         payload))
  return theMail

class TestNotificationTool(ERP5TypeTestCase):
  """
  Test notification tool
  """

  def getBusinessTemplateList(self):
105
    return ('erp5_base',)
106 107 108 109

  def getTitle(self):
    return "Notification Tool"

110 111 112 113 114 115
  def createUser(self, name, role_list):
    user_folder = self.getPortal().acl_users
    user_folder._doAddUser(name, 'password', role_list, [])

  def changeUser(self, name):
    self.old_user = getSecurityManager().getUser()
116
    self.login(name)
117 118 119 120

  def changeToPreviousUser(self):
    newSecurityManager(None, self.old_user)

121
  def afterSetUp(self):
122 123
    self.createUser('erp5user', ['Auditor', 'Author'])
    self.createUser('manager', ['Manager'])
124 125 126 127
    portal = self.getPortal()
    if 'MailHost' in portal.objectIds():
      portal.manage_delObjects(['MailHost'])
    portal._setObject('MailHost', DummyMailHost('MailHost'))
128
    self.portal.MailHost.reset()
129 130 131
    portal.email_from_address = 'site@example.invalid'
    self.portal.portal_caches.clearAllCache()
    self.tic()
132
    self.login('erp5user')
133 134

  def beforeTearDown(self):
135
    self.abort()
136 137 138 139 140 141 142 143 144 145
    # clear modules if necessary
    self.portal.person_module.manage_delObjects(
            list(self.portal.person_module.objectIds()))
    self.tic()

  def stepAddUserA(self, sequence=None, sequence_list=None, **kw):
    """
    Create a user
    """
    person = self.portal.person_module.newContent(portal_type="Person",
146 147 148
                                                  default_email_text="userA@example.invalid")
    self.changeUser('manager')
    person.edit(reference="userA", password="passwordA")
149 150
    assignment = person.newContent(portal_type='Assignment')
    assignment.open()
151
    self.changeToPreviousUser()
152 153 154 155 156 157

  def stepAddUserB(self, sequence=None, sequence_list=None, **kw):
    """
    Create a user
    """
    person = self.portal.person_module.newContent(portal_type="Person",
158 159 160
                                                  default_email_text="userB@example.invalid")
    self.changeUser('manager')
    person.edit(reference="userB", password="passwordA")
161 162
    assignment = person.newContent(portal_type='Assignment')
    assignment.open()
163
    self.changeToPreviousUser()
164 165 166 167 168

  def stepAddUserWithoutEmail(self, sequence=None, sequence_list=None, **kw):
    """
    Create a user
    """
169 170 171 172
    person = self.portal.person_module.newContent(portal_type="Person")

    self.changeUser('manager')
    person.edit(reference="userWithoutEmail", password="passwordA")
173 174
    assignment = person.newContent(portal_type='Assignment')
    assignment.open()
175
    self.changeToPreviousUser()
176

177
  def test_01_defaultBehaviour(self):
178 179 180 181
    self.assertRaises(
      TypeError,
      self.portal.portal_notifications.sendMessage,
    )
182 183 184 185
    self.assertRaises(
      TypeError,
      self.portal.portal_notifications,
    )
186 187 188 189 190 191 192 193 194 195 196

  def stepCheckNotificationWithoutSender(self, sequence=None, 
                                         sequence_list=None, **kw):
    """
    Check that notification works without sender
    """
    self.portal.portal_notifications.sendMessage(
        recipient='userA', subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
197 198
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
199

200
  def test_02_noSender(self):
201 202 203 204 205 206 207
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationWithoutSender \
        '
    sequence_list.addSequenceString(sequence_string)
208
    sequence_list.play(self)
209 210 211 212 213 214 215 216 217 218 219 220

  def stepCheckNotificationFailsWithoutSubject(self, sequence=None, 
                                               sequence_list=None, **kw):
    """
    Check that notification fails when no subject is given
    """
    self.assertRaises(
      TypeError,
      self.portal.portal_notifications.sendMessage,
        recipient='userA', message='Message'
    )

221
  def test_03_noSubject(self):
222 223 224 225 226 227 228
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationFailsWithoutSubject \
        '
    sequence_list.addSequenceString(sequence_string)
229
    sequence_list.play(self)
230

231
  def test_04_noRecipient(self):
232 233 234 235 236
    self.portal.portal_notifications.sendMessage(
        subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
237 238
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['site@example.invalid'], mto)
239 240 241 242

  def stepCheckNotificationWithoutMessage(self, sequence=None, 
                                          sequence_list=None, **kw):
    """
Romain Courteaud's avatar
Romain Courteaud committed
243
    Check that notification is send when no message is passed
244 245 246 247 248 249
    """
    self.portal.portal_notifications.sendMessage(
        recipient='userA', subject='Subject', )
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
250 251
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
252

253
  def test_05_noMessage(self):
254 255 256 257 258 259 260
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationWithoutMessage \
        '
    sequence_list.addSequenceString(sequence_string)
261
    sequence_list.play(self)
262 263 264 265

  def stepCheckSimpleNotification(self, sequence=None, 
                                  sequence_list=None, **kw):
    """
Romain Courteaud's avatar
Romain Courteaud committed
266
    Check that notification is send in standard use case
267 268 269 270 271 272
    """
    self.portal.portal_notifications.sendMessage(
        recipient='userA', subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
273 274
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
275 276
    # Check Message
    mail_dict = decode_email(messageText)
277 278
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], 'Message')
279 280
    self.assertSameSet([], mail_dict['attachment_list'])

281
  def test_06_simpleMessage(self):
282 283 284 285 286 287 288
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckSimpleNotification \
        '
    sequence_list.addSequenceString(sequence_string)
289
    sequence_list.play(self)
290 291 292 293

  def stepCheckNotificationWithAttachment(self, sequence=None, 
                                          sequence_list=None, **kw):
    """
Romain Courteaud's avatar
Romain Courteaud committed
294
    Check attachment
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
    """
    self.portal.portal_notifications.sendMessage(
        recipient='userA', subject='Subject', message='Message',
        attachment_list=[
          {
            'name': 'Attachment 1',
            'content': 'Text 1',
            'mime_type': 'text/plain',
          },
          {
            'name': 'Attachment 2',
            'content': 'Text 2',
            'mime_type': 'application/octet-stream',
          },
        ])
    last_message = self.portal.MailHost._last_message

    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
314 315
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
316 317 318

    # Check Message
    mail_dict = decode_email(messageText)
319 320
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], 'Message')
321 322 323 324
    self.assertSameSet([('Attachment 1', 'text/plain', 'Text 1'),
                        ('Attachment 2', 'application/octet-stream', 'Text 2')], 
                       mail_dict['attachment_list'])

325
  def test_07_AttachmentMessage(self):
326 327 328 329 330 331 332
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationWithAttachment \
        '
    sequence_list.addSequenceString(sequence_string)
333
    sequence_list.play(self)
334 335 336 337

  def stepCheckMultiRecipientNotification(self, sequence=None, 
                                          sequence_list=None, **kw):
    """
Romain Courteaud's avatar
Romain Courteaud committed
338
    Check that notification can be send to multiple recipient
339 340 341 342 343 344 345
    """
    self.portal.portal_notifications.sendMessage(
        recipient=['userA', 'userB'], subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message

    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
346 347
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userB@example.invalid'], mto)
348 349 350 351

    previous_message = self.portal.MailHost._previous_message
    self.assertNotEquals((), previous_message)
    mfrom, mto, messageText = previous_message
352 353
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
354

355
  def test_08_MultiRecipient(self):
356 357 358 359 360 361 362 363
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        AddUserB \
        Tic \
        CheckMultiRecipientNotification \
        '
    sequence_list.addSequenceString(sequence_string)
364
    sequence_list.play(self)
365 366 367 368 369 370 371 372 373 374 375 376

  def stepCheckPersonWithoutEmail(self, sequence=None, 
                                  sequence_list=None, **kw):
    """
    Check that notification fails when the destination hasn't a email adress
    """
    self.assertRaises(
      AttributeError,
      self.portal.portal_notifications.sendMessage,
      recipient='userWithoutEmail', subject='Subject', message='Message'
    )

377
  def test_08_PersonWithoutEmail(self):
378 379 380 381 382 383 384
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserWithoutEmail \
        Tic \
        CheckPersonWithoutEmail \
        '
    sequence_list.addSequenceString(sequence_string)
385 386 387
    sequence_list.play(self)

  def test_09_InvalideRecipient(self):
388 389 390 391 392 393
    self.assertRaises(
      IndexError,
      self.portal.portal_notifications.sendMessage,
      recipient='UnknowUser', subject='Subject', message='Message'
    )

394 395 396 397 398
  def stepCheckPersonNotification(self, sequence=None, 
                                  sequence_list=None, **kw):
    """
    Check that notification is send when recipient is a Person
    """
399
    person = self.portal.portal_catalog(reference='userA', portal_type='Person')[0]
400 401 402 403 404
    self.portal.portal_notifications.sendMessage(
        recipient=person.getObject(), subject='Subject', message='Message')
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
405 406
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
407 408
    # Check Message
    mail_dict = decode_email(messageText)
409 410
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], 'Message')
411 412
    self.assertSameSet([], mail_dict['attachment_list'])

413
  def test_10_PersonNotification(self):
414 415 416 417 418 419 420
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckPersonNotification\
        '
    sequence_list.addSequenceString(sequence_string)
421
    sequence_list.play(self)
422

423 424 425 426 427 428 429 430
  def stepCheckNotificationPlainTextFormat(self, sequence=None, 
                                  sequence_list=None, **kw):
    """
    Check that if notification format is plain text.
    """

    message = """\
> Hello, will you go to the park on sunday?
Yusei Tahara's avatar
Yusei Tahara committed
431
Yes, I will go."""
432 433 434 435 436 437 438

    self.portal.portal_notifications.sendMessage(
        recipient='userA', subject='Subject',
        message_text_format='text/plain', message=message)
    last_message = self.portal.MailHost._last_message
    self.assertNotEquals((), last_message)
    mfrom, mto, messageText = last_message
439 440
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
441 442
    # Check Message
    mail_dict = decode_email(messageText)
443 444
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], message)
445 446
    self.assertSameSet([], mail_dict['attachment_list'])

447
  def test_11_TextMessage(self):
448 449 450 451 452 453 454
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationPlainTextFormat \
        '
    sequence_list.addSequenceString(sequence_string)
455
    sequence_list.play(self)
456 457 458 459 460 461 462

  def stepCheckNotificationHtmlFormat(self, sequence=None, 
                                  sequence_list=None, **kw):
    """
    Check that if notification format is html.
    """

463
    message = """<a href="http://www.erp5.com/">Click Here!!</a>"""
464 465 466 467
    
    self.portal.portal_notifications.sendMessage(
        recipient='userA', subject='Subject',
        message_text_format='text/html', message=message)
468
    last_message, = self.portal.MailHost._message_list
469
    mfrom, mto, messageText = last_message
470 471
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
472 473
    # Check Message
    mail_dict = decode_email(messageText)
474 475
    self.assertEqual(mail_dict['headers']['subject'], 'Subject')
    self.assertEqual(mail_dict['body'], '<html><body>%s</body></html>' % message)
476 477
    self.assertSameSet([], mail_dict['attachment_list'])

478
  def test_12_HtmlMessage(self):
479 480 481 482 483 484 485
    sequence_list = SequenceList()
    sequence_string = '\
        AddUserA \
        Tic \
        CheckNotificationHtmlFormat \
        '
    sequence_list.addSequenceString(sequence_string)
486
    sequence_list.play(self)
487

488 489 490 491

class TestNotificationToolWithCRM(TestNotificationTool):
  """Make sure that notification tool works with crm"""

492 493 494
  def getTitle(self):
    return "Notification Tool With CRM"

495 496 497
  def getBusinessTemplateList(self):
    return ('erp5_base', 'erp5_crm')

498 499 500 501 502 503 504 505 506 507 508 509
  def beforeTearDown(self):
    TestNotificationTool.beforeTearDown(self)
    self.portal.event_module.manage_delObjects(
            list(self.portal.event_module.objectIds()))
    self.tic()

  def test_store_as_event(self):
    # passing store_as_event=True to NotificationTool.sendMessage will store
    # the message in an event
    person = self.portal.person_module.newContent(
        portal_type="Person",
        default_email_text="userA@example.invalid")
510
    self.tic()
511 512 513 514 515 516
    self.portal.portal_notifications.sendMessage(
                                  store_as_event=True,
                                  recipient=person,
                                  subject='Subject',
                                  message='Message')
    self.tic()
517
    last_message, = self.portal.MailHost._message_list
518
    mfrom, mto, messageText = last_message
519
    mail_dict = decode_email(messageText)
520 521
    self.assertEqual('Portal Administrator <site@example.invalid>', mfrom)
    self.assertEqual(['userA@example.invalid'], mto)
522 523 524
    
    # check that an event has been created
    event_list = self.portal.event_module.contentValues()
525
    self.assertEqual(1, len(event_list))
526 527
    
    event = event_list[0]
528
    self.assertEqual(mail_dict['headers']['message-id'],
529
                      event.getSourceReference())
530 531 532
    self.assertEqual('Mail Message', event.getPortalTypeName())
    self.assertEqual('Subject', event.getTitle())
    self.assertEqual('Message', event.getTextContent())
533
    self.assertNotEquals(None, event.getStartDate())
534 535
    self.assertEqual(person, event.getDestinationValue())
    self.assertEqual('started', event.getSimulationState())
536

537

538 539 540
def test_suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(TestNotificationTool))
541
  suite.addTest(unittest.makeSuite(TestNotificationToolWithCRM))
542
  return suite