mesa/src/vulkan/screenshot-layer/mesa-screenshot-control.py

201 lines
5.7 KiB
Python
Raw Normal View History

vulkan/screenshot-layer: Add Vulkan screenshot layer This change adds a Vulkan screenshot layer that allows users to take screenshots from a Vulkan application, but has an emphasis on performance, decreasing the performance impact on the application involved. This allows for automated setups to use this layer to take screenshots for navigating various in-application menus. This layer works by hooking into various common Vulkan setup functions, until it enters the vkQueuePresentKHR function, and from there it copies the current frame's image from the swapchain as an RGB image to host-cached memory, where we will receive the information as a framebuffer pointer. From there, we copy the framebuffer contents to a thread that will detach from the main process so it can write the image to a PNG file without holding back the main thread. This layer was created from using the existing overlay layer as a template, then adding portions of LunarG's VulkanTools screenshot layer: https://github.com/LunarG/VulkanTools/blob/main/layersvt/screenshot.cpp More specifically, there were usages of functions, along with modifications of various functions from screenshot.cpp in the VulkanTools project, used in screenshot.cpp. There are some sections of the screenshotting functionality that remain unmodified from the original screenshot.cpp file in VulkanTools, including the global locking structures and the writeFile() function, which takes care of obtaining the images from the swapchain. There were various areas in which modifications were made, including how images are written to a file (using PNG instead of PPM, introducing threading, added fences/semaphores, etc), along with many smaller changes. v2: Fix segfault upon application exit v3: Fix filename issue with concatenation, along with some leftover memory handling that wasn't cleaned up. v4: Fix some error handling and nits v5: Fix output directory handling Reviewed-by: Ivan Briano <ivan.briano@intel.com Signed-off-by: Casey Bowman <casey.g.bowman@intel.com> Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/30527>
2024-05-29 23:39:45 -07:00
#!/usr/bin/env python3
import socket
import sys
import select
from select import EPOLLIN, EPOLLPRI, EPOLLERR
import time
import argparse
TIMEOUT = 1.0 # seconds
VERSION_HEADER = bytearray('MesaScreenshotControlVersion', 'utf-8')
DEVICE_NAME_HEADER = bytearray('DeviceName', 'utf-8')
MESA_VERSION_HEADER = bytearray('MesaVersion', 'utf-8')
DEFAULT_SERVER_ADDRESS = "\0mesa_screenshot"
class Connection:
def __init__(self, path):
# Create a Unix Domain socket and connect
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(path)
except socket.error as msg:
print(msg)
sys.exit(1)
self.sock = sock
# initialize poll interface and register socket
epoll = select.epoll()
epoll.register(sock, EPOLLIN | EPOLLPRI | EPOLLERR)
self.epoll = epoll
def recv(self, timeout):
'''
timeout as float in seconds
returns:
- None on error or disconnection
- bytes() (empty) on timeout
'''
events = self.epoll.poll(timeout)
for ev in events:
(fd, event) = ev
if fd != self.sock.fileno():
continue
# check for socket error
if event & EPOLLERR:
return None
# EPOLLIN or EPOLLPRI, just read the message
msg = self.sock.recv(4096)
# socket disconnected
if len(msg) == 0:
return None
return msg
return bytes()
def send(self, msg):
self.sock.send(msg)
class MsgParser:
MSGBEGIN = bytes(':', 'utf-8')[0]
MSGEND = bytes(';', 'utf-8')[0]
MSGSEP = bytes('=', 'utf-8')[0]
def __init__(self, conn):
self.cmdpos = 0
self.parampos = 0
self.bufferpos = 0
self.reading_cmd = False
self.reading_param = False
self.buffer = None
self.cmd = bytearray(4096)
self.param = bytearray(4096)
self.conn = conn
def readCmd(self, ncmds, timeout=TIMEOUT):
'''
returns:
- None on error or disconnection
- bytes() (empty) on timeout
'''
parsed = []
remaining = timeout
while remaining > 0 and ncmds > 0:
now = time.monotonic()
if self.buffer is None:
self.buffer = self.conn.recv(remaining)
self.bufferpos = 0
# disconnected or error
if self.buffer is None:
return None
for i in range(self.bufferpos, len(self.buffer)):
c = self.buffer[i]
self.bufferpos += 1
if c == self.MSGBEGIN:
self.cmdpos = 0
self.parampos = 0
self.reading_cmd = True
self.reading_param = False
elif c == self.MSGEND:
if not self.reading_cmd:
continue
self.reading_cmd = False
self.reading_param = False
cmd = self.cmd[0:self.cmdpos]
param = self.param[0:self.parampos]
self.reading_cmd = False
self.reading_param = False
parsed.append((cmd, param))
ncmds -= 1
if ncmds == 0:
break
elif c == self.MSGSEP:
if self.reading_cmd:
self.reading_param = True
else:
if self.reading_param:
self.param[self.parampos] = c
self.parampos += 1
elif self.reading_cmd:
self.cmd[self.cmdpos] = c
self.cmdpos += 1
# if we read the entire buffer and didn't finish the command,
# throw it away
self.buffer = None
# check if we have time for another iteration
elapsed = time.monotonic() - now
remaining = max(0, remaining - elapsed)
# timeout
return parsed
def control(args):
if args.socket:
address = '\0' + args.socket
else:
address = DEFAULT_SERVER_ADDRESS
conn = Connection(address)
msgparser = MsgParser(conn)
version = None
name = None
mesa_version = None
msgs = msgparser.readCmd(3)
for m in msgs:
cmd, param = m
if cmd == VERSION_HEADER:
version = int(param)
elif cmd == DEVICE_NAME_HEADER:
name = param.decode('utf-8')
elif cmd == MESA_VERSION_HEADER:
mesa_version = param.decode('utf-8')
if version != 1 or name is None or mesa_version is None:
print('ERROR: invalid protocol')
sys.exit(1)
if args.info:
print(f"Protocol Version: {version}")
print(f"Device Name: {name}")
print(f"Mesa Version: {mesa_version}")
if args.cmd == 'capture':
if args.filename is None:
args.filename = ''
msg = ':capture=' + args.filename + ';'
conn.send(bytearray(msg, 'utf-8'))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='MESA_screenshot control client')
parser.add_argument('--info', action='store_true', help='Print info from socket')
parser.add_argument('--socket', '-s', type=str, help='Path to socket')
commands = parser.add_subparsers(help='commands to run', dest='cmd')
commands_parser = commands.add_parser('capture', help='capture [filename.png]')
commands_parser.add_argument('filename', nargs='?', type=str)
args = parser.parse_args()
control(args)