Commit 796e7780 authored by Jérome Perrin's avatar Jérome Perrin Committed by Cédric Le Ninivin

fixup! erp5_web_service: add disabled_public_key_algorithm property.

parent 4702354a
...@@ -60,6 +60,9 @@ class FTPConnector(XMLObject): ...@@ -60,6 +60,9 @@ class FTPConnector(XMLObject):
transport_kw={ transport_kw={
'private_key':self.getDescription(), 'private_key':self.getDescription(),
'bind_address': self.getBindAddress(), 'bind_address': self.getBindAddress(),
'disabled_algorithms': {
'pubkeys': self.getDisabledPublicKeyAlgorithmList() or []
},
}, },
) )
else: else:
......
...@@ -48,7 +48,7 @@ class SFTPConnection: ...@@ -48,7 +48,7 @@ class SFTPConnection:
""" """
def __init__(self, url, user_name, password=None, private_key=None, def __init__(self, url, user_name, password=None, private_key=None,
bind_address=None): bind_address=None, disabled_algorithms=None):
self.url = url self.url = url
self.user_name = user_name self.user_name = user_name
if password and private_key: if password and private_key:
...@@ -56,6 +56,7 @@ class SFTPConnection: ...@@ -56,6 +56,7 @@ class SFTPConnection:
self.password = password self.password = password
self.private_key = private_key self.private_key = private_key
self.bind_address = bind_address self.bind_address = bind_address
self.disabled_algorithms = disabled_algorithms
def connect(self): def connect(self):
""" Get a handle to a remote connection """ """ Get a handle to a remote connection """
...@@ -80,9 +81,7 @@ class SFTPConnection: ...@@ -80,9 +81,7 @@ class SFTPConnection:
break break
else: else:
raise SFTPError('No suitable socket family found') raise SFTPError('No suitable socket family found')
self.transport = Transport(sock, disabled_algorithms={ self.transport = Transport(sock, disabled_algorithms=self.disabled_algorithms)
'pubkeys': self.getDisabledPublicKeyAlgorithmList([]),
})
else: else:
raise SFTPError('Not a valid sftp url %s, type is %s' %(self.url, schema.scheme)) raise SFTPError('Not a valid sftp url %s, type is %s' %(self.url, schema.scheme))
# Add authentication to transport # Add authentication to transport
......
...@@ -26,14 +26,17 @@ ...@@ -26,14 +26,17 @@
############################################################################## ##############################################################################
import os import os
import socket
import unittest import unittest
import urlparse import urlparse
import mock
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
class TestSFTPConnection(ERP5TypeTestCase): if os.environ.get("testSFTPConnection_SFTP_URL"):
if os.environ.get("testSFTPConnection_SFTP_URL"): class TestSFTPConnection(ERP5TypeTestCase):
def afterSetUp(self): def afterSetUp(self):
url = os.environ["testSFTPConnection_SFTP_URL"] url = os.environ["testSFTPConnection_SFTP_URL"]
parsed_url = urlparse.urlparse(url) parsed_url = urlparse.urlparse(url)
...@@ -98,7 +101,25 @@ class TestSFTPConnection(ERP5TypeTestCase): ...@@ -98,7 +101,25 @@ class TestSFTPConnection(ERP5TypeTestCase):
self.connection.removeDirectory("foo") self.connection.removeDirectory("foo")
self.assertItemsEqual([], self.connection.listFiles(".")) self.assertItemsEqual([], self.connection.listFiles("."))
else: class TestSFTPConnectionDisabledPublicKeyAlgorithms(TestSFTPConnection):
def afterSetUp(self):
url = os.environ["testSFTPConnection_SFTP_URL"]
parsed_url = urlparse.urlparse(url)
self.connection = self.portal.portal_web_services.newContent(
portal_type='FTP Connector',
reference=self.id(),
user_id=parsed_url.username,
password=parsed_url.password,
url_string=url,
url_protocol='sftp',
use_temporary_file_on_write=False,
disabled_public_key_algorithm_list=[
'rsa-sha2-256',
'rsa-sha2-512',
]
)
else:
class TestSFTPConnection(ERP5TypeTestCase):
def test_no_SFTP_URL_in_environ(self): def test_no_SFTP_URL_in_environ(self):
raise unittest.SkipTest( raise unittest.SkipTest(
"""This test needs the environment variable testSFTPConnection_SFTP_URL set to the URL of a SFTP connection. """This test needs the environment variable testSFTPConnection_SFTP_URL set to the URL of a SFTP connection.
...@@ -106,4 +127,38 @@ class TestSFTPConnection(ERP5TypeTestCase): ...@@ -106,4 +127,38 @@ class TestSFTPConnection(ERP5TypeTestCase):
The URL must contain login and password, such as sftp://user:pass@[::1]:8022 The URL must contain login and password, such as sftp://user:pass@[::1]:8022
The directory from this URL must be empty and writeable. The directory from this URL must be empty and writeable.
""" """
) )
\ No newline at end of file
class TestSFTPConnectionMock(ERP5TypeTestCase):
def test_disabled_public_key_algorithm_list(self):
connection = self.portal.portal_web_services.newContent(
portal_type='FTP Connector',
reference=self.id(),
user_id='user',
password='pass',
url_string='sftp://sftp-example.erp5.net:21',
url_protocol='sftp',
use_temporary_file_on_write=False,
disabled_public_key_algorithm_list=[
'rsa-sha2-256',
'rsa-sha2-512',
]
)
with mock.patch('erp5.component.module.erp5_version.SFTPConnection.Transport') as Transport,\
mock.patch('erp5.component.module.erp5_version.SFTPConnection.SFTPClient') as SFTPClient,\
mock.patch(
'erp5.component.module.erp5_version.SFTPConnection.getaddrinfo',
return_value=(
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.0.0.1', 21)),
)
),\
mock.patch(
'erp5.component.module.erp5_version.SFTPConnection.socket',
) as sock:
connection.listFiles(".")
sock().connect.assert_called_once_with(('sftp-example.erp5.net', 21))
Transport.assert_called_once_with(
sock(),
disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
SFTPClient.from_transport.assert_called_once()
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment