power-profiles-daemon/tests/integration-test
Bastien Nocera 97f449a497 main: Add "Actions" property
That allows API users to programmatically check whether a particular
functionality is implemented in the daemon, eg. if we wanted to move
GPU performance modes from gamemode to this daemon, gamemode could
query whether that feature was implemented and available in the backend.

Closes: #12
2020-08-12 11:31:34 +02:00

381 lines
14 KiB
Python
Executable file

#!/usr/bin/python3
# power-profiles-daemon integration test suite
#
# Run in built tree to test local built binaries, or from anywhere else to test
# system installed binaries.
#
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
# (C) 2020 Bastien Nocera <hadess@hadess.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import os
import sys
import dbus
import tempfile
import subprocess
import unittest
import time
try:
import gi
from gi.repository import GLib
from gi.repository import Gio
except ImportError as e:
sys.stderr.write('Skipping tests, PyGobject not available for Python 3, or missing GI typelibs: %s\n' % str(e))
sys.exit(0)
try:
gi.require_version('UMockdev', '1.0')
from gi.repository import UMockdev
except ImportError:
sys.stderr.write('Skipping tests, umockdev not available (https://github.com/martinpitt/umockdev)\n')
sys.exit(0)
try:
import dbusmock
except ImportError:
sys.stderr.write('Skipping tests, python-dbusmock not available (http://pypi.python.org/pypi/python-dbusmock).\n')
sys.exit(0)
PP = 'net.hadess.PowerProfiles'
PP_PATH = '/net/hadess/PowerProfiles'
class Tests(dbusmock.DBusTestCase):
@classmethod
def setUpClass(cls):
# run from local build tree if we are in one, otherwise use system instance
builddir = os.getenv('top_builddir', '.')
if os.access(os.path.join(builddir, 'src', 'power-profiles-daemon'), os.X_OK):
cls.daemon_path = os.path.join(builddir, 'src', 'power-profiles-daemon')
print('Testing binaries from local build tree (%s)' % cls.daemon_path)
cls.local_daemon = True
elif os.environ.get('UNDER_JHBUILD', False):
jhbuild_prefix = os.environ['JHBUILD_PREFIX']
cls.daemon_path = os.path.join(jhbuild_prefix, 'libexec', 'power-profiles-daemon')
print('Testing binaries from JHBuild (%s)' % cls.daemon_path)
cls.local_daemon = False
else:
cls.daemon_path = None
with open('/usr/lib/systemd/system/power-profiles-daemon.service') as f:
for line in f:
if line.startswith('ExecStart='):
cls.daemon_path = line.split('=', 1)[1].strip()
break
assert cls.daemon_path, 'could not determine daemon path from systemd .service file'
cls.local_daemon = False
print('Testing installed system binary (%s)' % cls.daemon_path)
# fail on CRITICALs on client side
GLib.log_set_always_fatal(GLib.LogLevelFlags.LEVEL_WARNING |
GLib.LogLevelFlags.LEVEL_ERROR |
GLib.LogLevelFlags.LEVEL_CRITICAL)
# set up a fake system D-BUS
cls.test_bus = Gio.TestDBus.new(Gio.TestDBusFlags.NONE)
cls.test_bus.up()
try:
del os.environ['DBUS_SESSION_BUS_ADDRESS']
except KeyError:
pass
os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = cls.test_bus.get_bus_address()
cls.dbus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
cls.dbus_con = cls.get_dbus(True)
@classmethod
def tearDownClass(cls):
cls.test_bus.down()
dbusmock.DBusTestCase.tearDownClass()
def setUp(self):
'''Set up a local umockdev testbed.
The testbed is initially empty.
'''
self.testbed = UMockdev.Testbed.new()
self.proxy = None
self.log = None
self.daemon = None
def tearDown(self):
del self.testbed
self.stop_daemon()
# on failures, print daemon log
errors = [x[1] for x in self._outcome.errors if x[1]]
if errors and self.log:
with open(self.log.name) as f:
sys.stderr.write('\n-------------- daemon log: ----------------\n')
sys.stderr.write(f.read())
sys.stderr.write('------------------------------\n')
#
# Daemon control and D-BUS I/O
#
def start_daemon(self):
'''Start daemon and create DBus proxy.
When done, this sets self.proxy as the Gio.DBusProxy for power-profiles-daemon.
'''
env = os.environ.copy()
env['G_DEBUG'] = 'fatal-criticals'
env['G_MESSAGES_DEBUG'] = 'all'
# note: Python doesn't propagate the setenv from Testbed.new(), so we
# have to do that ourselves
env['UMOCKDEV_DIR'] = self.testbed.get_root_dir()
self.log = tempfile.NamedTemporaryFile()
if os.getenv('VALGRIND') != None:
if self.local_daemon:
daemon_path = ['libtool', '--mode=execute', 'valgrind', self.daemon_path, '-v']
else:
daemon_path = ['valgrind', self.daemon_path, '-v']
else:
daemon_path = [self.daemon_path, '-v']
self.daemon = subprocess.Popen(daemon_path,
env=env, stdout=self.log,
stderr=subprocess.STDOUT)
# wait until the daemon gets online
timeout = 100
while timeout > 0:
time.sleep(0.1)
timeout -= 1
try:
self.get_dbus_property('ActiveProfile')
break
except GLib.GError:
pass
else:
self.fail('daemon did not start in 10 seconds')
self.proxy = Gio.DBusProxy.new_sync(
self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, PP,
PP_PATH, PP, None)
self.assertEqual(self.daemon.poll(), None, 'daemon crashed')
def stop_daemon(self):
'''Stop the daemon if it is running.'''
if self.daemon:
try:
self.daemon.kill()
except OSError:
pass
self.daemon.wait()
self.daemon = None
self.proxy = None
def get_dbus_property(self, name):
'''Get property value from daemon D-Bus interface.'''
proxy = Gio.DBusProxy.new_sync(
self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, PP,
PP_PATH, 'org.freedesktop.DBus.Properties', None)
return proxy.Get('(ss)', PP, name)
def set_dbus_property(self, name, value):
'''Set property value on daemon D-Bus interface.'''
proxy = Gio.DBusProxy.new_sync(
self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, PP,
PP_PATH, 'org.freedesktop.DBus.Properties', None)
return proxy.Set('(ssv)', PP, name, value)
def have_text_in_log(self, text):
return self.count_text_in_log(text) > 0
def count_text_in_log(self, text):
with open(self.log.name) as f:
return f.read().count(text)
def read_sysfs_attr(self, device, attribute):
with open(os.path.join(self.testbed.get_root_dir() + device, attribute), 'rb') as f:
return f.read()
return None
def assertEventually(self, condition, message=None, timeout=50):
'''Assert that condition function eventually returns True.
Timeout is in deciseconds, defaulting to 50 (5 seconds). message is
printed on failure.
'''
while timeout >= 0:
context = GLib.MainContext.default()
while context.iteration(False):
pass
if condition():
break
timeout -= 1
time.sleep(0.1)
else:
self.fail(message or 'timed out waiting for ' + str(condition))
#
# Actual test cases
#
def test_no_performance_driver(self):
'''no performance driver'''
self.start_daemon()
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')
self.assertEqual(self.get_dbus_property('PerformanceInhibited'), '')
profiles = self.get_dbus_property('Profiles')
self.assertEqual(len(profiles), 2)
self.assertEqual(profiles[1]['Driver'], 'balanced')
self.assertEqual(profiles[0]['Driver'], 'power-saver')
self.assertEqual(profiles[1]['Profile'], 'balanced')
self.assertEqual(profiles[0]['Profile'], 'power-saver')
self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('power-saver'))
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'power-saver')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'power-saver')
# process = subprocess.Popen(['gdbus', 'introspect', '--system', '--dest', 'net.hadess.PowerProfiles', '--object-path', '/net/hadess/PowerProfiles'])
# print (self.get_dbus_property('GPUs'))
self.stop_daemon()
def test_inhibited_transition(self):
'''Test that transitions work as expected when inhibited'''
tp_acpi = self.testbed.add_device('platform', 'thinkpad_acpi', None,
['dytc_lapmode', '0'],
[ 'DEVPATH', '/devices/platform/thinkpad_acpi' ]
)
self.start_daemon()
profiles = self.get_dbus_property('Profiles')
self.assertEqual(len(profiles), 3)
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')
self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('performance'))
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'performance')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')
# Inhibit
self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '1')
self.testbed.uevent(tp_acpi, 'change')
self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now on'))
self.assertEqual(self.get_dbus_property('PerformanceInhibited'), 'lap-detected')
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')
# Switch to non-performance
self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('power-saver'))
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'power-saver')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'power-saver')
def test_dytc_performance_driver(self):
'''Lenovo DYTC performance driver'''
tp_acpi = self.testbed.add_device('platform', 'thinkpad_acpi', None,
['dytc_lapmode', '0'],
[ 'DEVPATH', '/devices/platform/thinkpad_acpi' ]
)
self.start_daemon()
profiles = self.get_dbus_property('Profiles')
self.assertEqual(len(profiles), 3)
self.assertEqual(profiles[0]['Driver'], 'lenovo_dytc')
self.assertEqual(profiles[0]['Profile'], 'power-saver')
self.assertEqual(profiles[2]['Driver'], 'lenovo_dytc')
self.assertEqual(profiles[2]['Profile'], 'performance')
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')
# lapmode detected, but performance wasn't selected anyway
self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '1')
self.testbed.uevent(tp_acpi, 'change')
self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now on'))
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')
# Reset lapmode
self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '0')
self.testbed.uevent(tp_acpi, 'change')
self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now off'))
# Set performance mode
self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('performance'))
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'performance')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')
# And turn on lapmode
self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '1')
self.testbed.uevent(tp_acpi, 'change')
self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now on'))
self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')
self.assertEqual(self.get_dbus_property('PerformanceInhibited'), 'lap-detected')
def test_trickle_charge_mode(self):
'''Trickle power_supply charge type'''
idevice = self.testbed.add_device('usb', 'iDevice', None,
[],
[ 'ID_MODEL', 'iDevice', 'DRIVER', 'apple-mfi-fastcharge' ]
)
fastcharge = self.testbed.add_device('power_supply', 'MFi Fastcharge', idevice,
[ 'charge_type', 'Trickle', 'scope', 'Device' ],
[]
)
self.start_daemon()
self.assertIn('trickle_charge', self.get_dbus_property('Actions'))
# Verify that charge-type got changed to Fast on startup
self.assertEqual(self.read_sysfs_attr(fastcharge, 'charge_type'), b'Fast')
# Verify that charge-type got changed to Trickle when power saving
self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('power-saver'))
self.assertEqual(self.read_sysfs_attr(fastcharge, 'charge_type'), b'Trickle')
# FIXME no performance mode
# Verify that charge-type got changed to Fast in a non-default, non-power save mode
# self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('performance'))
# self.assertEqual(self.read_sysfs_attr(fastcharge, 'charge_type'), 'Fast')
#
# Helper methods
#
@classmethod
def _props_to_str(cls, properties):
'''Convert a properties dictionary to uevent text representation.'''
prop_str = ''
if properties:
for k, v in properties.items():
prop_str += '%s=%s\n' % (k, v)
return prop_str
if __name__ == '__main__':
# run ourselves under umockdev
if 'umockdev' not in os.environ.get('LD_PRELOAD', ''):
os.execvp('umockdev-wrapper', ['umockdev-wrapper'] + sys.argv)
unittest.main()