# power-profiles-daemon/tests/integration-test
#
# 383 lines
# 14 KiB
# Text
# Raw Normal View History
#
# 2020-07-16 16:09:56 +02:00
#!/usr/bin/python3
# power-profiles-daemon integration test suite
#
# Run in built tree to test local built binaries, or from anywhere else to test
# system installed binaries.
#
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
# (C) 2020 Bastien Nocera <hadess@hadess.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import os
import sys
import dbus
import tempfile
import subprocess
import unittest
import time
# Each optional dependency is probed on its own so that a missing one makes
# the whole suite "skip" (message on stderr, exit status 0) rather than fail.
try:
    import gi
    from gi.repository import GLib
    from gi.repository import Gio
except ImportError as e:
    sys.stderr.write('Skipping tests, PyGobject not available for Python 3, or missing GI typelibs: %s\n' % str(e))
    sys.exit(0)

try:
    gi.require_version('UMockdev', '1.0')
    from gi.repository import UMockdev
except (ImportError, ValueError):
    # gi.require_version() signals an unavailable namespace/version with
    # ValueError, not ImportError, so both have to be treated as "missing".
    sys.stderr.write('Skipping tests, umockdev not available (https://github.com/martinpitt/umockdev)\n')
    sys.exit(0)

try:
    import dbusmock
except ImportError:
    sys.stderr.write('Skipping tests, python-dbusmock not available (http://pypi.python.org/pypi/python-dbusmock).\n')
    sys.exit(0)

# PP is used below both as the daemon's bus name and as its D-Bus interface
# name; PP_PATH is the object path the daemon is queried on.
PP = 'net.hadess.PowerProfiles'
PP_PATH = '/net/hadess/PowerProfiles'
class Tests(dbusmock.DBusTestCase):
    '''Integration tests for power-profiles-daemon.

    Each test builds an (initially empty) umockdev testbed, optionally adds
    mock devices to it, starts the daemon against a private system bus and
    then checks the daemon's D-Bus properties and its log output.
    '''

    @classmethod
    def setUpClass(cls):
        '''Locate the daemon binary and bring up a private system bus.

        Sets cls.daemon_path, cls.local_daemon, cls.test_bus, cls.dbus
        (Gio connection) and cls.dbus_con (connection from dbusmock).
        '''
        # run from local build tree if we are in one, otherwise use system instance
        builddir = os.getenv('top_builddir', '.')
        local_binary = os.path.join(builddir, 'src', 'power-profiles-daemon')
        if os.access(local_binary, os.X_OK):
            cls.daemon_path = local_binary
            print('Testing binaries from local build tree (%s)' % cls.daemon_path)
            cls.local_daemon = True
        elif os.environ.get('UNDER_JHBUILD', False):
            jhbuild_prefix = os.environ['JHBUILD_PREFIX']
            cls.daemon_path = os.path.join(jhbuild_prefix, 'libexec', 'power-profiles-daemon')
            print('Testing binaries from JHBuild (%s)' % cls.daemon_path)
            cls.local_daemon = False
        else:
            # Installed system: take the binary from the unit's ExecStart= line.
            cls.daemon_path = None
            with open('/usr/lib/systemd/system/power-profiles-daemon.service') as f:
                for line in f:
                    if line.startswith('ExecStart='):
                        cls.daemon_path = line.split('=', 1)[1].strip()
                        break
            assert cls.daemon_path, 'could not determine daemon path from systemd .service file'
            cls.local_daemon = False
            print('Testing installed system binary (%s)' % cls.daemon_path)

        # make GLib warnings, errors and criticals fatal on the client side
        GLib.log_set_always_fatal(GLib.LogLevelFlags.LEVEL_WARNING |
                                  GLib.LogLevelFlags.LEVEL_ERROR |
                                  GLib.LogLevelFlags.LEVEL_CRITICAL)

        # set up a fake system D-BUS
        cls.test_bus = Gio.TestDBus.new(Gio.TestDBusFlags.NONE)
        cls.test_bus.up()
        os.environ.pop('DBUS_SESSION_BUS_ADDRESS', None)
        os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = cls.test_bus.get_bus_address()

        cls.dbus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
        cls.dbus_con = cls.get_dbus(True)

    @classmethod
    def tearDownClass(cls):
        '''Shut down the private bus and run the base class cleanup.'''
        cls.test_bus.down()
        dbusmock.DBusTestCase.tearDownClass()

    def setUp(self):
        '''Set up a local umockdev testbed.

        The testbed is initially empty.
        '''
        self.testbed = UMockdev.Testbed.new()
        self.proxy = None
        self.log = None
        self.daemon = None

    def tearDown(self):
        '''Drop the testbed, stop the daemon and dump its log on failure.

        The log file created by start_daemon() is closed here; it is a
        NamedTemporaryFile, so closing it also removes it from disk.
        '''
        del self.testbed
        self.stop_daemon()

        if self.log is None:
            return
        try:
            # on failures, print daemon log
            if self._current_test_failed():
                with open(self.log.name) as f:
                    sys.stderr.write('\n-------------- daemon log: ----------------\n')
                    sys.stderr.write(f.read())
                    sys.stderr.write('------------------------------\n')
        finally:
            self.log.close()
            self.log = None

    def _current_test_failed(self):
        '''Return True if the running test has recorded an error or failure.

        This peeks at unittest's private per-test outcome object, whose
        layout differs between Python versions.
        '''
        outcome = self._outcome
        if hasattr(outcome, 'errors'):
            # Python <= 3.10: list of (test, exc_info); exc_info is None for
            # parts that passed.
            return any(exc_info for _test, exc_info in outcome.errors)
        # Python >= 3.11: problems are reported directly to the TestResult,
        # which accumulates entries for every test run so far.
        result = outcome.result
        recorded = (list(getattr(result, 'errors', [])) +
                    list(getattr(result, 'failures', [])))
        return any(test is self for test, _trace in recorded)

    #
    # Daemon control and D-BUS I/O
    #

    def start_daemon(self):
        '''Start daemon and create DBus proxy.

        When done, this sets self.proxy as the Gio.DBusProxy for power-profiles-daemon.
        The daemon's stdout and stderr are captured in self.log. If the
        VALGRIND environment variable is set, the daemon runs under valgrind.
        '''
        env = os.environ.copy()
        env['G_DEBUG'] = 'fatal-criticals'
        env['G_MESSAGES_DEBUG'] = 'all'
        # note: Python doesn't propagate the setenv from Testbed.new(), so we
        # have to do that ourselves
        env['UMOCKDEV_DIR'] = self.testbed.get_root_dir()
        self.log = tempfile.NamedTemporaryFile()
        if os.getenv('VALGRIND') is not None:
            if self.local_daemon:
                daemon_path = ['libtool', '--mode=execute', 'valgrind', self.daemon_path, '-v']
            else:
                daemon_path = ['valgrind', self.daemon_path, '-v']
        else:
            daemon_path = [self.daemon_path, '-v']

        self.daemon = subprocess.Popen(daemon_path,
                                       env=env, stdout=self.log,
                                       stderr=subprocess.STDOUT)

        # wait until the daemon gets online: poll a property every 0.1 s,
        # at most 100 times
        for _attempt in range(100):
            time.sleep(0.1)
            try:
                self.get_dbus_property('ActiveProfile')
                break
            except GLib.GError:
                pass
        else:
            self.fail('daemon did not start in 10 seconds')

        self.proxy = Gio.DBusProxy.new_sync(
            self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, PP,
            PP_PATH, PP, None)

        self.assertEqual(self.daemon.poll(), None, 'daemon crashed')

    def stop_daemon(self):
        '''Stop the daemon if it is running.'''
        if self.daemon:
            try:
                self.daemon.kill()
            except OSError:
                pass
            self.daemon.wait()
        self.daemon = None
        self.proxy = None

    def _properties_proxy(self):
        '''Return a proxy for org.freedesktop.DBus.Properties on the daemon object.'''
        return Gio.DBusProxy.new_sync(
            self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, PP,
            PP_PATH, 'org.freedesktop.DBus.Properties', None)

    def get_dbus_property(self, name):
        '''Get property value from daemon D-Bus interface.'''
        return self._properties_proxy().Get('(ss)', PP, name)

    def set_dbus_property(self, name, value):
        '''Set property value on daemon D-Bus interface.

        value is passed as the variant argument of Properties.Set.
        '''
        return self._properties_proxy().Set('(ssv)', PP, name, value)

    def have_text_in_log(self, text):
        '''Return True if text occurs at least once in the daemon log.'''
        return self.count_text_in_log(text) > 0

    def count_text_in_log(self, text):
        '''Return the number of occurrences of text in the daemon log.'''
        with open(self.log.name) as f:
            return f.read().count(text)

    def read_sysfs_attr(self, device, attribute):
        '''Return the raw bytes of a device attribute file in the testbed.

        device is appended verbatim to the testbed root directory.
        '''
        with open(os.path.join(self.testbed.get_root_dir() + device, attribute), 'rb') as f:
            return f.read()

    def assertEventually(self, condition, message=None, timeout=50):
        '''Assert that condition function eventually returns True.

        Timeout is in deciseconds, defaulting to 50 (5 seconds). message is
        printed on failure.
        '''
        context = GLib.MainContext.default()
        while timeout >= 0:
            # dispatch all pending main loop events before checking
            while context.iteration(False):
                pass
            if condition():
                break
            timeout -= 1
            time.sleep(0.1)
        else:
            self.fail(message or 'timed out waiting for ' + str(condition))

    #
    # Actual test cases
    #

    def test_no_performance_driver(self):
        '''no performance driver'''

        self.start_daemon()
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('PerformanceInhibited'), '')

        profiles = self.get_dbus_property('Profiles')
        self.assertEqual(len(profiles), 2)
        self.assertEqual(profiles[1]['Driver'], 'balanced')
        self.assertEqual(profiles[0]['Driver'], 'power-saver')
        self.assertEqual(profiles[1]['Profile'], 'balanced')
        self.assertEqual(profiles[0]['Profile'], 'power-saver')

        self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('power-saver'))
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'power-saver')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'power-saver')

        # process = subprocess.Popen(['gdbus', 'introspect', '--system', '--dest', 'net.hadess.PowerProfiles', '--object-path', '/net/hadess/PowerProfiles'])
        # print (self.get_dbus_property('GPUs'))

        self.stop_daemon()

    def test_inhibited_transition(self):
        '''Test that transitions work as expected when inhibited'''

        tp_acpi = self.testbed.add_device('platform', 'thinkpad_acpi', None,
                                          ['dytc_lapmode', '0'],
                                          ['DEVPATH', '/devices/platform/thinkpad_acpi'])

        self.start_daemon()

        profiles = self.get_dbus_property('Profiles')
        self.assertEqual(len(profiles), 3)

        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')

        self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('performance'))
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'performance')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')

        # Inhibit
        self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '1')
        self.testbed.uevent(tp_acpi, 'change')
        self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now on'))
        self.assertEqual(self.get_dbus_property('PerformanceInhibited'), 'lap-detected')
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')

        # Switch to non-performance
        self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('power-saver'))
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'power-saver')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'power-saver')

    def test_dytc_performance_driver(self):
        '''Lenovo DYTC performance driver'''

        tp_acpi = self.testbed.add_device('platform', 'thinkpad_acpi', None,
                                          ['dytc_lapmode', '0'],
                                          ['DEVPATH', '/devices/platform/thinkpad_acpi'])

        self.start_daemon()

        profiles = self.get_dbus_property('Profiles')
        self.assertEqual(len(profiles), 3)
        self.assertEqual(profiles[0]['Driver'], 'lenovo_dytc')
        self.assertEqual(profiles[0]['Profile'], 'power-saver')

        self.assertEqual(profiles[2]['Driver'], 'lenovo_dytc')
        self.assertEqual(profiles[2]['Profile'], 'performance')
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')

        # lapmode detected, but performance wasn't selected anyway
        self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '1')
        self.testbed.uevent(tp_acpi, 'change')
        self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now on'))
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('PerformanceInhibited'), 'lap-detected')

        # Reset lapmode
        self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '0')
        self.testbed.uevent(tp_acpi, 'change')
        self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now off'))

        # Set performance mode
        self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('performance'))
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'performance')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')

        # And turn on lapmode
        self.testbed.set_attribute(tp_acpi, 'dytc_lapmode', '1')
        self.testbed.uevent(tp_acpi, 'change')
        self.assertEventually(lambda: self.have_text_in_log('dytc_lapmode is now on'))
        self.assertEqual(self.get_dbus_property('ActiveProfile'), 'balanced')
        self.assertEqual(self.get_dbus_property('SelectedProfile'), 'performance')
        self.assertEqual(self.get_dbus_property('PerformanceInhibited'), 'lap-detected')

    def test_trickle_charge_mode(self):
        '''Trickle power_supply charge type'''

        idevice = self.testbed.add_device('usb', 'iDevice', None,
                                          [],
                                          ['ID_MODEL', 'iDevice', 'DRIVER', 'apple-mfi-fastcharge'])
        fastcharge = self.testbed.add_device('power_supply', 'MFi Fastcharge', idevice,
                                             ['charge_type', 'Trickle', 'scope', 'Device'],
                                             [])

        self.start_daemon()

        self.assertIn('trickle_charge', self.get_dbus_property('Actions'))

        # Verify that charge-type got changed to Fast on startup
        self.assertEqual(self.read_sysfs_attr(fastcharge, 'charge_type'), b'Fast')

        # Verify that charge-type got changed to Trickle when power saving
        self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('power-saver'))
        self.assertEqual(self.read_sysfs_attr(fastcharge, 'charge_type'), b'Trickle')

        # FIXME no performance mode
        # Verify that charge-type got changed to Fast in a non-default, non-power save mode
        # self.set_dbus_property('SelectedProfile', GLib.Variant.new_string('performance'))
        # self.assertEqual(self.read_sysfs_attr(fastcharge, 'charge_type'), 'Fast')

    #
    # Helper methods
    #

    @classmethod
    def _props_to_str(cls, properties):
        '''Convert a properties dictionary to uevent text representation.

        Each entry becomes one "KEY=VALUE" line; an empty or None mapping
        yields the empty string.
        '''
        if not properties:
            return ''
        return ''.join('%s=%s\n' % (k, v) for k, v in properties.items())
# Script entry point: the tests need libumockdev preloaded, so when it is not
# already in LD_PRELOAD the process replaces itself with the same command line
# run through umockdev-wrapper; only then is the unittest runner started.
if __name__ == '__main__':
    # run ourselves under umockdev
    if 'umockdev' not in os.environ.get('LD_PRELOAD', ''):
        os.execvp('umockdev-wrapper', ['umockdev-wrapper'] + sys.argv)

    unittest.main()