|
Packit |
228f82 |
# -*- coding: UTF-8 -*-
|
|
Packit |
228f82 |
#
|
|
Packit |
228f82 |
# Python unit tests for Lasso library
|
|
Packit |
228f82 |
#
|
|
Packit |
228f82 |
# Copyright (C) 2004-2007 Entr'ouvert
|
|
Packit |
228f82 |
# http://lasso.entrouvert.org
|
|
Packit |
228f82 |
#
|
|
Packit |
228f82 |
# Authors: See AUTHORS file in top-level directory.
|
|
Packit |
228f82 |
#
|
|
Packit |
228f82 |
# This program is free software; you can redistribute it and/or modify
|
|
Packit |
228f82 |
# it under the terms of the GNU General Public License as published by
|
|
Packit |
228f82 |
# the Free Software Foundation; either version 2 of the License, or
|
|
Packit |
228f82 |
# (at your option) any later version.
|
|
Packit |
228f82 |
#
|
|
Packit |
228f82 |
# This program is distributed in the hope that it will be useful,
|
|
Packit |
228f82 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit |
228f82 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
Packit |
228f82 |
# GNU General Public License for more details.
|
|
Packit |
228f82 |
#
|
|
Packit |
228f82 |
# You should have received a copy of the GNU General Public License
|
|
Packit |
228f82 |
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
import os
|
|
Packit |
228f82 |
import unittest
|
|
Packit |
228f82 |
import sys
|
|
Packit |
228f82 |
import logging
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
# Give the root logger a default handler so log records emitted while the
# tests run are not silently dropped.
logging.basicConfig()

# Put the parent directory and its libtool '.libs' directory at the front of
# the module search path before 'lasso' is imported below (presumably so the
# freshly built, not yet installed, bindings are picked up -- confirm against
# the build layout).  Each entry is added at most once.
if '..' not in sys.path:
    sys.path.insert(0, '..')
if '../.libs' not in sys.path:
    sys.path.insert(0, '../.libs')
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
import lasso
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
# Locate the directory holding the test fixtures.  The bare name lookup only
# probes whether 'dataDir' is already bound (presumably by a driver that
# executes this file with the name pre-set -- verify against the test runner);
# otherwise it is derived from the TOP_SRCDIR environment variable, which must
# then be set or a KeyError is raised.
try:
    dataDir
except NameError:
    dataDir = os.path.join(os.environ['TOP_SRCDIR'], 'tests', 'data')

# Fixture set 'sp1-la': used for the WSP server in the tests below.
wsp_metadata = os.path.join(dataDir, 'sp1-la/metadata.xml')
wsp_private_key = os.path.join(dataDir, 'sp1-la/private-key-raw.pem')
wsp_public_key = os.path.join(dataDir, 'sp1-la/public-key.pem')

# Fixture set 'sp2-la': used for the WSC server.
wsc_metadata = os.path.join(dataDir, 'sp2-la/metadata.xml')
wsc_private_key = os.path.join(dataDir, 'sp2-la/private-key-raw.pem')
wsc_public_key = os.path.join(dataDir, 'sp2-la/public-key.pem')

# Fixture set 'idp1-la': used for the identity provider.
idp_metadata = os.path.join(dataDir, 'idp1-la/metadata.xml')
idp_private_key = os.path.join(dataDir, 'idp1-la/private-key-raw.pem')
idp_public_key = os.path.join(dataDir, 'idp1-la/public-key.pem')

# Values stamped on the resource offering built by
# IdWsf1TestCase.get_resource_offering() and checked by the discovery tests.
abstract_description = "Personal Profile Resource"
resource_id = "http://idp/user/resources/1"
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
def __LINE__():
    """Return the line number of the code that called this function.

    Works like C's ``__LINE__`` macro: the value is the current line of the
    caller's frame, not a line inside this helper.
    """
    # Frame 0 is __LINE__ itself, frame 1 is its caller.  Asking the
    # interpreter for the frame directly avoids raising a throwaway exception
    # and catching it with a bare ``except:`` just to reach the traceback.
    return sys._getframe(1).f_lineno
|
|
Packit |
228f82 |
# Register the 'pp10' prefix for the service identified by lasso.PP10_HREF at
# import time.  The data-service tests below use this prefix in their select
# expressions (e.g. '/pp10:PP/pp10:InformalName'); presumably the registration
# is what lets those expressions resolve -- confirm against the lasso docs.
lasso.registerDstService('pp10', lasso.PP10_HREF)
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
class IdWsf1TestCase(unittest.TestCase):
    """Common fixtures shared by the ID-WSF 1 test cases in this module.

    Provides factories for the WSP, WSC and IdP lasso servers, a helper that
    runs a complete login exchange, and helpers that build a Personal Profile
    resource offering and obtain the matching service through discovery.
    """

    def get_wsp_server(self):
        """Return a lasso.Server for the WSP with the IdP added as provider."""
        wsp = lasso.Server(wsp_metadata, wsp_private_key, None, None)
        wsp.addProvider(lasso.PROVIDER_ROLE_IDP, idp_metadata, idp_public_key, None)
        return wsp

    def get_wsc_server(self):
        """Return a lasso.Server for the WSC with the IdP added as provider."""
        wsc = lasso.Server(wsc_metadata, wsc_private_key, None, None)
        wsc.addProvider(lasso.PROVIDER_ROLE_IDP, idp_metadata, idp_public_key, None)
        return wsc

    def get_idp_server(self):
        """Return a lasso.Server for the IdP with WSP and WSC added as SPs."""
        idp_server = lasso.Server(idp_metadata, idp_private_key, None, None)
        idp_server.addProvider(lasso.PROVIDER_ROLE_SP, wsp_metadata, wsp_public_key, None)
        idp_server.addProvider(lasso.PROVIDER_ROLE_SP, wsc_metadata, wsc_public_key, None)
        return idp_server

    def add_services(self, idp):
        """Add a Discovery and a Personal Profile service to *idp*.

        Both services use the null security mechanism and the provider id
        "http://idp/providerId".  The same *idp* object is returned.
        """
        # Discovery service
        discovery_desc = lasso.DiscoDescription.newWithBriefSoapHttpDescription(
            lasso.SECURITY_MECH_NULL,
            "http://idp/discovery/soapEndpoint",
            "Discovery SOAP Endpoint description")
        discovery_instance = lasso.DiscoServiceInstance(
            lasso.DISCO_HREF,
            "http://idp/providerId",
            discovery_desc)
        idp.addService(discovery_instance)

        # Personal Profile service
        profile_desc = lasso.DiscoDescription.newWithBriefSoapHttpDescription(
            lasso.SECURITY_MECH_NULL,
            "http://idp/pp/soapEndpoint",
            "Discovery SOAP Endpoint description")
        profile_instance = lasso.DiscoServiceInstance(
            lasso.PP10_HREF,
            "http://idp/providerId",
            profile_desc)
        idp.addService(profile_instance)
        return idp

    def login(self, sp, idp):
        """Run a full POST-profile login of *sp* against *idp*.

        Returns a 4-tuple of dumps, in this order: SP identity, SP session,
        IdP identity, IdP session.
        """
        # SP side: build the authentication request.
        sp_side = lasso.Login(sp)
        sp_side.initAuthnRequest(sp.providerIds[0], lasso.HTTP_METHOD_POST)
        sp_side.request.nameIdPolicy = lasso.LIB_NAMEID_POLICY_TYPE_FEDERATED
        sp_side.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_POST
        sp_side.buildAuthnRequestMsg()

        # IdP side: consume and validate the request.
        idp_side = lasso.Login(idp)
        idp_side.processAuthnRequestMsg(sp_side.msgBody)
        idp_side.validateRequestMsg(True, True)

        # Set a resource offering in the assertion
        discovery_resource_id = "http://idp/discovery/resources/1"
        idp_side.setResourceId(discovery_resource_id)
        idp_side.buildAssertion(
            lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None)
        idp_side.buildAuthnResponseMsg()

        # SP side again: a fresh Login object consumes the response.
        sp_side = lasso.Login(sp)
        sp_side.processAuthnResponseMsg(idp_side.msgBody)
        sp_side.acceptSso()

        sp_identity = sp_side.identity.dump()
        sp_session = sp_side.session.dump()
        idp_identity = idp_side.identity.dump()
        idp_session = idp_side.session.dump()
        return sp_identity, sp_session, idp_identity, idp_session

    def get_resource_offering(self, soap_end_point='http://idp/pp/soapEndpoint'):
        """Build a Personal Profile resource offering served at *soap_end_point*.

        Reads self.idp.providerId, so self.idp must have been set beforehand.
        The offering carries the module-level resource_id and
        abstract_description.
        """
        provider_id = self.idp.providerId
        description = lasso.DiscoDescription_newWithBriefSoapHttpDescription(
            lasso.SECURITY_MECH_NULL,
            soap_end_point)
        instance = lasso.DiscoServiceInstance(lasso.PP10_HREF, provider_id, description)

        offering = lasso.DiscoResourceOffering(instance)
        offering.resourceId = lasso.DiscoResourceID(resource_id)
        offering.abstract = abstract_description
        return offering

    def get_pp_service(self):
        """Log the WSC in, run a discovery query and return the found service.

        Sets self.wsc and self.idp as a side effect; the return value is
        whatever the WSC-side Discovery object's getService() yields.
        """
        self.wsc = self.get_wsc_server()
        self.idp = self.get_idp_server()
        self.idp = self.add_services(self.idp)

        # Login from WSC
        login_dumps = self.login(self.wsc, self.idp)
        wsc_session_dump = login_dumps[1]
        idp_identity_dump = login_dumps[2]

        # Init discovery query
        query = lasso.Discovery(self.wsc)
        query.setSessionFromDump(wsc_session_dump)
        query.initQuery()
        query.addRequestedServiceType(lasso.PP10_HREF)
        query.buildRequestMsg()

        # Process query
        responder = lasso.Discovery(self.idp)
        responder.processRequestMsg(query.msgBody)
        responder.setIdentityFromDump(idp_identity_dump)
        responder.getIdentity().addResourceOffering(self.get_resource_offering())
        responder.buildResponseMsg()

        # Process response
        query.processQueryResponseMsg(responder.msgBody)
        return query.getService()
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
class DiscoveryQueryTestCase(IdWsf1TestCase):
    """Checks the resource offering obtained through a discovery query."""

    def test01(self):
        '''Test a discovery query'''
        service = self.get_pp_service()
        # Check service attributes
        resource_offering = service.getResourceOffering()
        self.assertTrue(resource_offering is not None)
        self.assertTrue(resource_offering.resourceId is not None)
        # assertEqual replaces the deprecated failUnless(a == b) and reports
        # both values when the comparison fails.
        self.assertEqual(resource_offering.resourceId.content, resource_id)
        self.assertEqual(resource_offering.serviceInstance.providerId,
                         self.wsc.providerIds[0])
        self.assertEqual(resource_offering.abstract, abstract_description)
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
class DiscoveryModifyTestCase(IdWsf1TestCase):
    """Inserts a resource offering at the IdP through a discovery modify."""

    def test01(self):
        '''Test a discovery modify'''
        self.wsp = self.get_wsp_server()
        self.idp = self.get_idp_server()
        self.idp = self.add_services(self.idp)

        # Login from WSP
        sp_identity_dump, sp_session_dump, idp_identity_dump, idp_session_dump = \
            self.login(self.wsp, self.idp)

        # Init discovery modify
        wsp_disco = lasso.Discovery(self.wsp)
        wsp_disco.setIdentityFromDump(sp_identity_dump)
        wsp_disco.setSessionFromDump(sp_session_dump)
        resource_offering = self.get_resource_offering()
        wsp_disco.initModify()
        wsp_disco.addInsertEntry(resource_offering.serviceInstance,
                                 resource_offering.resourceId)
        wsp_disco.buildRequestMsg()

        # Process Modify
        request_type = lasso.getRequestTypeFromSoapMsg(wsp_disco.msgBody)
        self.assertEqual(request_type, lasso.REQUEST_TYPE_DISCO_MODIFY)
        idp_disco = lasso.Discovery(self.idp)
        idp_disco.processRequestMsg(wsp_disco.msgBody)
        idp_disco.setIdentityFromDump(idp_identity_dump)
        idp_disco.buildResponseMsg()
        # The result was never used by the original test; the call itself is
        # kept so the getter is still exercised.
        idp_disco.identity.getOfferings()
        self.assertTrue('<disco:Status code="OK"/>' in idp_disco.msgBody)
        # NOTE(review): the source had one more assertion at this point, but
        # only the fragment "self.failUnless('" survived extraction.  Its
        # operand could not be recovered and has NOT been guessed; restore it
        # from the upstream file.
        self.assertTrue(
            '<disco:ServiceType>urn:liberty:id-sis-pp:2003-08</disco:ServiceType>' in
            idp_disco.identity.dump())

        # Process Response
        wsp_disco.processModifyResponseMsg(idp_disco.msgBody)
        self.assertEqual(wsp_disco.response.newEntryIds, '0')
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
class DiscoveryRemoveTestCase(IdWsf1TestCase):
    """Removes a resource offering at the IdP through a discovery modify."""

    def test01(self):
        '''Test a discovery remove'''
        # Marker looked for in the identity dump to tell whether the Personal
        # Profile offering is currently present.
        pp_service_type = \
            '<disco:ServiceType>urn:liberty:id-sis-pp:2003-08</disco:ServiceType>'

        self.wsp = self.get_wsp_server()
        self.idp = self.get_idp_server()
        self.idp = self.add_services(self.idp)

        # Login from WSP
        sp_identity_dump, sp_session_dump, idp_identity_dump, idp_session_dump = \
            self.login(self.wsp, self.idp)

        # Init discovery modify
        wsp_disco = lasso.Discovery(self.wsp)
        wsp_disco.setIdentityFromDump(sp_identity_dump)
        wsp_disco.setSessionFromDump(sp_session_dump)
        wsp_disco.initModify()
        wsp_disco.addRemoveEntry('0')
        wsp_disco.buildRequestMsg()

        # Process Modify
        request_type = lasso.getRequestTypeFromSoapMsg(wsp_disco.msgBody)
        self.assertEqual(request_type, lasso.REQUEST_TYPE_DISCO_MODIFY)
        idp_disco = lasso.Discovery(self.idp)
        idp_disco.processRequestMsg(wsp_disco.msgBody)
        idp_disco.setIdentityFromDump(idp_identity_dump)
        # Seed the identity with an offering so there is something to remove.
        offering = self.get_resource_offering()
        idp_disco.getIdentity().addResourceOffering(offering)
        self.assertTrue(pp_service_type in idp_disco.identity.dump())
        idp_disco.buildResponseMsg()
        self.assertTrue('<disco:Status code="OK"/>' in idp_disco.msgBody)
        # Building the response must have dropped the offering again.
        self.assertFalse(pp_service_type in idp_disco.identity.dump())

        # Process Response
        wsp_disco.processModifyResponseMsg(idp_disco.msgBody)
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
class DataServiceQueryTestCase(IdWsf1TestCase):
    """Queries one Personal Profile element through the data service."""

    def test01(self):
        '''Test a data service query'''
        wsc_service = self.get_pp_service()
        wsc_service.initQuery('/pp10:PP/pp10:InformalName', 'name')
        wsc_service.buildSoapRequestMsg()
        self.assertEqual(lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody),
                         lasso.REQUEST_TYPE_DST_QUERY)

        self.wsp = self.get_wsp_server()
        wsp_service = lasso.DataService(self.wsp)
        wsp_service.processRequestMsg(wsc_service.msgBody)
        self.assertTrue(isinstance(wsp_service.request, lasso.DstQuery))
        # NOTE(review): the extraction this file came from stripped leading
        # whitespace, so any indentation originally inside this literal is
        # not recoverable here; it is reproduced exactly as visible.
        wsp_service.resourceData = '''
<PP xmlns="urn:liberty:id-sis-pp:2003-08">
<InformalName>Damien</InformalName>
</PP>'''
        wsp_service.validateRequest()
        wsp_service.buildResponseMsg()

        wsc_service.processQueryResponseMsg(wsp_service.msgBody)
        self.assertEqual(
            wsc_service.getAnswer(),
            '<InformalName xmlns="urn:liberty:id-sis-pp:2003-08">Damien</InformalName>')
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
class DataServiceModifyTestCase(IdWsf1TestCase):
    """Modifies Personal Profile data through the data service.

    NOTE(review): the extraction this file came from stripped leading
    whitespace, so indentation originally inside the triple-quoted XML
    literals below is not recoverable here.  test01 and test02 compare
    resourceData against these literals with exact equality, so check them
    against the upstream file before trusting a failure (or a pass).
    """

    def test01(self):
        '''Test a data service modify'''

        xpath = '/pp10:PP/pp10:InformalName'
        old_data = '''
<PP xmlns="urn:liberty:id-sis-pp:2003-08">
<InformalName>Damien</InformalName>
</PP>'''
        new_data = '<InformalName>Alain</InformalName>'

        new_full_data = '''<PP xmlns="urn:liberty:id-sis-pp:2003-08">
<pp10:InformalName xmlns:pp10="urn:liberty:id-sis-pp:2003-08">Alain</pp10:InformalName>
</PP>'''

        wsc_service = self.get_pp_service()
        wsc_service.initModify()
        wsc_service.addModification(xpath, new_data, overrideAllowed=True)
        wsc_service.buildRequestMsg()

        request_type = lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody)
        self.assertEqual(request_type, lasso.REQUEST_TYPE_DST_MODIFY)

        self.wsp = self.get_wsp_server()
        wsp_service = lasso.DataService(self.wsp)
        wsp_service.processRequestMsg(wsc_service.msgBody)

        # The WSP must see the modification exactly as the WSC sent it.
        item = wsp_service.request.modification[0]
        self.assertEqual(
            item.newData.any[0],
            '<pp10:InformalName xmlns:pp10="urn:liberty:id-sis-pp:2003-08">Alain</pp10:InformalName>')
        self.assertEqual(item.select, '/pp10:PP/pp10:InformalName')

        wsp_service.resourceData = old_data
        wsp_service.validateRequest()
        wsp_service.buildModifyResponseMsg()
        # Save the new wsp_service.resourceData here

        self.assertEqual(wsp_service.resourceData, new_full_data)

        wsc_service.processModifyResponseMsg(wsp_service.msgBody)

    def test02(self):
        '''Test a data service modify - root element'''

        xpath = '/pp10:PP'
        old_data = '''
<PP xmlns="urn:liberty:id-sis-pp:2003-08">
<InformalName>Damien</InformalName>
</PP>'''
        new_data = '''
<PP xmlns="urn:liberty:id-sis-pp:2003-08">
<InformalName>Alain</InformalName>
</PP>'''

        new_full_data = '''<PP xmlns="urn:liberty:id-sis-pp:2003-08">
<InformalName>Alain</InformalName>
</PP>'''

        wsc_service = self.get_pp_service()
        wsc_service.initModify()
        wsc_service.addModification(xpath, new_data, overrideAllowed=True)
        wsc_service.buildRequestMsg()

        request_type = lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody)
        self.assertEqual(request_type, lasso.REQUEST_TYPE_DST_MODIFY)

        self.wsp = self.get_wsp_server()
        wsp_service = lasso.DataService(self.wsp)
        wsp_service.processRequestMsg(wsc_service.msgBody)
        wsp_service.resourceData = old_data
        wsp_service.validateRequest()
        wsp_service.buildModifyResponseMsg()
        # Save the new wsp_service.resourceData here

        self.assertEqual(wsp_service.resourceData, new_full_data)

        wsc_service.processModifyResponseMsg(wsp_service.msgBody)

    def test03(self):
        '''Test a data service modify with redirect for consent'''

        xpath = '/pp:PP/pp:InformalName'
        old_data = '''<PP xmlns="urn:liberty:id-sis-pp:2003-08">
<InformalName>Damien</InformalName>
</PP>'''
        new_data = '<InformalName>Alain</InformalName>'
        redir_url = 'http://site/redirect_for_consent'

        wsc_service = self.get_pp_service()
        wsc_service.initModify()
        wsc_service.addModification(xpath, new_data, overrideAllowed=True)
        wsc_service.buildRequestMsg()

        request_type = lasso.getRequestTypeFromSoapMsg(wsc_service.msgBody)
        self.assertEqual(request_type, lasso.REQUEST_TYPE_DST_MODIFY)

        self.wsp = self.get_wsp_server()
        wsp_service = lasso.DataService(self.wsp)
        wsp_service.processRequestMsg(wsc_service.msgBody)
        wsp_service.resourceData = old_data

        # Answer with a redirect instead of validating the request.
        wsp_service.initInteractionServiceRedirect(redir_url)
        wsp_service.buildModifyResponseMsg()
        # Save the new wsp_service.resourceData here

        # Data mustn't have been modified here
        self.assertEqual(wsp_service.resourceData, old_data)
        self.assertTrue(wsp_service.msgBody is not None)

        try:
            wsc_service.processModifyResponseMsg(wsp_service.msgBody)
        except lasso.SoapRedirectRequestFaultError:
            # The redirect fault is tolerated; the URL is checked below.
            pass
        except Exception as e:
            # 'as' works on Python 2.6+ and 3; the old comma form is a
            # syntax error on Python 3.  Any other exception fails the test.
            self.fail(e)
        self.assertEqual(wsc_service.msgUrl, redir_url)
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
# One suite per test case class.  unittest.makeSuite() is deprecated (removed
# in Python 3.13); the default loader collects the same 'test'-prefixed
# methods.
discoveryQuerySuite = unittest.defaultTestLoader.loadTestsFromTestCase(
    DiscoveryQueryTestCase)
discoveryModifySuite = unittest.defaultTestLoader.loadTestsFromTestCase(
    DiscoveryModifyTestCase)
discoveryRemoveSuite = unittest.defaultTestLoader.loadTestsFromTestCase(
    DiscoveryRemoveTestCase)
dataServiceQuerySuite = unittest.defaultTestLoader.loadTestsFromTestCase(
    DataServiceQueryTestCase)
dataServiceModifySuite = unittest.defaultTestLoader.loadTestsFromTestCase(
    DataServiceModifyTestCase)

# Aggregate suite run by the __main__ guard of this module.
allTests = unittest.TestSuite((
    discoveryQuerySuite,
    discoveryModifySuite,
    discoveryRemoveSuite,
    dataServiceQuerySuite,
    dataServiceModifySuite,
))
|
|
Packit |
228f82 |
|
|
Packit |
228f82 |
if __name__ == '__main__':
    # Run every suite verbosely.  The exit status is the negated success
    # flag, so a fully passing run exits with 0 and anything else with 1.
    runner = unittest.TextTestRunner(verbosity=2)
    outcome = runner.run(allTests)
    sys.exit(not outcome.wasSuccessful())
|
|
Packit |
228f82 |
|