@@ -218,7 +218,7 @@ def testing_context(server_cert=SIGNED_CERTFILE):
218218
219219 server_context = ssl .SSLContext (ssl .PROTOCOL_TLS_SERVER )
220220 server_context .load_cert_chain (server_cert )
221- client_context .load_verify_locations (SIGNING_CA )
221+ server_context .load_verify_locations (SIGNING_CA )
222222
223223 return client_context , server_context , hostname
224224
@@ -2262,6 +2262,23 @@ def run(self):
22622262 sys .stdout .write (" server: read CB tls-unique from client, sending our CB data...\n " )
22632263 data = self .sslconn .get_channel_binding ("tls-unique" )
22642264 self .write (repr (data ).encode ("us-ascii" ) + b"\n " )
2265+ elif stripped == b'PHA' :
2266+ if support .verbose and self .server .connectionchatty :
2267+ sys .stdout .write (" server: initiating post handshake auth\n " )
2268+ try :
2269+ self .sslconn .verify_client_post_handshake ()
2270+ except ssl .SSLError as e :
2271+ self .write (repr (e ).encode ("us-ascii" ) + b"\n " )
2272+ else :
2273+ self .write (b"OK\n " )
2274+ elif stripped == b'HASCERT' :
2275+ if self .sslconn .getpeercert () is not None :
2276+ self .write (b'TRUE\n ' )
2277+ else :
2278+ self .write (b'FALSE\n ' )
2279+ elif stripped == b'GETCERT' :
2280+ cert = self .sslconn .getpeercert ()
2281+ self .write (repr (cert ).encode ("us-ascii" ) + b"\n " )
22652282 else :
22662283 if (support .verbose and
22672284 self .server .connectionchatty ):
@@ -4148,6 +4165,179 @@ def test_session_handling(self):
41484165 'Session refers to a different SSLContext.' )
41494166
41504167
4168+ @unittest .skipUnless (ssl .HAS_TLSv1_3 , "Test needs TLS 1.3" )
4169+ class TestPostHandshakeAuth (unittest .TestCase ):
4170+ def test_pha_setter (self ):
4171+ protocols = [
4172+ ssl .PROTOCOL_TLS , ssl .PROTOCOL_TLS_SERVER , ssl .PROTOCOL_TLS_CLIENT
4173+ ]
4174+ for protocol in protocols :
4175+ ctx = ssl .SSLContext (protocol )
4176+ self .assertEqual (ctx .post_handshake_auth , False )
4177+
4178+ ctx .post_handshake_auth = True
4179+ self .assertEqual (ctx .post_handshake_auth , True )
4180+
4181+ ctx .verify_mode = ssl .CERT_REQUIRED
4182+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
4183+ self .assertEqual (ctx .post_handshake_auth , True )
4184+
4185+ ctx .post_handshake_auth = False
4186+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
4187+ self .assertEqual (ctx .post_handshake_auth , False )
4188+
4189+ ctx .verify_mode = ssl .CERT_OPTIONAL
4190+ ctx .post_handshake_auth = True
4191+ self .assertEqual (ctx .verify_mode , ssl .CERT_OPTIONAL )
4192+ self .assertEqual (ctx .post_handshake_auth , True )
4193+
4194+ def test_pha_required (self ):
4195+ client_context , server_context , hostname = testing_context ()
4196+ server_context .post_handshake_auth = True
4197+ server_context .verify_mode = ssl .CERT_REQUIRED
4198+ client_context .post_handshake_auth = True
4199+ client_context .load_cert_chain (SIGNED_CERTFILE )
4200+
4201+ server = ThreadedEchoServer (context = server_context , chatty = False )
4202+ with server :
4203+ with client_context .wrap_socket (socket .socket (),
4204+ server_hostname = hostname ) as s :
4205+ s .connect ((HOST , server .port ))
4206+ s .write (b'HASCERT' )
4207+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4208+ s .write (b'PHA' )
4209+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4210+ s .write (b'HASCERT' )
4211+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4212+ # PHA method just returns true when cert is already available
4213+ s .write (b'PHA' )
4214+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4215+ s .write (b'GETCERT' )
4216+ cert_text = s .recv (4096 ).decode ('us-ascii' )
4217+ self .assertIn ('Python Software Foundation CA' , cert_text )
4218+
4219+ def test_pha_required_nocert (self ):
4220+ client_context , server_context , hostname = testing_context ()
4221+ server_context .post_handshake_auth = True
4222+ server_context .verify_mode = ssl .CERT_REQUIRED
4223+ client_context .post_handshake_auth = True
4224+
4225+ server = ThreadedEchoServer (context = server_context , chatty = False )
4226+ with server :
4227+ with client_context .wrap_socket (socket .socket (),
4228+ server_hostname = hostname ) as s :
4229+ s .connect ((HOST , server .port ))
4230+ s .write (b'PHA' )
4231+ # receive CertificateRequest
4232+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4233+ # send empty Certificate + Finish
4234+ s .write (b'HASCERT' )
4235+ # receive alert
4236+ with self .assertRaisesRegex (
4237+ ssl .SSLError ,
4238+ 'tlsv13 alert certificate required' ):
4239+ s .recv (1024 )
4240+
4241+ def test_pha_optional (self ):
4242+ if support .verbose :
4243+ sys .stdout .write ("\n " )
4244+
4245+ client_context , server_context , hostname = testing_context ()
4246+ server_context .post_handshake_auth = True
4247+ server_context .verify_mode = ssl .CERT_REQUIRED
4248+ client_context .post_handshake_auth = True
4249+ client_context .load_cert_chain (SIGNED_CERTFILE )
4250+
4251+ # check CERT_OPTIONAL
4252+ server_context .verify_mode = ssl .CERT_OPTIONAL
4253+ server = ThreadedEchoServer (context = server_context , chatty = False )
4254+ with server :
4255+ with client_context .wrap_socket (socket .socket (),
4256+ server_hostname = hostname ) as s :
4257+ s .connect ((HOST , server .port ))
4258+ s .write (b'HASCERT' )
4259+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4260+ s .write (b'PHA' )
4261+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4262+ s .write (b'HASCERT' )
4263+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4264+
4265+ def test_pha_optional_nocert (self ):
4266+ if support .verbose :
4267+ sys .stdout .write ("\n " )
4268+
4269+ client_context , server_context , hostname = testing_context ()
4270+ server_context .post_handshake_auth = True
4271+ server_context .verify_mode = ssl .CERT_OPTIONAL
4272+ client_context .post_handshake_auth = True
4273+
4274+ server = ThreadedEchoServer (context = server_context , chatty = False )
4275+ with server :
4276+ with client_context .wrap_socket (socket .socket (),
4277+ server_hostname = hostname ) as s :
4278+ s .connect ((HOST , server .port ))
4279+ s .write (b'HASCERT' )
4280+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4281+ s .write (b'PHA' )
4282+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4283+ # optional doens't fail when client does not have a cert
4284+ s .write (b'HASCERT' )
4285+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4286+
4287+ def test_pha_no_pha_client (self ):
4288+ client_context , server_context , hostname = testing_context ()
4289+ server_context .post_handshake_auth = True
4290+ server_context .verify_mode = ssl .CERT_REQUIRED
4291+ client_context .load_cert_chain (SIGNED_CERTFILE )
4292+
4293+ server = ThreadedEchoServer (context = server_context , chatty = False )
4294+ with server :
4295+ with client_context .wrap_socket (socket .socket (),
4296+ server_hostname = hostname ) as s :
4297+ s .connect ((HOST , server .port ))
4298+ with self .assertRaisesRegex (ssl .SSLError , 'not server' ):
4299+ s .verify_client_post_handshake ()
4300+ s .write (b'PHA' )
4301+ self .assertIn (b'extension not received' , s .recv (1024 ))
4302+
4303+ def test_pha_no_pha_server (self ):
4304+ # server doesn't have PHA enabled, cert is requested in handshake
4305+ client_context , server_context , hostname = testing_context ()
4306+ server_context .verify_mode = ssl .CERT_REQUIRED
4307+ client_context .post_handshake_auth = True
4308+ client_context .load_cert_chain (SIGNED_CERTFILE )
4309+
4310+ server = ThreadedEchoServer (context = server_context , chatty = False )
4311+ with server :
4312+ with client_context .wrap_socket (socket .socket (),
4313+ server_hostname = hostname ) as s :
4314+ s .connect ((HOST , server .port ))
4315+ s .write (b'HASCERT' )
4316+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4317+ # PHA doesn't fail if there is already a cert
4318+ s .write (b'PHA' )
4319+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4320+ s .write (b'HASCERT' )
4321+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4322+
4323+ def test_pha_not_tls13 (self ):
4324+ # TLS 1.2
4325+ client_context , server_context , hostname = testing_context ()
4326+ server_context .verify_mode = ssl .CERT_REQUIRED
4327+ client_context .maximum_version = ssl .TLSVersion .TLSv1_2
4328+ client_context .post_handshake_auth = True
4329+ client_context .load_cert_chain (SIGNED_CERTFILE )
4330+
4331+ server = ThreadedEchoServer (context = server_context , chatty = False )
4332+ with server :
4333+ with client_context .wrap_socket (socket .socket (),
4334+ server_hostname = hostname ) as s :
4335+ s .connect ((HOST , server .port ))
4336+ # PHA fails for TLS != 1.3
4337+ s .write (b'PHA' )
4338+ self .assertIn (b'WRONG_SSL_VERSION' , s .recv (1024 ))
4339+
4340+
41514341def test_main (verbose = False ):
41524342 if support .verbose :
41534343 import warnings
@@ -4183,6 +4373,7 @@ def test_main(verbose=False):
41834373 tests = [
41844374 ContextTests , BasicSocketTests , SSLErrorTests , MemoryBIOTests ,
41854375 SSLObjectTests , SimpleBackgroundTests , ThreadedTests ,
4376+ TestPostHandshakeAuth
41864377 ]
41874378
41884379 if support .is_resource_enabled ('network' ):
0 commit comments