Linux premium155.web-hosting.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64
LiteSpeed
: 162.0.235.200 | : 18.188.205.95
Cant Read [ /etc/named.conf ]
7.4.33
varifktc
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
README
+ Create Folder
+ Create File
/
opt /
alt /
python33 /
lib64 /
python3.3 /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
collections
[ DIR ]
drwxr-xr-x
concurrent
[ DIR ]
drwxr-xr-x
config-3.3m
[ DIR ]
drwxr-xr-x
ctypes
[ DIR ]
drwxr-xr-x
curses
[ DIR ]
drwxr-xr-x
dbm
[ DIR ]
drwxr-xr-x
distutils
[ DIR ]
drwxr-xr-x
email
[ DIR ]
drwxr-xr-x
encodings
[ DIR ]
drwxr-xr-x
html
[ DIR ]
drwxr-xr-x
http
[ DIR ]
drwxr-xr-x
idlelib
[ DIR ]
drwxr-xr-x
importlib
[ DIR ]
drwxr-xr-x
json
[ DIR ]
drwxr-xr-x
lib-dynload
[ DIR ]
drwxr-xr-x
lib2to3
[ DIR ]
drwxr-xr-x
logging
[ DIR ]
drwxr-xr-x
multiprocessing
[ DIR ]
drwxr-xr-x
plat-linux
[ DIR ]
drwxr-xr-x
pydoc_data
[ DIR ]
drwxr-xr-x
site-packages
[ DIR ]
drwxr-xr-x
sqlite3
[ DIR ]
drwxr-xr-x
test
[ DIR ]
drwxr-xr-x
unittest
[ DIR ]
drwxr-xr-x
urllib
[ DIR ]
drwxr-xr-x
venv
[ DIR ]
drwxr-xr-x
wsgiref
[ DIR ]
drwxr-xr-x
xml
[ DIR ]
drwxr-xr-x
xmlrpc
[ DIR ]
drwxr-xr-x
__future__.py
4.48
KB
-rw-r--r--
__phello__.foo.py
64
B
-rw-r--r--
_compat_pickle.py
4.24
KB
-rw-r--r--
_dummy_thread.py
4.66
KB
-rw-r--r--
_markupbase.py
14.26
KB
-rw-r--r--
_osx_support.py
18.41
KB
-rw-r--r--
_pyio.py
71.2
KB
-rw-r--r--
_strptime.py
21.17
KB
-rw-r--r--
_sysconfigdata.py
22.31
KB
-rw-r--r--
_threading_local.py
7.24
KB
-rw-r--r--
_weakrefset.py
5.57
KB
-rw-r--r--
abc.py
7.87
KB
-rw-r--r--
aifc.py
30.33
KB
-rw-r--r--
antigravity.py
475
B
-rw-r--r--
argparse.py
86.98
KB
-rw-r--r--
ast.py
11.86
KB
-rw-r--r--
asynchat.py
11.32
KB
-rw-r--r--
asyncore.py
20.27
KB
-rw-r--r--
base64.py
13.66
KB
-rwxr-xr-x
bdb.py
21.38
KB
-rw-r--r--
binhex.py
13.39
KB
-rw-r--r--
bisect.py
2.53
KB
-rw-r--r--
bz2.py
18.04
KB
-rw-r--r--
cProfile.py
6.21
KB
-rwxr-xr-x
calendar.py
22.4
KB
-rw-r--r--
cgi.py
34.72
KB
-rwxr-xr-x
cgitb.py
11.76
KB
-rw-r--r--
chunk.py
5.25
KB
-rw-r--r--
cmd.py
14.51
KB
-rw-r--r--
code.py
9.79
KB
-rw-r--r--
codecs.py
35.11
KB
-rw-r--r--
codeop.py
5.85
KB
-rw-r--r--
colorsys.py
3.6
KB
-rw-r--r--
compileall.py
9.51
KB
-rw-r--r--
configparser.py
48.28
KB
-rw-r--r--
contextlib.py
8.91
KB
-rw-r--r--
copy.py
8.78
KB
-rw-r--r--
copyreg.py
6.46
KB
-rw-r--r--
crypt.py
1.83
KB
-rw-r--r--
csv.py
15.81
KB
-rw-r--r--
datetime.py
73.2
KB
-rw-r--r--
decimal.py
223.2
KB
-rw-r--r--
difflib.py
80.58
KB
-rw-r--r--
dis.py
9.9
KB
-rw-r--r--
doctest.py
100.52
KB
-rw-r--r--
dummy_threading.py
2.75
KB
-rw-r--r--
filecmp.py
9.37
KB
-rw-r--r--
fileinput.py
13.92
KB
-rw-r--r--
fnmatch.py
3.09
KB
-rw-r--r--
formatter.py
14.58
KB
-rw-r--r--
fractions.py
22.49
KB
-rw-r--r--
ftplib.py
39.31
KB
-rw-r--r--
functools.py
13.28
KB
-rw-r--r--
genericpath.py
3.02
KB
-rw-r--r--
getopt.py
7.31
KB
-rw-r--r--
getpass.py
5.66
KB
-rw-r--r--
gettext.py
20.15
KB
-rw-r--r--
glob.py
2.77
KB
-rw-r--r--
gzip.py
23.83
KB
-rw-r--r--
hashlib.py
6.05
KB
-rw-r--r--
heapq.py
17.58
KB
-rw-r--r--
hmac.py
4.34
KB
-rw-r--r--
imaplib.py
48.94
KB
-rw-r--r--
imghdr.py
3.45
KB
-rw-r--r--
imp.py
9.5
KB
-rw-r--r--
inspect.py
77.11
KB
-rw-r--r--
io.py
3.2
KB
-rw-r--r--
ipaddress.py
68.66
KB
-rw-r--r--
keyword.py
2.01
KB
-rwxr-xr-x
linecache.py
3.77
KB
-rw-r--r--
locale.py
91.03
KB
-rw-r--r--
lzma.py
17.04
KB
-rw-r--r--
macpath.py
5.49
KB
-rw-r--r--
macurl2path.py
2.67
KB
-rw-r--r--
mailbox.py
77.24
KB
-rw-r--r--
mailcap.py
7.26
KB
-rw-r--r--
mimetypes.py
20.25
KB
-rw-r--r--
modulefinder.py
22.65
KB
-rw-r--r--
netrc.py
5.61
KB
-rw-r--r--
nntplib.py
41.78
KB
-rw-r--r--
ntpath.py
19.96
KB
-rw-r--r--
nturl2path.py
2.34
KB
-rw-r--r--
numbers.py
10.15
KB
-rw-r--r--
opcode.py
4.98
KB
-rw-r--r--
optparse.py
58.93
KB
-rw-r--r--
os.py
33.96
KB
-rw-r--r--
os2emxpath.py
4.55
KB
-rw-r--r--
pdb.py
59.23
KB
-rwxr-xr-x
pickle.py
46.74
KB
-rw-r--r--
pickletools.py
79.44
KB
-rw-r--r--
pipes.py
8.71
KB
-rw-r--r--
pkgutil.py
21.03
KB
-rw-r--r--
platform.py
49.55
KB
-rwxr-xr-x
plistlib.py
14.43
KB
-rw-r--r--
poplib.py
11.11
KB
-rw-r--r--
posixpath.py
13.92
KB
-rw-r--r--
pprint.py
12.4
KB
-rw-r--r--
profile.py
20.95
KB
-rwxr-xr-x
pstats.py
25.75
KB
-rw-r--r--
pty.py
4.94
KB
-rw-r--r--
py_compile.py
6.56
KB
-rw-r--r--
pyclbr.py
13.12
KB
-rw-r--r--
pydoc.py
99.26
KB
-rwxr-xr-x
queue.py
8.63
KB
-rw-r--r--
quopri.py
7.14
KB
-rwxr-xr-x
random.py
25.06
KB
-rw-r--r--
re.py
14.62
KB
-rw-r--r--
reprlib.py
4.99
KB
-rw-r--r--
rlcompleter.py
5.4
KB
-rw-r--r--
runpy.py
10.17
KB
-rw-r--r--
sched.py
6.25
KB
-rw-r--r--
shelve.py
8.05
KB
-rw-r--r--
shlex.py
11.23
KB
-rw-r--r--
shutil.py
38.23
KB
-rw-r--r--
site.py
21.46
KB
-rw-r--r--
smtpd.py
29.5
KB
-rwxr-xr-x
smtplib.py
37.13
KB
-rwxr-xr-x
sndhdr.py
6.07
KB
-rw-r--r--
socket.py
14.56
KB
-rw-r--r--
socketserver.py
23.63
KB
-rw-r--r--
sre_compile.py
15.96
KB
-rw-r--r--
sre_constants.py
7.06
KB
-rw-r--r--
sre_parse.py
29.5
KB
-rw-r--r--
ssl.py
23.9
KB
-rw-r--r--
stat.py
4.2
KB
-rw-r--r--
string.py
9.19
KB
-rw-r--r--
stringprep.py
12.61
KB
-rw-r--r--
struct.py
238
B
-rw-r--r--
subprocess.py
65.99
KB
-rw-r--r--
sunau.py
17.11
KB
-rw-r--r--
symbol.py
2
KB
-rwxr-xr-x
symtable.py
7.21
KB
-rw-r--r--
sysconfig.py
24.58
KB
-rw-r--r--
tabnanny.py
11.14
KB
-rwxr-xr-x
tarfile.py
86.78
KB
-rwxr-xr-x
telnetlib.py
26.71
KB
-rw-r--r--
tempfile.py
22.47
KB
-rw-r--r--
textwrap.py
16.1
KB
-rw-r--r--
this.py
1003
B
-rw-r--r--
threading.py
44.57
KB
-rw-r--r--
timeit.py
12.1
KB
-rwxr-xr-x
token.py
2.96
KB
-rw-r--r--
tokenize.py
24.29
KB
-rw-r--r--
trace.py
30.75
KB
-rwxr-xr-x
traceback.py
11.7
KB
-rw-r--r--
tty.py
879
B
-rw-r--r--
types.py
3.09
KB
-rw-r--r--
uu.py
6.61
KB
-rwxr-xr-x
uuid.py
21.83
KB
-rw-r--r--
warnings.py
13.5
KB
-rw-r--r--
wave.py
18.14
KB
-rw-r--r--
weakref.py
11.23
KB
-rw-r--r--
webbrowser.py
22.38
KB
-rwxr-xr-x
xdrlib.py
5.25
KB
-rw-r--r--
zipfile.py
64.87
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : ssl.py
# Wrapper module for _ssl, providing some additional facilities # implemented in Python. Written by Bill Janssen. """This module provides some more Pythonic support for SSL. Object types: SSLSocket -- subtype of socket.socket which does SSL over the socket Exceptions: SSLError -- exception raised for I/O errors Functions: cert_time_to_seconds -- convert time string used for certificate notBefore and notAfter functions to integer seconds past the Epoch (the time values returned from time.time()) fetch_server_certificate (HOST, PORT) -- fetch the certificate provided by the server running on HOST at port PORT. No validation of the certificate is performed. Integer constants: SSL_ERROR_ZERO_RETURN SSL_ERROR_WANT_READ SSL_ERROR_WANT_WRITE SSL_ERROR_WANT_X509_LOOKUP SSL_ERROR_SYSCALL SSL_ERROR_SSL SSL_ERROR_WANT_CONNECT SSL_ERROR_EOF SSL_ERROR_INVALID_ERROR_CODE The following group define certificate requirements that one side is allowing/requiring from the other side: CERT_NONE - no certificates from the other side are required (or will be looked at if provided) CERT_OPTIONAL - certificates are not required, but if provided will be validated, and if validation fails, the connection will also fail CERT_REQUIRED - certificates are required, and will be validated, and if validation fails, the connection will also fail The following constants identify various SSL protocol variants: PROTOCOL_SSLv2 PROTOCOL_SSLv3 PROTOCOL_SSLv23 PROTOCOL_TLSv1 """ import textwrap import re import _ssl # if we can't import it, let the error propagate from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION from _ssl import _SSLContext from _ssl import ( SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError, SSLSyscallError, SSLEOFError, ) from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED from _ssl import ( OP_ALL, OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_TLSv1, OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE ) try: from _ssl import OP_NO_COMPRESSION except ImportError: pass try: from _ssl import OP_SINGLE_ECDH_USE except ImportError: pass from _ssl import RAND_status, RAND_add, RAND_bytes, RAND_pseudo_bytes try: from _ssl import RAND_egd except ImportError: pass from _ssl import ( SSL_ERROR_ZERO_RETURN, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_X509_LOOKUP, SSL_ERROR_SYSCALL, SSL_ERROR_SSL, SSL_ERROR_WANT_CONNECT, SSL_ERROR_EOF, SSL_ERROR_INVALID_ERROR_CODE, ) from _ssl import HAS_SNI, HAS_ECDH, HAS_NPN from _ssl import (PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1, PROTOCOL_TLS) from _ssl import _OPENSSL_API_VERSION globals()['PROTOCOL_SSLv23'] = getattr(_ssl, 'PROTOCOL_TLS') _PROTOCOL_NAMES = { PROTOCOL_TLSv1: "TLSv1", PROTOCOL_SSLv23: "SSLv23", PROTOCOL_SSLv3: "SSLv3", PROTOCOL_TLS: "SSLv23" } try: from _ssl import PROTOCOL_SSLv2 _SSLv2_IF_EXISTS = PROTOCOL_SSLv2 except ImportError: _SSLv2_IF_EXISTS = None else: _PROTOCOL_NAMES[PROTOCOL_SSLv2] = "SSLv2" from socket import getnameinfo as _getnameinfo from socket import error as socket_error from socket import socket, AF_INET, SOCK_STREAM, create_connection from socket import SOL_SOCKET, SO_TYPE import base64 # for DER-to-PEM translation import traceback import errno if _ssl.HAS_TLS_UNIQUE: CHANNEL_BINDING_TYPES = ['tls-unique'] else: CHANNEL_BINDING_TYPES = [] # Disable weak or insecure ciphers by default # (OpenSSL's default setting is 'DEFAULT:!aNULL:!eNULL') _DEFAULT_CIPHERS = 'DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2' class CertificateError(ValueError): pass def _dnsname_match(dn, hostname, max_wildcards=1): """Matching according to RFC 6125, section 6.4.3 http://tools.ietf.org/html/rfc6125#section-6.4.3 """ pats = [] if not dn: return False leftmost, *remainder = dn.split(r'.') wildcards = leftmost.count('*') if wildcards > max_wildcards: # Issue #17980: avoid denials of service by refusing more # than one wildcard per fragment. A survery of established # policy among SSL implementations showed it to be a # reasonable choice. raise CertificateError( "too many wildcards in certificate DNS name: " + repr(dn)) # speed up common case w/o wildcards if not wildcards: return dn.lower() == hostname.lower() # RFC 6125, section 6.4.3, subitem 1. # The client SHOULD NOT attempt to match a presented identifier in which # the wildcard character comprises a label other than the left-most label. if leftmost == '*': # When '*' is a fragment by itself, it matches a non-empty dotless # fragment. pats.append('[^.]+') elif leftmost.startswith('xn--') or hostname.startswith('xn--'): # RFC 6125, section 6.4.3, subitem 3. # The client SHOULD NOT attempt to match a presented identifier # where the wildcard character is embedded within an A-label or # U-label of an internationalized domain name. pats.append(re.escape(leftmost)) else: # Otherwise, '*' matches any dotless string, e.g. www* pats.append(re.escape(leftmost).replace(r'\*', '[^.]*')) # add the remaining fragments, ignore any wildcards for frag in remainder: pats.append(re.escape(frag)) pat = re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE) return pat.match(hostname) def match_hostname(cert, hostname): """Verify that *cert* (in decoded format as returned by SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125 rules are followed, but IP addresses are not accepted for *hostname*. CertificateError is raised on failure. On success, the function returns nothing. """ if not cert: raise ValueError("empty or no certificate") dnsnames = [] san = cert.get('subjectAltName', ()) for key, value in san: if key == 'DNS': if _dnsname_match(value, hostname): return dnsnames.append(value) if not dnsnames: # The subject is only checked when there is no dNSName entry # in subjectAltName for sub in cert.get('subject', ()): for key, value in sub: # XXX according to RFC 2818, the most specific Common Name # must be used. if key == 'commonName': if _dnsname_match(value, hostname): return dnsnames.append(value) if len(dnsnames) > 1: raise CertificateError("hostname %r " "doesn't match either of %s" % (hostname, ', '.join(map(repr, dnsnames)))) elif len(dnsnames) == 1: raise CertificateError("hostname %r " "doesn't match %r" % (hostname, dnsnames[0])) else: raise CertificateError("no appropriate commonName or " "subjectAltName fields were found") class SSLContext(_SSLContext): """An SSLContext holds various SSL-related configuration options and data, such as certificates and possibly a private key.""" __slots__ = ('protocol',) def __new__(cls, protocol, *args, **kwargs): self = _SSLContext.__new__(cls, protocol) if protocol != _SSLv2_IF_EXISTS: self.set_ciphers(_DEFAULT_CIPHERS) return self def __init__(self, protocol): self.protocol = protocol def wrap_socket(self, sock, server_side=False, do_handshake_on_connect=True, suppress_ragged_eofs=True, server_hostname=None): return SSLSocket(sock=sock, server_side=server_side, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs, server_hostname=server_hostname, _context=self) def set_npn_protocols(self, npn_protocols): protos = bytearray() for protocol in npn_protocols: b = bytes(protocol, 'ascii') if len(b) == 0 or len(b) > 255: raise SSLError('NPN protocols must be 1 to 255 in length') protos.append(len(b)) protos.extend(b) self._set_npn_protocols(protos) class SSLSocket(socket): """This class implements a subtype of socket.socket that wraps the underlying OS socket in an SSL context when necessary, and provides read and write methods over that channel.""" def __init__(self, sock=None, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_TLS, ca_certs=None, do_handshake_on_connect=True, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, suppress_ragged_eofs=True, npn_protocols=None, ciphers=None, server_hostname=None, _context=None): if _context: self.context = _context else: if server_side and not certfile: raise ValueError("certfile must be specified for server-side " "operations") if keyfile and not certfile: raise ValueError("certfile must be specified") if certfile and not keyfile: keyfile = certfile self.context = SSLContext(ssl_version) self.context.verify_mode = cert_reqs if ca_certs: self.context.load_verify_locations(ca_certs) if certfile: self.context.load_cert_chain(certfile, keyfile) if npn_protocols: self.context.set_npn_protocols(npn_protocols) if ciphers: self.context.set_ciphers(ciphers) self.keyfile = keyfile self.certfile = certfile self.cert_reqs = cert_reqs self.ssl_version = ssl_version self.ca_certs = ca_certs self.ciphers = ciphers # Can't use sock.type as other flags (such as SOCK_NONBLOCK) get # mixed in. if sock.getsockopt(SOL_SOCKET, SO_TYPE) != SOCK_STREAM: raise NotImplementedError("only stream sockets are supported") if server_side and server_hostname: raise ValueError("server_hostname can only be specified " "in client mode") self.server_side = server_side self.server_hostname = server_hostname self.do_handshake_on_connect = do_handshake_on_connect self.suppress_ragged_eofs = suppress_ragged_eofs connected = False if sock is not None: socket.__init__(self, family=sock.family, type=sock.type, proto=sock.proto, fileno=sock.fileno()) self.settimeout(sock.gettimeout()) # see if it's connected try: sock.getpeername() except socket_error as e: if e.errno != errno.ENOTCONN: raise else: connected = True sock.detach() elif fileno is not None: socket.__init__(self, fileno=fileno) else: socket.__init__(self, family=family, type=type, proto=proto) self._closed = False self._sslobj = None self._connected = connected if connected: # create the SSL object try: self._sslobj = self.context._wrap_socket(self, server_side, server_hostname) if do_handshake_on_connect: timeout = self.gettimeout() if timeout == 0.0: # non-blocking raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets") self.do_handshake() except socket_error as x: self.close() raise x def dup(self): raise NotImplemented("Can't dup() %s instances" % self.__class__.__name__) def _checkClosed(self, msg=None): # raise an exception here if you wish to check for spurious closes pass def read(self, len=0, buffer=None): """Read up to LEN bytes and return them. Return zero-length string on EOF.""" self._checkClosed() try: if buffer is not None: v = self._sslobj.read(len, buffer) else: v = self._sslobj.read(len or 1024) return v except SSLError as x: if x.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs: if buffer is not None: return 0 else: return b'' else: raise def write(self, data): """Write DATA to the underlying SSL channel. Returns number of bytes of DATA actually transmitted.""" self._checkClosed() return self._sslobj.write(data) def getpeercert(self, binary_form=False): """Returns a formatted version of the data in the certificate provided by the other end of the SSL channel. Return None if no certificate was provided, {} if a certificate was provided, but not validated.""" self._checkClosed() return self._sslobj.peer_certificate(binary_form) def selected_npn_protocol(self): self._checkClosed() if not self._sslobj or not _ssl.HAS_NPN: return None else: return self._sslobj.selected_npn_protocol() def cipher(self): self._checkClosed() if not self._sslobj: return None else: return self._sslobj.cipher() def compression(self): self._checkClosed() if not self._sslobj: return None else: return self._sslobj.compression() def send(self, data, flags=0): self._checkClosed() if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to send() on %s" % self.__class__) while True: try: v = self._sslobj.write(data) except SSLError as x: if x.args[0] == SSL_ERROR_WANT_READ: return 0 elif x.args[0] == SSL_ERROR_WANT_WRITE: return 0 else: raise else: return v else: return socket.send(self, data, flags) def sendto(self, data, flags_or_addr, addr=None): self._checkClosed() if self._sslobj: raise ValueError("sendto not allowed on instances of %s" % self.__class__) elif addr is None: return socket.sendto(self, data, flags_or_addr) else: return socket.sendto(self, data, flags_or_addr, addr) def sendmsg(self, *args, **kwargs): # Ensure programs don't send data unencrypted if they try to # use this method. raise NotImplementedError("sendmsg not allowed on instances of %s" % self.__class__) def sendall(self, data, flags=0): self._checkClosed() if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to sendall() on %s" % self.__class__) amount = len(data) count = 0 while (count < amount): v = self.send(data[count:]) count += v return amount else: return socket.sendall(self, data, flags) def recv(self, buflen=1024, flags=0): self._checkClosed() if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to recv() on %s" % self.__class__) return self.read(buflen) else: return socket.recv(self, buflen, flags) def recv_into(self, buffer, nbytes=None, flags=0): self._checkClosed() if buffer and (nbytes is None): nbytes = len(buffer) elif nbytes is None: nbytes = 1024 if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to recv_into() on %s" % self.__class__) return self.read(nbytes, buffer) else: return socket.recv_into(self, buffer, nbytes, flags) def recvfrom(self, buflen=1024, flags=0): self._checkClosed() if self._sslobj: raise ValueError("recvfrom not allowed on instances of %s" % self.__class__) else: return socket.recvfrom(self, buflen, flags) def recvfrom_into(self, buffer, nbytes=None, flags=0): self._checkClosed() if self._sslobj: raise ValueError("recvfrom_into not allowed on instances of %s" % self.__class__) else: return socket.recvfrom_into(self, buffer, nbytes, flags) def recvmsg(self, *args, **kwargs): raise NotImplementedError("recvmsg not allowed on instances of %s" % self.__class__) def recvmsg_into(self, *args, **kwargs): raise NotImplementedError("recvmsg_into not allowed on instances of " "%s" % self.__class__) def pending(self): self._checkClosed() if self._sslobj: return self._sslobj.pending() else: return 0 def shutdown(self, how): self._checkClosed() self._sslobj = None socket.shutdown(self, how) def unwrap(self): if self._sslobj: s = self._sslobj.shutdown() self._sslobj = None return s else: raise ValueError("No SSL wrapper around " + str(self)) def _real_close(self): self._sslobj = None # self._closed = True socket._real_close(self) def do_handshake(self, block=False): """Perform a TLS/SSL handshake.""" timeout = self.gettimeout() try: if timeout == 0.0 and block: self.settimeout(None) self._sslobj.do_handshake() finally: self.settimeout(timeout) def _real_connect(self, addr, connect_ex): if self.server_side: raise ValueError("can't connect in server-side mode") # Here we assume that the socket is client-side, and not # connected at the time of the call. We connect it, then wrap it. if self._connected: raise ValueError("attempt to connect already-connected SSLSocket!") self._sslobj = self.context._wrap_socket(self, False, self.server_hostname) try: if connect_ex: rc = socket.connect_ex(self, addr) else: rc = None socket.connect(self, addr) if not rc: if self.do_handshake_on_connect: self.do_handshake() self._connected = True return rc except socket_error: self._sslobj = None raise def connect(self, addr): """Connects to remote ADDR, and then wraps the connection in an SSL channel.""" self._real_connect(addr, False) def connect_ex(self, addr): """Connects to remote ADDR, and then wraps the connection in an SSL channel.""" return self._real_connect(addr, True) def accept(self): """Accepts a new connection from a remote client, and returns a tuple containing that new connection wrapped with a server-side SSL channel, and the address of the remote client.""" newsock, addr = socket.accept(self) newsock = self.context.wrap_socket(newsock, do_handshake_on_connect=self.do_handshake_on_connect, suppress_ragged_eofs=self.suppress_ragged_eofs, server_side=True) return newsock, addr def get_channel_binding(self, cb_type="tls-unique"): """Get channel binding data for current connection. Raise ValueError if the requested `cb_type` is not supported. Return bytes of the data or None if the data is not available (e.g. before the handshake). """ if cb_type not in CHANNEL_BINDING_TYPES: raise ValueError("Unsupported channel binding type") if cb_type != "tls-unique": raise NotImplementedError( "{0} channel binding type not implemented" .format(cb_type)) if self._sslobj is None: return None return self._sslobj.tls_unique_cb() def wrap_socket(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_TLS, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True, ciphers=None): return SSLSocket(sock=sock, keyfile=keyfile, certfile=certfile, server_side=server_side, cert_reqs=cert_reqs, ssl_version=ssl_version, ca_certs=ca_certs, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs, ciphers=ciphers) # some utility functions def cert_time_to_seconds(cert_time): """Takes a date-time string in standard ASN1_print form ("MON DAY 24HOUR:MINUTE:SEC YEAR TIMEZONE") and return a Python time value in seconds past the epoch.""" import time return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT")) PEM_HEADER = "-----BEGIN CERTIFICATE-----" PEM_FOOTER = "-----END CERTIFICATE-----" def DER_cert_to_PEM_cert(der_cert_bytes): """Takes a certificate in binary DER format and returns the PEM version of it as a string.""" f = str(base64.standard_b64encode(der_cert_bytes), 'ASCII', 'strict') return (PEM_HEADER + '\n' + textwrap.fill(f, 64) + '\n' + PEM_FOOTER + '\n') def PEM_cert_to_DER_cert(pem_cert_string): """Takes a certificate in ASCII PEM format and returns the DER-encoded version of it as a byte sequence""" if not pem_cert_string.startswith(PEM_HEADER): raise ValueError("Invalid PEM encoding; must start with %s" % PEM_HEADER) if not pem_cert_string.strip().endswith(PEM_FOOTER): raise ValueError("Invalid PEM encoding; must end with %s" % PEM_FOOTER) d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)] return base64.decodebytes(d.encode('ASCII', 'strict')) def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv3, ca_certs=None): """Retrieve the certificate from the server at the specified address, and return it as a PEM-encoded string. If 'ca_certs' is specified, validate the server cert against it. If 'ssl_version' is specified, use it in the connection attempt.""" host, port = addr if (ca_certs is not None): cert_reqs = CERT_REQUIRED else: cert_reqs = CERT_NONE s = create_connection(addr) s = wrap_socket(s, ssl_version=ssl_version, cert_reqs=cert_reqs, ca_certs=ca_certs) dercert = s.getpeercert(True) s.close() return DER_cert_to_PEM_cert(dercert) def get_protocol_name(protocol_code): return _PROTOCOL_NAMES.get(protocol_code, '<unknown>')
Close