mirror of
https://gitlab.freedesktop.org/upower/power-profiles-daemon.git
synced 2025-12-22 23:00:07 +01:00
Use python argparse and decorators instead of reinventing both
This commit is contained in:
parent
718fde54d2
commit
18a9c66681
2 changed files with 104 additions and 200 deletions
|
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
|
|
@ -11,94 +12,6 @@ PP_IFACE = "org.freedesktop.UPower.PowerProfiles"
|
|||
PROPERTIES_IFACE = "org.freedesktop.DBus.Properties"
|
||||
|
||||
|
||||
def usage_main():
|
||||
print("Usage:")
|
||||
print(" powerprofilesctl COMMAND [ARGS…]")
|
||||
print("")
|
||||
print("Commands:")
|
||||
print(" help Print help")
|
||||
print(" version Print version")
|
||||
print(" get Print the currently active power profile")
|
||||
print(" set Set the currently active power profile")
|
||||
print(" list List available power profiles")
|
||||
print(" list-holds List current power profile holds")
|
||||
print(" launch Launch a command while holding a power profile")
|
||||
print("")
|
||||
print("Use “powerprofilesctl help COMMAND” to get detailed help.")
|
||||
|
||||
|
||||
def usage_version():
|
||||
print("Usage:")
|
||||
print(" powerprofilesctl version")
|
||||
print("")
|
||||
print("Print version information and exit.")
|
||||
|
||||
|
||||
def usage_get():
|
||||
print("Usage:")
|
||||
print(" powerprofilesctl get")
|
||||
print("")
|
||||
print("Print the currently active power profile.")
|
||||
|
||||
|
||||
def usage_set():
|
||||
print("Usage:")
|
||||
print(" powerprofilesctl set PROFILE")
|
||||
print("")
|
||||
print("Set the currently active power profile. Must be one of the ")
|
||||
print("available profiles.")
|
||||
|
||||
|
||||
def usage_list():
|
||||
print("Usage:")
|
||||
print(" powerprofilesctl list")
|
||||
print("")
|
||||
print("List available power profiles.")
|
||||
|
||||
|
||||
def usage_list_holds():
|
||||
print("Usage:")
|
||||
print(" powerprofilesctl list-holds")
|
||||
print("")
|
||||
print("List current power profile holds.")
|
||||
|
||||
|
||||
def usage_launch():
|
||||
print("Usage:")
|
||||
print(" powerprofilesctl launch [COMMAND…]")
|
||||
print("")
|
||||
print("Launch a command while holding a power profile.")
|
||||
print("")
|
||||
print("Options:")
|
||||
print(" -p, --profile=PROFILE The power profile to hold")
|
||||
print(" -r, --reason=REASON The reason for the profile hold")
|
||||
print(" -i, --appid=APP-ID The application ID for the profile hold")
|
||||
print("")
|
||||
print("Launch the command while holding a power profile, either performance, ")
|
||||
print("or power-saver. By default, the profile hold is for the performance ")
|
||||
print("profile, but it might not be available on all systems. See the list ")
|
||||
print("command for a list of available profiles.")
|
||||
|
||||
|
||||
def usage(_command=None):
|
||||
if not _command:
|
||||
usage_main()
|
||||
elif _command == "get":
|
||||
usage_get()
|
||||
elif _command == "set":
|
||||
usage_set()
|
||||
elif _command == "list":
|
||||
usage_list()
|
||||
elif _command == "list-holds":
|
||||
usage_list_holds()
|
||||
elif _command == "launch":
|
||||
usage_launch()
|
||||
elif _command == "version":
|
||||
usage_version()
|
||||
else:
|
||||
usage_main()
|
||||
|
||||
|
||||
def get_proxy():
|
||||
try:
|
||||
bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
|
||||
|
|
@ -110,22 +23,43 @@ def get_proxy():
|
|||
return proxy
|
||||
|
||||
|
||||
def version():
|
||||
def command(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
func(*args, **kwargs)
|
||||
except GLib.Error as error:
|
||||
sys.stderr.write(
|
||||
f"Failed to communicate with power-profiles-daemon: {error}\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
except ValueError as error:
|
||||
sys.stderr.write(f"Error: {error}\n")
|
||||
sys.exit(1)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
@command
|
||||
def _version(args): # pylint: disable=unused-argument
|
||||
proxy = get_proxy()
|
||||
ver = proxy.Get("(ss)", PP_IFACE, "Version")
|
||||
print(ver)
|
||||
|
||||
|
||||
def _get():
|
||||
@command
|
||||
def _get(args): # pylint: disable=unused-argument
|
||||
proxy = get_proxy()
|
||||
profile = proxy.Get("(ss)", PP_IFACE, "ActiveProfile")
|
||||
print(profile)
|
||||
|
||||
|
||||
def _set(profile):
|
||||
@command
|
||||
def _set(args):
|
||||
try:
|
||||
proxy = get_proxy()
|
||||
proxy.Set("(ssv)", PP_IFACE, "ActiveProfile", GLib.Variant.new_string(profile))
|
||||
proxy.Set(
|
||||
"(ssv)", PP_IFACE, "ActiveProfile", GLib.Variant.new_string(args.profile[0])
|
||||
)
|
||||
except:
|
||||
raise
|
||||
|
||||
|
|
@ -144,7 +78,8 @@ def get_profiles_property(prop):
|
|||
return profiles
|
||||
|
||||
|
||||
def _list():
|
||||
@command
|
||||
def _list(args): # pylint: disable=unused-argument
|
||||
try:
|
||||
profiles = get_profiles_property("Profiles")
|
||||
reason = get_proxy().Get("(ss)", PP_IFACE, "PerformanceDegraded")
|
||||
|
|
@ -169,7 +104,8 @@ def _list():
|
|||
index += 1
|
||||
|
||||
|
||||
def _list_holds():
|
||||
@command
|
||||
def _list_holds(args): # pylint: disable=unused-argument
|
||||
try:
|
||||
holds = get_profiles_property("ActiveProfileHolds")
|
||||
except:
|
||||
|
|
@ -186,7 +122,19 @@ def _list_holds():
|
|||
index += 1
|
||||
|
||||
|
||||
def _launch(args, profile, appid, reason):
|
||||
@command
|
||||
def _launch(args):
|
||||
reason = args.reason
|
||||
profile = args.profile
|
||||
appid = args.appid
|
||||
if not args.arguments:
|
||||
raise ValueError("No command to launch")
|
||||
if not args.appid:
|
||||
appid = args.arguments[0]
|
||||
if not profile:
|
||||
profile = "performance"
|
||||
if not reason:
|
||||
reason = f"Running {args.appid}"
|
||||
ret = 0
|
||||
try:
|
||||
bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
|
||||
|
|
@ -205,7 +153,7 @@ def _launch(args, profile, appid, reason):
|
|||
signal.signal(signal.SIGTERM, receive_signal)
|
||||
|
||||
# print (f'Got {cookie} for {profile} hold')
|
||||
with subprocess.Popen(args) as launched_app:
|
||||
with subprocess.Popen(args.arguments) as launched_app:
|
||||
try:
|
||||
launched_app.wait()
|
||||
ret = launched_app.returncode
|
||||
|
|
@ -216,107 +164,63 @@ def _launch(args, profile, appid, reason):
|
|||
return ret
|
||||
|
||||
|
||||
def main(): # pylint: disable=too-many-branches, disable=too-many-statements
|
||||
args = None
|
||||
if len(sys.argv) == 1:
|
||||
command = "list"
|
||||
elif len(sys.argv) >= 2:
|
||||
command = sys.argv[1]
|
||||
if command == "--help":
|
||||
command = "help"
|
||||
if command == "--version":
|
||||
command = "version"
|
||||
else:
|
||||
args = sys.argv[2:]
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
epilog="Use “powerprofilesctl COMMAND --help” to get detailed help for individual commands",
|
||||
)
|
||||
subparsers = parser.add_subparsers(help="Individual command help", dest="command")
|
||||
parser_list = subparsers.add_parser("list", help="List available power profiles")
|
||||
parser_list.set_defaults(func=_list)
|
||||
parser_list_holds = subparsers.add_parser(
|
||||
"list-holds", help="List current power profile holds"
|
||||
)
|
||||
parser_list_holds.set_defaults(func=_list_holds)
|
||||
parser_get = subparsers.add_parser(
|
||||
"get", help="Print the currently active power profile"
|
||||
)
|
||||
parser_get.set_defaults(func=_get)
|
||||
parser_set = subparsers.add_parser(
|
||||
"set", help="Set the currently active power profile"
|
||||
)
|
||||
parser_set.add_argument(
|
||||
"profile",
|
||||
nargs=1,
|
||||
help="Profile to use for set command",
|
||||
)
|
||||
parser_set.set_defaults(func=_set)
|
||||
parser_launch = subparsers.add_parser(
|
||||
"launch",
|
||||
help="Launch a command while holding a power profile",
|
||||
description="Launch the command while holding a power profile,"
|
||||
"either performance, or power-saver. By default, the profile hold "
|
||||
"is for the performance profile, but it might not be available on "
|
||||
"all systems. See the list command for a list of available profiles.",
|
||||
)
|
||||
parser_launch.add_argument(
|
||||
"arguments",
|
||||
nargs="*",
|
||||
help="Command to launch",
|
||||
)
|
||||
parser_launch.add_argument(
|
||||
"--profile", "-p", required=False, help="Profile to use for launch command"
|
||||
)
|
||||
parser_launch.add_argument(
|
||||
"--reason", "-r", required=False, help="Reason to use for launch command"
|
||||
)
|
||||
parser_launch.add_argument(
|
||||
"--appid", "-i", required=False, help="AppId to use for launch command"
|
||||
)
|
||||
parser_launch.set_defaults(func=_launch)
|
||||
parser_version = subparsers.add_parser(
|
||||
"version", help="Print version information and exit"
|
||||
)
|
||||
parser_version.set_defaults(func=_version)
|
||||
|
||||
if command == "help":
|
||||
if len(args) > 0:
|
||||
usage(args[0])
|
||||
else:
|
||||
usage(None)
|
||||
elif command == "version":
|
||||
version()
|
||||
elif command == "get":
|
||||
try:
|
||||
_get()
|
||||
except GLib.Error as error:
|
||||
sys.stderr.write(
|
||||
f"Failed to communicate with power-profiles-daemon: {format(error)}\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
elif command == "set":
|
||||
if len(args) != 1:
|
||||
usage_set()
|
||||
sys.exit(1)
|
||||
try:
|
||||
_set(args[0])
|
||||
except GLib.Error as error:
|
||||
sys.stderr.write(
|
||||
f"Failed to communicate with power-profiles-daemon: {format(error)}\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
elif command == "list":
|
||||
try:
|
||||
_list()
|
||||
except GLib.Error as error:
|
||||
sys.stderr.write(
|
||||
f"Failed to communicate with power-profiles-daemon: {format(error)}\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
elif command == "list-holds":
|
||||
try:
|
||||
_list_holds()
|
||||
except GLib.Error as error:
|
||||
sys.stderr.write(
|
||||
f"Failed to communicate with power-profiles-daemon: {format(error)}\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
elif command == "launch":
|
||||
if len(args) == 0:
|
||||
sys.exit(0)
|
||||
profile = None
|
||||
reason = None
|
||||
appid = None
|
||||
while True:
|
||||
if args[0] == "--":
|
||||
args = args[1:]
|
||||
break
|
||||
if args[0][:9] == "--profile" or args[0] == "-p":
|
||||
if args[0][:10] == "--profile=":
|
||||
args = args[0].split("=") + args[1:]
|
||||
profile = args[1]
|
||||
args = args[2:]
|
||||
continue
|
||||
if args[0][:8] == "--reason" or args[0] == "-r":
|
||||
if args[0][:9] == "--reason=":
|
||||
args = args[0].split("=") + args[1:]
|
||||
reason = args[1]
|
||||
args = args[2:]
|
||||
continue
|
||||
if args[0][:7] == "--appid" or args[0] == "-i":
|
||||
if args[0][:8] == "--appid=":
|
||||
args = args[0].split("=") + args[1:]
|
||||
appid = args[1]
|
||||
args = args[2:]
|
||||
continue
|
||||
break
|
||||
|
||||
if len(args) < 1:
|
||||
sys.exit(0)
|
||||
if not appid:
|
||||
appid = args[0]
|
||||
if not reason:
|
||||
reason = "Running " + appid
|
||||
if not profile:
|
||||
profile = "performance"
|
||||
try:
|
||||
ret = _launch(args, profile, appid, reason)
|
||||
sys.exit(ret)
|
||||
except GLib.Error as error:
|
||||
sys.stderr.write(
|
||||
f"Failed to communicate with power-profiles-daemon: {format(error)}\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
args = parser.parse_args()
|
||||
# default behavior is to run list if no command is given
|
||||
if not args.command:
|
||||
args.func = _list
|
||||
args.func(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -1752,8 +1752,8 @@ class Tests(dbusmock.DBusTestCase):
|
|||
self.create_platform_profile()
|
||||
self.start_daemon()
|
||||
|
||||
builddir = os.getenv("top_builddir", ".")
|
||||
tool_path = os.path.join(builddir, "src", "powerprofilesctl")
|
||||
sourcedir = os.getenv("top_srcdir", ".")
|
||||
tool_path = os.path.join(sourcedir, "src", "powerprofilesctl")
|
||||
|
||||
with subprocess.Popen(
|
||||
[tool_path, "launch", "-p", "power-saver", "sleep", "3600"],
|
||||
|
|
@ -1996,8 +1996,8 @@ class Tests(dbusmock.DBusTestCase):
|
|||
def test_powerprofilesctl_error(self):
|
||||
"""Check that powerprofilesctl returns 1 rather than an exception on error"""
|
||||
|
||||
builddir = os.getenv("top_builddir", ".")
|
||||
tool_path = os.path.join(builddir, "src", "powerprofilesctl")
|
||||
sourcedir = os.getenv("top_srcdir", ".")
|
||||
tool_path = os.path.join(sourcedir, "src", "powerprofilesctl")
|
||||
|
||||
with self.assertRaises(subprocess.CalledProcessError) as error:
|
||||
subprocess.check_output(
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue