From 2eca9d0a113bf1bf2d02f1b76516af95a266f7a9 Mon Sep 17 00:00:00 2001
From: Fabien Morin <fabien@nexedi.com>
Date: Tue, 21 Oct 2008 22:49:03 +0000
Subject: [PATCH] update testEgovMixin to add some other generic methods

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@24278 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 .../TestTemplateItem/testEGovMixin.py         | 117 +++++++++++++++++-
 bt5/erp5_egov/bt/change_log                   |   1 +
 bt5/erp5_egov/bt/revision                     |   2 +-
 3 files changed, 114 insertions(+), 6 deletions(-)

diff --git a/bt5/erp5_egov/TestTemplateItem/testEGovMixin.py b/bt5/erp5_egov/TestTemplateItem/testEGovMixin.py
index 7467ee6bda..8896377eb6 100644
--- a/bt5/erp5_egov/TestTemplateItem/testEGovMixin.py
+++ b/bt5/erp5_egov/TestTemplateItem/testEGovMixin.py
@@ -34,7 +34,11 @@ from AccessControl import Unauthorized
 from Testing import ZopeTestCase
 from Products.ERP5Type.tests.Sequence import Step, Sequence, SequenceList
 from zLOG import LOG
+import transaction
 import random
+import email
+from email.Header import decode_header, make_header
+from email.Utils import parseaddr
 
 class TestEGovMixin(SecurityTestCase):
   """Usefull methods for eGov Unit Tests."""
@@ -76,7 +80,7 @@ class TestEGovMixin(SecurityTestCase):
       for idx, step in enumerate(self._step_list):
         step.play(context, sequence=self, quiet=quiet)
         # commit transaction after each step
-        get_transaction().commit()
+        transaction.commit()
   Sequence.play = play
 
   def play(self, context, sequence=None, quiet=0):
@@ -135,7 +139,7 @@ class TestEGovMixin(SecurityTestCase):
     self.portal.__class__.DeclarationTVA_zGetSIGTASInformation \
         = lambda x,**kw: []
 
-    get_transaction().commit()
+    transaction.commit()
     self.tic()
     
   def beforeTearDown(self):
@@ -144,7 +148,7 @@ class TestEGovMixin(SecurityTestCase):
       # we want to keep some IDs
       module.manage_delObjects([x for x in module.objectIds()
                                 if x not in ('EUR',)])
-    get_transaction().commit()
+    transaction.commit()
     self.tic()
 
   def getUserFolder(self) :
@@ -196,7 +200,7 @@ class TestEGovMixin(SecurityTestCase):
           'group/dgid/di/csf/bf')
 
       # make this available to catalog
-      get_transaction().commit()
+      transaction.commit()
       self.tic()
 
   def createOneOrganisation(self, username, role=None, function=None, 
@@ -228,7 +232,7 @@ class TestEGovMixin(SecurityTestCase):
           role='entreprise/siege')
 
       # make this available to catalog
-      get_transaction().commit()
+      transaction.commit()
       self.tic()
 
   def checkRights(self, object_list, security_mapping, username):
@@ -258,3 +262,106 @@ class TestEGovMixin(SecurityTestCase):
                                                      object)
       for transition in not_possible_transition_list:
         self.failIfUserCanPassWorkflowTransition(username, transition, object)
+
+  # Copied from ERP5Type/patches/CMFMailIn.py
+  def decode_email(self, file):
+    # Prepare result
+    theMail = {
+      'attachment_list': [],
+      'body': '',
+      # Place all the email header in the headers dictionary in theMail
+      'headers': {}
+    }
+    # Get Message
+    msg = email.message_from_string(file)
+    # Back up original file
+    theMail['__original__'] = file
+    # Recode headers to UTF-8 if needed
+    for key, value in msg.items():
+      decoded_value_list = decode_header(value)
+      unicode_value = make_header(decoded_value_list)
+      new_value = unicode_value.__unicode__().encode('utf-8')
+      theMail['headers'][key.lower()] = new_value
+    # Filter mail addresses
+    for header in ('resent-to', 'resent-from', 'resent-cc', 'resent-sender',
+                   'to', 'from', 'cc', 'sender', 'reply-to'):
+      header_field = theMail['headers'].get(header)
+      if header_field:
+          theMail['headers'][header] = parseaddr(header_field)[1]
+    # Get attachments
+    body_found = 0
+    for part in msg.walk():
+      content_type = part.get_content_type()
+      file_name = part.get_filename()
+      # multipart/* are just containers
+      # XXX Check if data is None ?
+      if content_type.startswith('multipart'):
+        continue
+      # message/rfc822 contains attached email message
+      # next 'part' will be the message itself
+      # so we ignore this one to avoid doubling
+      elif content_type == 'message/rfc822':
+        continue
+      elif content_type in ("text/plain", "text/html"):
+        charset = part.get_content_charset()
+        payload = part.get_payload(decode=True)
+        #LOG('CMFMailIn -> ',0,'charset: %s, payload: %s' % (charset,payload))
+        if charset:
+          payload = unicode(payload, charset).encode('utf-8')
+        if body_found:
+          # Keep the content type
+          theMail['attachment_list'].append((file_name,
+                                             content_type, payload))
+        else:
+          theMail['body'] = payload
+          body_found = 1
+      else:
+        payload = part.get_payload(decode=True)
+        # Keep the content type
+        theMail['attachment_list'].append((file_name, content_type,
+                                           payload))
+    return theMail
+
+  def _assertUserExists(self, login, password):
+    """Checks that a user with login and password exists and can log in to the
+    system.
+    """
+    from Products.PluggableAuthService.interfaces.plugins import\
+                                                      IAuthenticationPlugin
+    uf = self.getUserFolder()
+    self.assertNotEquals(uf.getUserById(login, None), None)
+    for plugin_name, plugin in uf._getOb('plugins').listPlugins(
+                                IAuthenticationPlugin ):
+      if plugin.authenticateCredentials(
+                  {'login':login, 'password':password}) is not None:
+        break
+    else:
+      self.fail("No plugin could authenticate '%s' with password '%s'" %
+              (login, password))
+
+  def checkWorklist(self, portal_type, count, validation_state, login):
+    '''
+      check that there is 'count' item in the worklist for 'portal_type' and
+      'validation_state' logged with 'login'
+    '''
+
+    # save previous user
+    previous_user = str(getSecurityManager().getUser())
+    self.loginAsUser(login)
+
+    worklist_dict = self.portal.getPortalTypeWorklistDictForWorkflow(\
+        self.portal,
+        workflow_list=['egov_universal_workflow', 'egov_anonymous_workflow'])
+    self.assertNotEquals(worklist_dict, {})
+    self.assertEquals(worklist_dict.has_key(portal_type), True)
+    portal_type_dict = worklist_dict[portal_type]
+    self.assertEquals(portal_type_dict.has_key(validation_state), True)
+    self.assertEquals(portal_type_dict[validation_state]['count'], count)
+
+    # relog with previous user
+    if previous_user in ('Anonymous User', 'ERP5TypeTestCase'):
+      self.logout()
+    else:
+      self.loginAsUser(previous_user)
+ 
+
diff --git a/bt5/erp5_egov/bt/change_log b/bt5/erp5_egov/bt/change_log
index 631758c8c9..e98e263c2e 100644
--- a/bt5/erp5_egov/bt/change_log
+++ b/bt5/erp5_egov/bt/change_log
@@ -1,5 +1,6 @@
 2008-10-21 fabien
 * fix some mistakes in sendCrendentialsByEMail workflow script and remove sender because it's not required
+* update testEgovMixin to add some other generic methods
 
 2008-10-20 fabien
 * update ERP5Site_getSecurityFromLatestWorkflowHistory  to remove workflow hardcoded value
diff --git a/bt5/erp5_egov/bt/revision b/bt5/erp5_egov/bt/revision
index 82a1667ba9..b872400059 100644
--- a/bt5/erp5_egov/bt/revision
+++ b/bt5/erp5_egov/bt/revision
@@ -1 +1 @@
-426
\ No newline at end of file
+427
\ No newline at end of file
-- 
2.30.9