Clone pyezviz from https://github.com/BaQs/pyEzviz
This commit is contained in:
@@ -0,0 +1,147 @@
|
||||
"""Test camera RTSP authentication"""
|
||||
import base64
|
||||
import hashlib
|
||||
import socket
|
||||
from pyezviz.exceptions import AuthTestResultFailed, InvalidHost
|
||||
|
||||
|
||||
def genmsg_describe(url, seq, user_agent, auth_seq):
|
||||
"""Generate RTSP describe message"""
|
||||
msg_ret = "DESCRIBE " + url + " RTSP/1.0\r\n"
|
||||
msg_ret += "CSeq: " + str(seq) + "\r\n"
|
||||
msg_ret += "Authorization: " + auth_seq + "\r\n"
|
||||
msg_ret += "User-Agent: " + user_agent + "\r\n"
|
||||
msg_ret += "Accept: application/sdp\r\n"
|
||||
msg_ret += "\r\n"
|
||||
return msg_ret
|
||||
|
||||
|
||||
class TestRTSPAuth:
|
||||
"""Initialize RTSP credential test"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ip_addr,
|
||||
username=None,
|
||||
password=None,
|
||||
test_uri="",
|
||||
):
|
||||
self._rtsp_details = {
|
||||
"bufLen": 1024,
|
||||
"defaultServerIp": ip_addr,
|
||||
"defaultServerPort": 554,
|
||||
"defaultTestUri": test_uri,
|
||||
"defaultUserAgent": "RTSP Client",
|
||||
"defaultUsername": username,
|
||||
"defaultPassword": password,
|
||||
}
|
||||
|
||||
def generate_auth_string(self, realm, method, uri, nonce):
|
||||
"""Generate digest auth string """
|
||||
map_return_info = {}
|
||||
m_1 = hashlib.md5(
|
||||
f"{self._rtsp_details['defaultUsername']}:"
|
||||
f"{realm.decode()}:"
|
||||
f"{self._rtsp_details['defaultPassword']}".encode()
|
||||
).hexdigest()
|
||||
m_2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
|
||||
response = hashlib.md5(f"{m_1}:{nonce}:{m_2}".encode()).hexdigest()
|
||||
|
||||
map_return_info = (
|
||||
f"Digest "
|
||||
f"username=\"{self._rtsp_details['defaultUsername']}\", "
|
||||
f'realm="{realm.decode()}", '
|
||||
f'algorithm="MD5", '
|
||||
f'nonce="{nonce.decode()}", '
|
||||
f'uri="{uri}", '
|
||||
f'response="{response}"'
|
||||
)
|
||||
return map_return_info
|
||||
|
||||
def main(self):
|
||||
"""Main function """
|
||||
session = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
try:
|
||||
session.connect(
|
||||
(
|
||||
self._rtsp_details["defaultServerIp"],
|
||||
self._rtsp_details["defaultServerPort"],
|
||||
)
|
||||
)
|
||||
|
||||
except TimeoutError as err:
|
||||
raise AuthTestResultFailed("Invalid ip or camera hibernating") from err
|
||||
|
||||
except (socket.gaierror, ConnectionRefusedError) as err:
|
||||
raise InvalidHost("Invalid IP or Hostname") from err
|
||||
|
||||
seq = 1
|
||||
|
||||
url = (
|
||||
"rtsp://"
|
||||
+ self._rtsp_details["defaultServerIp"]
|
||||
+ self._rtsp_details["defaultTestUri"]
|
||||
)
|
||||
|
||||
auth_seq = base64.b64encode(
|
||||
f"{self._rtsp_details['defaultUsername']}:"
|
||||
f"{self._rtsp_details['defaultPassword']}".encode("ascii")
|
||||
)
|
||||
auth_seq = "Basic " + auth_seq.decode()
|
||||
|
||||
print(
|
||||
genmsg_describe(url, seq, self._rtsp_details["defaultUserAgent"], auth_seq)
|
||||
)
|
||||
session.send(
|
||||
genmsg_describe(
|
||||
url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
|
||||
).encode()
|
||||
)
|
||||
msg1 = session.recv(self._rtsp_details["bufLen"])
|
||||
seq = seq + 1
|
||||
|
||||
if msg1.decode().find("200 OK") > 1:
|
||||
print(f"Basic auth result: {msg1.decode()}")
|
||||
return print("Basic Auth test passed. Credentials Valid!")
|
||||
|
||||
if msg1.decode().find("Unauthorized") > 1:
|
||||
# Basic failed, doing new DESCRIBE with digest authentication.
|
||||
start = msg1.decode().find("realm")
|
||||
begin = msg1.decode().find('"', start)
|
||||
end = msg1.decode().find('"', begin + 1)
|
||||
realm = msg1[begin + 1 : end]
|
||||
|
||||
start = msg1.decode().find("nonce")
|
||||
begin = msg1.decode().find('"', start)
|
||||
end = msg1.decode().find('"', begin + 1)
|
||||
nonce = msg1[begin + 1 : end]
|
||||
|
||||
auth_seq = self.generate_auth_string(
|
||||
realm,
|
||||
"DESCRIBE",
|
||||
self._rtsp_details["defaultTestUri"],
|
||||
nonce,
|
||||
)
|
||||
|
||||
print(
|
||||
genmsg_describe(
|
||||
url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
|
||||
)
|
||||
)
|
||||
|
||||
session.send(
|
||||
genmsg_describe(
|
||||
url, seq, self._rtsp_details["defaultUserAgent"], auth_seq
|
||||
).encode()
|
||||
)
|
||||
msg1 = session.recv(self._rtsp_details["bufLen"])
|
||||
print(f"Digest auth result: {msg1.decode()}")
|
||||
|
||||
if msg1.decode().find("200 OK") > 1:
|
||||
return print("Digest Auth test Passed. Credentials Valid!")
|
||||
|
||||
if msg1.decode().find("401 Unauthorized") > 1:
|
||||
raise AuthTestResultFailed("Credentials not valid!!")
|
||||
|
||||
return print("Basic Auth test passed. Credentials Valid!")
|
||||
Reference in New Issue
Block a user