#!/usr/bin/python3 import time import asyncio import signal import psutil import alsaaudio import requests from subprocess import run from datetime import datetime DD = ';' DELIM = ' ' COLOR = True def shutdown(): print("Shutting down...") loop.stop() async def dwmbar(): global DD await asyncio.sleep(1) while True: bar = f"{status()}{DD}{extrabar()}" # print(bar) run(["xprop", "-root", "-set", "WM_NAME", bar], check=True) await asyncio.sleep(0.1) net = ram = cpu = gpu = wlan = disk = vol = lang = dat = tim = currates = weather = 0 def status(): global DELIM, net, ram, cpu, gpu, wlan, disk, vol, lang, dat, tim try: bar = [ net, ram, cpu, gpu, disk, vol, lang, dat, tim ] return DELIM.join(bar) except: return '' def extrabar(): global DELIM, currates, weather, COLOR try: bar = [ weather, # currates ] return DELIM.join(bar) except: return '' async def currates(): global DELIM, currates, COLOR COLOR1='#AAAAAA' COLOR2='#444444' while True: curs = [] try: curs.append(['', f"${float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=BTCUSDT').json()['lastPrice']):.2f}"]) # curs.append(['', f"${float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=LTCUSDT').json()['lastPrice']):.2f}"]) # curs.append(['Ð', f"${float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=DOGEUSDT').json()['lastPrice']):.2f}"]) if COLOR: row = list(map(lambda x: f'^b{COLOR1}^^c{COLOR2}^ {x[0]} ^c{COLOR1}^^b{COLOR2}^ {x[1]} ^d^', curs)) else: row = list(map(lambda x: f'[{x[0]} | {x[1]}]', curs)) currates = DELIM.join(row) except: pass await asyncio.sleep(60 * 10) async def weather(): global weather, COLOR COLOR1='#AAAAAA' COLOR2='#444444' CITY='Tyumen' while True: try: i = requests.get(f"https://wttr.in/{CITY}?format=%c").text.strip() #.encode('utf-8')[0:3].decode('utf-8') t = requests.get(f"https://wttr.in/{CITY}?format=%t").text.strip() w = requests.get(f"https://wttr.in/{CITY}?format=j2").json()['current_condition'][0]['windspeedKmph'].strip() if COLOR: weather = f"^b{COLOR2}^^c{COLOR1}^ {w} km/h ^b{COLOR1}^^c{COLOR2}^ {i} ^c{COLOR1}^^b{COLOR2}^ {t} ^d^" else: weather = f"[{w} km/h | {i} | {t}]" except: pass await asyncio.sleep(60 * 30) async def net(): global net, COLOR COLOR1='#FF9B00' COLOR2='#A16200' osent = psutil.net_io_counters().bytes_sent orecv = psutil.net_io_counters().bytes_recv while True: sent = psutil.net_io_counters().bytes_sent recv = psutil.net_io_counters().bytes_recv inb = f'{(recv - orecv) / 1024:0.0f} KB/s' outb = f'{(sent - osent) / 1024:0.0f} KB/s' orecv = recv osent = sent if COLOR: net = f'^c{COLOR1}^^b{COLOR2}^ {inb} ^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {outb} ^d^' else: net = f'[{inb} |  | {outb}]' await asyncio.sleep(1) async def ram(): global ram, COLOR COLOR1='#C700FF' COLOR2='#590073' while True: ramt = psutil.virtual_memory().total / 1024 / 1024 / 1024 ramu = psutil.virtual_memory().used / 1024 / 1024 / 1024 if COLOR: ram = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {ramu:0.1f}/{ramt:0.1f}G ^d^' else: ram = f'[ | {ramu:0.1f}/{ramt:0.1f}G]' await asyncio.sleep(1) async def cpu(): global cpu, COLOR COLOR1='#FF00B2' COLOR2='#990057' while True: load = psutil.cpu_percent() if COLOR: cpu = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {load:0.0f}% ^d^' else: cpu = f'[ | {load:0.0f}%]' await asyncio.sleep(1) async def gpu(): global gpu, COLOR COLOR1='#FF00B2' COLOR2='#990057' while True: load = open('/sys/class/drm/card1/device/gpu_busy_percent').read().strip() if COLOR: gpu = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {load}% ^d^' else: gpu = f'[ | {load}%]' await asyncio.sleep(1) async def wlan(): global cpu, COLOR COLOR1='#009CFF' COLOR2='#005083' while True: c = psutil.cpu_percent() i = '' if COLOR: cpu = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {c:0.0f}% ^d^' else: cpu = f'[ | {c:0.0f}%]' await asyncio.sleep(1) async def disk(): global disk, COLOR COLOR1='#00FF90' COLOR2='#00A45D' while True: f = psutil.disk_usage('/').free / 1024 / 1024 / 1024 t = psutil.disk_usage('/').total / 1024 / 1024 / 1024 if COLOR: disk = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {f:0.0f}/{t:0.0f}G ^d^' else: disk = f'[ | {f:0.0f}/{t:0.0f}G]' await asyncio.sleep(3) async def vol(): global vol, COLOR COLOR1='#FF0000' COLOR2='#700000' while True: v = alsaaudio.Mixer().getvolume()[0] if v > 67: i = '' elif v > 34: i = '' elif v > 1: i = '' else: i = '' if alsaaudio.Mixer().getmute()[0]: i = '' if COLOR: vol = f'^b{COLOR1}^^c{COLOR2}^ {i} ^c{COLOR1}^^b{COLOR2}^ {v}% ^d^' else: vol = f'[{i} | {v}%]' await asyncio.sleep(1) async def lang(): global lang, COLOR COLOR1='#FF5800' COLOR2='#973400' while True: l = run(["xkblayout-state", "print", "%s"], check=True, capture_output=True, text=True).stdout if COLOR: lang = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {l} ^d^' else: lang = f'[ | {l}]' await asyncio.sleep(0.1) async def tim(): global tim, COLOR COLOR1='#00E9FF' COLOR2='#008592' while True: t = datetime.now().strftime("%H:%M:%S") if COLOR: tim = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {t} ^d^' else: tim = f'[ | {t}]' await asyncio.sleep(1) async def dat(): global dat, COLOR COLOR1='#17FF00' COLOR2='#0EA100' while True: d = datetime.now().strftime("%d.%m.%y") if COLOR: dat = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {d} ^d^' else: dat = f'[ | {d}]' await asyncio.sleep(1) async def main(): await asyncio.gather( dwmbar(), net(), ram(), cpu(), gpu(), disk(), vol(), lang(), dat(), tim(), # currates(), weather() ) if __name__ == '__main__': signal.signal(signal.SIGINT, lambda n, f: shutdown()) loop = asyncio.get_event_loop() main_task = asyncio.ensure_future(main()) try: loop.run_until_complete(main_task) except KeyboardInterrupt: print("KeyboardInterrupt: Stopping the event loop...") finally: loop.close()