#!/usr/bin/python3 import time import asyncio import signal import psutil import alsaaudio import requests from subprocess import run, check_output from Xlib import X, display, Xutil from datetime import datetime DD = ';' DELIM = ' ' def shutdown(): for task in asyncio.Task.all_tasks(): if task is not asyncio.tasks.Task.current_task(): task.cancel() async def dwmbar(): global bl, br await asyncio.sleep(1) while True: bar = f"{status()}{DD}{bl}{DD}{br}" # print(bar.encode('utf-8')) # print(br) run(["xprop", "-root", "-set", "WM_NAME", bar], check=True) await asyncio.sleep(0.1) def status(): global DELIM, net, ram, cpu, gpu, wlan, disk, vol, lang, dat, tim try: bar = [ net, ram, cpu, gpu, disk, vol, lang, dat, tim ] return DELIM.join(bar) except: return '' async def bl(): global bl COLOR1='#AAAAAA' COLOR2='#444444' while True: curs = [] curs.append(['', f"${float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=XMRUSDT').json()['lastPrice']):.2f}"]) curs.append(['', f"${float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=BTCUSDT').json()['lastPrice']):.2f}"]) curs.append(['', f"${float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=LTCUSDT').json()['lastPrice']):.2f}"]) curs.append(['Ð', f"${float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=DOGEUSDT').json()['lastPrice']):.2f}"]) curs.append(['$', f"{float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=USDTRUB').json()['lastPrice']):.2f} RUB"]) curs.append(['', f"""{ float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=USDTRUB').json()['lastPrice']) / float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=USDTTRY').json()['lastPrice']) :.2f} RUB""" ]) curs.append(['₴', f"""{ float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=USDTRUB').json()['lastPrice']) / float(requests.get('https://api.binance.com/api/v3/ticker/24hr?symbol=USDTUAH').json()['lastPrice']) :.2f} RUB""" ]) row = list(map(lambda x: f'^b{COLOR1}^^c{COLOR2}^ {x[0]} ^c{COLOR1}^^b{COLOR2}^ {x[1]} ^d^', curs)) bl = DELIM.join(row) await asyncio.sleep(60 * 10) async def br(): global br COLOR1='#AAAAAA' COLOR2='#444444' CITY='Tyumen' while True: i = requests.get(f"https://wttr.in/{CITY}?format=%c").text.strip() #.encode('utf-8')[0:3].decode('utf-8') t = requests.get(f"https://wttr.in/{CITY}?format=%t").text.strip() w = requests.get(f"https://wttr.in/{CITY}?format=j2").json()['current_condition'][0]['windspeedKmph'].strip() br = f"^b{COLOR2}^^c{COLOR1}^ {w} km/h ^b{COLOR1}^^c{COLOR2}^ {i} ^c{COLOR1}^^b{COLOR2}^ {t} ^d^" await asyncio.sleep(60 * 30) async def net(): global net COLOR1='#FF9B00' COLOR2='#A16200' osent = psutil.net_io_counters().bytes_sent orecv = psutil.net_io_counters().bytes_recv while True: sent = psutil.net_io_counters().bytes_sent recv = psutil.net_io_counters().bytes_recv inb = f'{(recv - orecv) / 1024:0.0f} KB/s' outb = f'{(sent - osent) / 1024:0.0f} KB/s' orecv = recv osent = sent net = f'^c{COLOR1}^^b{COLOR2}^ {inb} ^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {outb} ^d^' await asyncio.sleep(1) async def ram(): global ram COLOR1='#C700FF' COLOR2='#590073' while True: ramt = psutil.virtual_memory().total / 1024 / 1024 / 1024 ramu = psutil.virtual_memory().used / 1024 / 1024 / 1024 ram = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {ramu:0.1f}/{ramt:0.1f}G ^d^' await asyncio.sleep(1) async def cpu(): global cpu COLOR1='#990057' COLOR2='#FF00B2' while True: load = psutil.cpu_percent() cpu = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {load:0.0f}% ^d^' await asyncio.sleep(1) async def gpu(): global gpu COLOR1='#990057' COLOR2='#FF00B2' while True: load = open('/sys/class/drm/card0/device/gpu_busy_percent').read().strip() gpu = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {load}% ^d^' await asyncio.sleep(1) async def wlan(): global cpu COLOR1='#009CFF' COLOR2='#005083' while True: c = psutil.cpu_percent() i = '' cpu = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {c:0.0f}% ^d^' await asyncio.sleep(1) async def disk(): global disk COLOR1='#00FF90' COLOR2='#00A45D' while True: f = psutil.disk_usage('/').free / 1024 / 1024 / 1024 t = psutil.disk_usage('/').total / 1024 / 1024 / 1024 disk = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {f:0.0f}/{t:0.0f}G ^d^' await asyncio.sleep(3) async def vol(): global vol COLOR1='#FF0000' COLOR2='#700000' while True: v = alsaaudio.Mixer().getvolume()[0] if v > 67: i = '' elif v > 34: i = '' elif v > 1: i = '' else: i = '' if alsaaudio.Mixer().getmute()[0]: i = '' vol = f'^b{COLOR1}^^c{COLOR2}^ {i} ^c{COLOR1}^^b{COLOR2}^ {v}% ^d^' await asyncio.sleep(1) async def lang(): global lang COLOR1='#FF5800' COLOR2='#973400' while True: l = run(["xkblayout-state", "print", "%s"], check=True, capture_output=True, text=True).stdout lang = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {l} ^d^' await asyncio.sleep(0.1) async def tim(): global tim COLOR1='#00E9FF' COLOR2='#008592' while True: t = datetime.now().strftime("%H:%M:%S") tim = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {t} ^d^' await asyncio.sleep(1) async def dat(): global dat COLOR1='#17FF00' COLOR2='#0EA100' while True: d = datetime.now().strftime("%d.%m.%y") dat = f'^b{COLOR1}^^c{COLOR2}^  ^c{COLOR1}^^b{COLOR2}^ {d} ^d^' await asyncio.sleep(1) if __name__ == '__main__': signal.signal(signal.SIGINT, lambda n, f: shutdown()) loop = asyncio.get_event_loop() main_task = asyncio.wait([ dwmbar(), net(), ram(), cpu(), gpu(), disk(), vol(), lang(), dat(), tim(), bl(), br() ]) try: loop.run_until_complete(main_task) except asyncio.CancelledError: loop.run_until_complete(main_task) loop.close()