import os, logging DEBUG = os.environ.get('DEBUG') logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING) import asyncio import aiohttp import zlib import struct import json import settings HEADER_LENGTH = 8 async def connect(): data = dict( username=settings.UFP_USERNAME, password=settings.UFP_PASSWORD, rememberMe=True, ) logging.info('Connecting to Unifi Protect...') async with aiohttp.ClientSession() as session: async with session.post(settings.UFP_ADDRESS + '/api/auth/login', json=data, ssl=False, timeout=5) as resp: cookie = resp.cookies['TOKEN'] logging.info('Got cookie.') headers = {'cookie': cookie.key + '=' + cookie.value} async with session.ws_connect( settings.UFP_ADDRESS + '/proxy/protect/ws/updates', headers=headers, ssl=False, receive_timeout=10.0, heartbeat=10.0, ) as ws: yield 'CONNECTED' async for msg in ws: packet_type, payload_format, deflated, unknown, payload_size = struct.unpack('!bbbbi', msg.data[0:HEADER_LENGTH]) action_start = HEADER_LENGTH action_packet = zlib.decompress(msg.data[action_start:]) data_start = payload_size + 2*HEADER_LENGTH data_packet = zlib.decompress(msg.data[data_start:]) yield json.loads(data_packet.decode()) logging.info('Lost connection to web socket.') async def test(): async for msg in connect(): print(msg) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(test()) loop.close()