systemd.py
📥 Install
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# scope: hikka_min 1.2.10
# meta pic: https://img.icons8.com/plasticine/344/apple-settings--v2.png
# meta banner: https://mods.hikariatama.ru/badges/systemd.jpg
# scope: inline
# scope: hikka_only
# meta developer: @hikarimods
# ⚠️ Please, ensure that userbot has enough rights to control units
# Put these lines in /etc/sudoers using visudo command:
#
# user ALL=(ALL) NOPASSWD: /bin/systemctl
# user ALL=(ALL) NOPASSWD: /bin/journalctl
#
# Where `user` is user on behalf of which the userbot is running
import asyncio
import io
import subprocess
from typing import Union
from telethon.tl.types import Message
from .. import loader, utils
from ..inline.types import InlineCall
def human_readable_size(size: float, decimal_places: int = 2) -> str:
for unit in ["B", "K", "M", "G", "T", "P"]:
if size < 1024.0 or unit == "P":
break
size /= 1024.0
return f"{size:.{decimal_places}f} {unit}"
@loader.tds
class SystemdMod(loader.Module):
"""Control systemd units easily"""
strings = {
"name": "Systemd",
"panel": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Here you can control"
" your systemd units</b>\n\n{}"
),
"unit_doesnt_exist": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Unit</b>"
" <code>{}</code> <b>doesn't exist!</b>"
),
"args": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>No arguments"
" specified</b>"
),
"unit_added": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Unit"
" </b><code>{}</code><b> with name </b><code>{}</code><b> added</b>"
),
"unit_removed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Unit"
" </b><code>{}</code><b> removed</b>"
),
"unit_action_done": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Action"
" </b><code>{}</code><b> performed on unit </b><code>{}</code>"
),
"unit_control": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Interacting with unit"
" </b><code>{}</code><b> (</b><code>{}</code><b>)</b>\n{} <b>Unit status:"
" </b><code>{}</code>"
),
"action_not_found": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Action"
" </b><code>{}</code><b> not found</b>"
),
"unit_renamed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Unit"
" </b><code>{}</code><b> renamed to </b><code>{}</code>"
),
"stop_btn": "🍎 Stop",
"start_btn": "🍏 Start",
"restart_btn": "🔄 Restart",
"logs_btn": "📄 Logs",
"tail_btn": "🚅 Tail",
"back_btn": "🔙 Back",
"close_btn": "✖️ Close",
"refresh_btn": "🔄 Refresh",
}
strings_ru = {
"panel": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Здесь вы можете"
" управлять своими юнитами systemd</b>\n\n{}"
),
"unit_doesnt_exist": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Юнит</b>"
" <code>{}</code> <b>не существует!</b>"
),
"args": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Не указаны"
" аргументы</b>"
),
"unit_added": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Юнит"
" </b><code>{}</code><b> с именем </b><code>{}</code><b> добавлен</b>"
),
"unit_removed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Юнит"
" </b><code>{}</code><b> удалён</b>"
),
"unit_action_done": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Действие"
" </b><code>{}</code><b> выполнено на юните </b><code>{}</code>"
),
"unit_control": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Взаимодействие с"
" юнитом </b><code>{}</code><b> (</b><code>{}</code><b>)</b>\n{} <b>Статус"
" юнита: </b><code>{}</code>"
),
"action_not_found": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Действие"
" </b><code>{}</code><b> не найдено</b>"
),
"unit_renamed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Юнит"
" </b><code>{}</code><b> переименован в </b><code>{}</code>"
),
"stop_btn": "🍎 Стоп",
"start_btn": "🍏 Старт",
"restart_btn": "🔄 Рестарт",
"logs_btn": "📄 Логи",
"tail_btn": "🚅 Тейл",
"back_btn": "🔙 Назад",
"close_btn": "✖️ Закрыть",
"refresh_btn": "🔄 Обновить",
"_cmd_doc_units": "Показать список юнитов",
"_cmd_doc_addunit": "<unit> - Добавить юнит",
"_cmd_doc_nameunit": "<unit> - Переименовать юнит",
"_cmd_doc_delunit": "<unit> - Удалить юнит",
"_cmd_doc_unit": "<unit> - Управлять юнитом",
"_cls_doc": "Простое и удобное управление юнитами systemd",
}
strings_de = {
"panel": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Hier kannst du deine"
" systemd-Einheiten kontrollieren</b>\n\n{}"
),
"unit_doesnt_exist": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Einheit</b>"
" <code>{}</code> <b>existiert nicht!</b>"
),
"args": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Keine Argumente"
" angegeben</b>"
),
"unit_added": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Einheit"
" </b><code>{}</code><b> mit dem Namen </b><code>{}</code><b>"
" hinzugefügt</b>"
),
"unit_removed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Einheit"
" </b><code>{}</code><b> entfernt</b>"
),
"unit_action_done": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Aktion"
" </b><code>{}</code><b> auf Einheit </b><code>{}</code><b> ausgeführt</b>"
),
"unit_control": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Interagiere mit"
" Einheit </b><code>{}</code><b> (</b><code>{}</code><b>)</b>\n{}"
" <b>Einheitsstatus: </b><code>{}</code>"
),
"action_not_found": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Aktion"
" </b><code>{}</code><b> nicht gefunden</b>"
),
"unit_renamed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Einheit"
" </b><code>{}</code><b> umbenannt zu </b><code>{}</code>"
),
"stop_btn": "🍎 Stop",
"start_btn": "🍏 Start",
"restart_btn": "🔄 Neustart",
"logs_btn": "📄 Logs",
"tail_btn": "🚅 Tail",
"back_btn": "🔙 Zurück",
"close_btn": "✖️ Schließen",
"refresh_btn": "🔄 Aktualisieren",
"_cmd_doc_units": "Liste der Einheiten anzeigen",
"_cmd_doc_addunit": "<unit> - Einheit hinzufügen",
"_cmd_doc_nameunit": "<unit> - Einheit umbenennen",
"_cmd_doc_delunit": "<unit> - Einheit entfernen",
"_cmd_doc_unit": "<unit> - Einheit verwalten",
"_cls_doc": "Einfache und bequeme Verwaltung von systemd-Einheiten",
}
strings_hi = {
"panel": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>यहाँ आप अपने systemd"
" इकाइयों का नियंत्रण कर सकते हैं</b>\n\n{}"
),
"unit_doesnt_exist": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>इकाई</b>"
" <code>{}</code> <b>अस्तित्व में नहीं है!</b>"
),
"args": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>कोई तर्क निर्दिष्ट"
" नहीं किया गया</b>"
),
"unit_added": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>इकाई"
" </b><code>{}</code><b> नाम </b><code>{}</code><b> के साथ जोड़ा गया</b>"
),
"unit_removed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>इकाई"
" </b><code>{}</code><b> हटा दिया गया</b>"
),
"unit_action_done": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>कार्य"
" </b><code>{}</code><b> इकाई </b><code>{}</code><b> पर किया गया</b>"
),
"unit_control": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>इकाई"
" </b><code>{}</code><b> के साथ इंटरैक्ट करें"
" (</b><code>{}</code><b>)</b>\n{} <b>इकाई स्थिति: </b><code>{}</code>"
),
"action_not_found": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>कार्य"
" </b><code>{}</code><b> नहीं मिला</b>"
),
"unit_renamed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>इकाई"
" </b><code>{}</code><b> का नाम बदल दिया गया </b><code>{}</code>"
),
"stop_btn": "🍎 रोकें",
"start_btn": "🍏 शुरू करें",
"restart_btn": "🔄 पुनः शुरू करें",
"logs_btn": "📄 लॉग",
"tail_btn": "🚅 Tail",
"back_btn": "🔙 पीछे जाएँ",
"close_btn": "✖️ बंद करें",
"refresh_btn": "🔄 ताज़ा करें",
"_cmd_doc_units": "इकाइयों की सूची दिखाएँ",
"_cmd_doc_addunit": "<unit> - इकाई जोड़ें",
"_cmd_doc_nameunit": "<unit> - इकाई का नाम बदलें",
"_cmd_doc_delunit": "<unit> - इकाई हटाएँ",
"_cmd_doc_unit": "<unit> - इकाई प्रबंधित करें",
"_cls_doc": "systemd इकाइयों का सरल और सुविधाजनक प्रबंधन",
}
strings_uz = {
"panel": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Bu yerda siz sizning"
" systemd birliklaringizni boshqarishingiz mumkin</b>\n\n{}"
),
"unit_doesnt_exist": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Birlik</b>"
" <code>{}</code> <b>mavjud emas!</b>"
),
"args": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Hech qanday"
" argumentlar ko'rsatilmadi</b>"
),
"unit_added": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Birlik"
" </b><code>{}</code><b> nomi </b><code>{}</code><b> qo'shildi</b>"
),
"unit_removed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Birlik"
" </b><code>{}</code><b> o'chirildi</b>"
),
"unit_action_done": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Amal"
" </b><code>{}</code><b> birlik </b><code>{}</code><b> uchun bajirildi</b>"
),
"unit_control": (
"<emoji document_id=5771858080664915483>🎛</emoji> <b>Birlik"
" </b><code>{}</code><b> bilan ishlash (</b><code>{}</code><b>)</b>\n{}"
" <b>Birlik holati: </b><code>{}</code>"
),
"action_not_found": (
"<emoji document_id=5312526098750252863>🚫</emoji> <b>Amal"
" </b><code>{}</code><b> topilmadi</b>"
),
"unit_renamed": (
"<emoji document_id=5314250708508220914>✅</emoji> <b>Birlik"
" </b><code>{}</code><b> nomi </b><code>{}</code><b> o'zgartirildi</b>"
),
"stop_btn": "🍎 To'xtatish",
"start_btn": "🍏 Boshlash",
"restart_btn": "🔄 Qayta ishga tushirish",
"logs_btn": "📄 Jurnal",
"tail_btn": "🚅 Tail",
"back_btn": "🔙 Orqaga",
"close_btn": "✖️ Yopish",
"refresh_btn": "🔄 Yangilash",
"_cmd_doc_units": "Birliklar ro'yxatini ko'rsatish",
"_cmd_doc_addunit": "<birlik> - Birlik qo'shish",
"_cmd_doc_nameunit": "<birlik> - Birlik nomini o'zgartirish",
"_cmd_doc_delunit": "<birlik> - Birlikni o'chirish",
"_cmd_doc_unit": "<birlik> - Birlikni boshqarish",
}
def _get_unit_status_text(self, unit: str) -> str:
return (
subprocess.run(
[
"sudo",
"-S",
"systemctl",
"is-active",
unit,
],
check=False,
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
)
def _is_running(self, unit: str) -> bool:
return self._get_unit_status_text(unit) == "active"
def _unit_exists(self, unit: str) -> bool:
return (
subprocess.run(
[
"sudo",
"-S",
"systemctl",
"cat",
unit,
],
check=False,
stdout=subprocess.PIPE,
).returncode
== 0
)
async def _manage_unit(self, call: Union[InlineCall, int], unit: dict, action: str):
if action == "start":
subprocess.run(
["sudo", "-S", "systemctl", "start", unit["formal"]], check=True
)
elif action == "stop":
subprocess.run(
["sudo", "-S", "systemctl", "stop", unit["formal"]], check=True
)
elif action == "restart":
subprocess.run(
["sudo", "-S", "systemctl", "restart", unit["formal"]], check=True
)
elif action in {"logs", "tail"}:
logs = (
subprocess.run(
[
"sudo",
"-S",
"journalctl",
"-u",
unit["formal"],
"-n",
"1000",
],
check=True,
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
)
hostname = (
subprocess.run(["hostname"], check=True, stdout=subprocess.PIPE)
.stdout.decode()
.strip()
)
logs = logs.replace(f"{hostname} ", "")
logs = logs.replace("[" + str(self._get_unit_pid(unit["formal"])) + "]", "")
if action == "logs":
logs = io.BytesIO(logs.encode())
logs.name = f"{unit['formal']}-logs.txt"
await self._client.send_file(
call.form["chat"] if not isinstance(call, int) else call, logs
)
else:
actual_logs = ""
logs = list(reversed(logs.splitlines()))
while logs:
chunk = f"{logs.pop()}\n"
if len(actual_logs + chunk) >= 4096:
break
actual_logs += chunk
if isinstance(call, int):
await self.inline.form(
f"<code>{utils.escape_html(actual_logs)}</code>",
call,
reply_markup=self._get_unit_markup(unit),
)
return
await call.edit(
f"<code>{utils.escape_html(actual_logs)}</code>",
reply_markup=self._get_unit_markup(unit),
)
await call.answer("Action complete")
return
if isinstance(call, int):
return
await call.answer("Action complete")
await asyncio.sleep(2)
await self._control_service(call, unit)
def _get_unit_markup(self, unit: dict) -> list:
return [
[
{
"text": self.strings("start_btn"),
"callback": self._manage_unit,
"args": (unit, "start"),
},
{
"text": self.strings("stop_btn"),
"callback": self._manage_unit,
"args": (unit, "stop"),
},
{
"text": self.strings("restart_btn"),
"callback": self._manage_unit,
"args": (unit, "restart"),
},
],
[
{
"text": self.strings("logs_btn"),
"callback": self._manage_unit,
"args": (unit, "logs"),
},
{
"text": self.strings("tail_btn"),
"callback": self._manage_unit,
"args": (unit, "tail"),
},
],
[
{
"text": self.strings("refresh_btn"),
"callback": self._control_service,
"args": (unit,),
},
{
"text": self.strings("back_btn"),
"callback": self._control_services,
},
],
]
async def _control_service(self, call: InlineCall, unit: dict):
await call.edit(
self.strings("unit_control").format(
unit["name"],
unit["formal"],
self._get_unit_status_emoji(unit["formal"]),
self._get_unit_status_text(unit["formal"]),
),
reply_markup=self._get_unit_markup(unit),
)
def _get_unit_pid(self, unit: str) -> str:
return (
subprocess.run(
[
"sudo",
"-S",
"systemctl",
"show",
unit,
"--property=MainPID",
"--value",
],
check=False,
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
)
def _get_unit_resources_consumption(self, unit: str) -> str:
if not self._is_running(unit):
return ""
pid = self._get_unit_pid(unit)
ram = human_readable_size(
int(
subprocess.run(
[
"ps",
"-p",
pid,
"-o",
"rss",
],
check=False,
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
.split("\n")[1]
)
* 1024
)
cpu = (
subprocess.run(
[
"ps",
"-p",
pid,
"-o",
r"%cpu",
],
check=False,
stdout=subprocess.PIPE,
)
.stdout.decode()
.strip()
.split("\n")[1]
+ "%"
)
return f"📟 <code>{ram}</code> | 🗃 <code>{cpu}</code>"
def _get_panel(self):
return self.strings("panel").format(
"\n".join(
[
f"{self._get_unit_status_emoji(unit['formal'])} <b>{unit['name']}</b>"
f" (<code>{unit['formal']}</code>):"
f" {self._get_unit_status_text(unit['formal'])} {self._get_unit_resources_consumption(unit['formal'])}"
for unit in self.get("services", [])
]
)
)
async def _control_services(self, call: InlineCall, refresh: bool = False):
await call.edit(
self._get_panel(),
reply_markup=self._get_services_markup(),
)
if refresh:
await call.answer("Information updated!")
def _get_unit_status_emoji(self, unit: str) -> str:
status = self._get_unit_status_text(unit)
if status == "active":
return "🍏"
elif status == "inactive":
return "🍎"
elif status == "failed":
return "🚫"
elif status == "activating":
return "🔄"
else:
return "❓"
def _get_services_markup(self) -> list:
return utils.chunks(
[
{
"text": (
self._get_unit_status_emoji(service["formal"])
+ " "
+ service["name"]
),
"callback": self._control_service,
"args": (service,),
}
for service in self.get("services", [])
],
2,
) + [
[
{
"text": self.strings("refresh_btn"),
"callback": self._control_services,
"args": (True,),
},
{"text": self.strings("close_btn"), "action": "close"},
]
]
async def unitscmd(self, message: Message):
"""Open control panel"""
form = await self.inline.form(
self._get_panel(),
message,
reply_markup=self._get_services_markup(),
)
async def addunitcmd(self, message: Message):
"""<unit> <name> - Add new unit"""
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("args"))
return
try:
unit, name = args.split(maxsplit=1)
except ValueError:
unit = args
name = args
if not self._unit_exists(unit):
await utils.answer(message, self.strings("unit_doesnt_exist").format(unit))
return
self.set(
"services",
self.get("services", []) + [{"name": name, "formal": unit}],
)
await utils.answer(message, self.strings("unit_added").format(unit, name))
async def delunitcmd(self, message: Message):
"""<unit> - Delete unit"""
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("args"))
return
if not any(unit["formal"] == args for unit in self.get("services", [])):
await utils.answer(message, self.strings("unit_doesnt_exist").format(args))
return
self.set(
"services",
[
service
for service in self.get("services", [])
if service["formal"] != args
],
)
await utils.answer(message, self.strings("unit_removed").format(args))
async def unitcmd(self, message: Message):
"""<unit> <start|stop|restart|logs|tail> - Perform specific action on unit bypassing main menu"""
args = utils.get_args_raw(message)
if not args or len(args.split()) < 2:
await utils.answer(message, self.strings("args"))
return
unit, action = args.split(maxsplit=1)
if not self._unit_exists(unit):
await utils.answer(message, self.strings("unit_doesnt_exist").format(unit))
return
if action in {"start", "stop", "restart", "logs"}:
await self._manage_unit(
utils.get_chat_id(message),
{"formal": unit, "name": unit},
action,
)
elif action == "tail":
await self._manage_unit(
utils.get_chat_id(message),
{"formal": unit, "name": unit},
"tail",
)
else:
await utils.answer(message, self.strings("action_not_found").format(action))
return
await utils.answer(
message,
self.strings("unit_action_done").format(action, unit),
)
async def nameunitcmd(self, message: Message):
"""<unit> <new_name> - Rename unit"""
args = utils.get_args_raw(message)
if not args or len(args.split()) < 2:
await utils.answer(message, self.strings("args"))
return
unit, name = args.split(maxsplit=1)
if not any(unit_["formal"] == unit for unit_ in self.get("services", [])):
await utils.answer(message, self.strings("unit_doesnt_exist").format(unit))
return
self.set(
"services",
[
service
for service in self.get("services", [])
if service["formal"] != unit
]
+ [{"name": name, "formal": unit}],
)
await utils.answer(message, self.strings("unit_renamed").format(unit, name))