mirror of
https://gitlab.freedesktop.org/pipewire/wireplumber.git
synced 2026-05-08 08:08:03 +02:00
policy-node.lua: First round of cleanup
- remove scheduling rescan via core APIs instead use the event stack for proper scheduling. - The rescan & handlelinkable hooks will push new findTargetSiAndLink events one per session item. - Register a new "after-events-with-event" hook for findTargetSiAndLink event, it runs with lower priority than the rescan hook and so rescan hook can cancel unneeded findTargetSiAndLink events. - Register hooks for move and follow properties.
This commit is contained in:
parent
270dc5c041
commit
3832e14c1c
1 changed files with 240 additions and 211 deletions
|
|
@ -25,30 +25,171 @@ self.scanning = false
|
|||
self.pending_rescan = false
|
||||
self.events_skipped = false
|
||||
self.pending_error_timer = nil
|
||||
find_target_events = {}
|
||||
|
||||
function findTargetSiAndLink (si)
|
||||
|
||||
si_props = si.properties
|
||||
|
||||
Log.info (si, string.format ("handling item: %s (%s)",
|
||||
tostring (si_props ["node.name"]), tostring (si_props ["node.id"])))
|
||||
|
||||
ensureSiFlags (si)
|
||||
|
||||
-- get other important node properties
|
||||
local reconnect = not parseBool (si_props ["node.dont-reconnect"])
|
||||
local exclusive = parseBool (si_props ["node.exclusive"])
|
||||
local si_must_passthrough = parseBool (si_props ["item.node.encoded-only"])
|
||||
|
||||
-- find defined target
|
||||
local si_target, has_defined_target, has_node_defined_target = findDefinedTarget (si_props)
|
||||
local can_passthrough = si_target and canPassthrough (si, si_target)
|
||||
|
||||
if si_target and si_must_passthrough and not can_passthrough then
|
||||
si_target = nil
|
||||
end
|
||||
|
||||
-- if the client has seen a target that we haven't yet prepared, schedule
|
||||
-- a rescan one more time and hope for the best
|
||||
local si_id = si.id
|
||||
if has_defined_target
|
||||
and not si_target
|
||||
and not si_flags [si_id].was_handled
|
||||
and not si_flags [si_id].done_waiting then
|
||||
Log.info (si, "... waiting for target")
|
||||
si_flags [si_id].done_waiting = true
|
||||
-- Event-Stack TBD: do we need to retain this call here?
|
||||
rescan ()
|
||||
return
|
||||
end
|
||||
|
||||
-- find fallback target
|
||||
if not si_target and (reconnect or not has_defined_target) then
|
||||
si_target, can_passthrough = findUndefinedTarget (si)
|
||||
end
|
||||
|
||||
-- Check if item is linked to proper target, otherwise re-link
|
||||
if si_flags [si_id].peer_id then
|
||||
if si_target and si_flags [si_id].peer_id == si_target.id then
|
||||
Log.debug (si, "... already linked to proper target")
|
||||
-- Check this also here, in case in default targets changed
|
||||
checkFollowDefault (si, si_target, has_node_defined_target)
|
||||
return
|
||||
end
|
||||
local link = lookupLink (si_id, si_flags [si_id].peer_id)
|
||||
if reconnect then
|
||||
if link ~= nil then
|
||||
-- remove old link if active, otherwise schedule rescan
|
||||
if ((link:get_active_features () & Feature.SessionItem.ACTIVE) ~= 0) then
|
||||
si_flags [si_id].peer_id = nil
|
||||
link:remove ()
|
||||
Log.info (si, "... moving to new target")
|
||||
else
|
||||
-- Event-Stack TBD: do we need to retain this call here?
|
||||
rescan ()
|
||||
Log.info (si, "... scheduled rescan")
|
||||
return
|
||||
end
|
||||
end
|
||||
else
|
||||
if link ~= nil then
|
||||
Log.info (si, "... dont-reconnect, not moving")
|
||||
return
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- if the stream has dont-reconnect and was already linked before,
|
||||
-- don't link it to a new target
|
||||
if not reconnect and si_flags [si.id].was_handled then
|
||||
si_target = nil
|
||||
end
|
||||
|
||||
-- check target's availability
|
||||
if si_target then
|
||||
local target_is_linked, target_is_exclusive = isLinked (si_target)
|
||||
if target_is_exclusive then
|
||||
Log.info (si, "... target is linked exclusively")
|
||||
si_target = nil
|
||||
end
|
||||
|
||||
if target_is_linked then
|
||||
if exclusive or si_must_passthrough then
|
||||
Log.info (si, "... target is already linked, cannot link exclusively")
|
||||
si_target = nil
|
||||
else
|
||||
-- disable passthrough, we can live without it
|
||||
can_passthrough = false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
if not si_target then
|
||||
Log.info (si, "... target not found, reconnect:" .. tostring (reconnect))
|
||||
|
||||
local node = si:get_associated_proxy ("node")
|
||||
if not reconnect then
|
||||
Log.info (si, "... destroy node")
|
||||
node:request_destroy ()
|
||||
elseif si_flags [si.id].was_handled then
|
||||
Log.info (si, "... waiting reconnect")
|
||||
return
|
||||
end
|
||||
|
||||
local client_id = node.properties ["client.id"]
|
||||
if client_id then
|
||||
local client = clients_om:lookup {
|
||||
Constraint { "bound-id", "=", client_id, type = "gobject" }
|
||||
}
|
||||
if client then
|
||||
client:send_error (node ["bound-id"], -2, "no node available")
|
||||
end
|
||||
end
|
||||
else
|
||||
createLink (si, si_target, can_passthrough, exclusive)
|
||||
si_flags [si.id].was_handled = true
|
||||
|
||||
checkFollowDefault (si, si_target, has_node_defined_target)
|
||||
end
|
||||
end
|
||||
|
||||
function handleLinkable (si)
|
||||
if checkPending () then
|
||||
return
|
||||
end
|
||||
|
||||
local valid, si_props = checkLinkable (si)
|
||||
if not valid then
|
||||
return
|
||||
end
|
||||
|
||||
props = {}
|
||||
props ["event.subject.type"] = "linkable"
|
||||
|
||||
-- check if we need to link this node at all
|
||||
local autoconnect = parseBool (si_props ["node.autoconnect"])
|
||||
if not autoconnect then
|
||||
Log.debug (si, tostring (si_props ["node.name"]) .. " does not need to be autoconnected")
|
||||
return
|
||||
end
|
||||
|
||||
if not find_target_events [si.id] then
|
||||
find_target_events [si.id] = {}
|
||||
else
|
||||
-- stop the processing of the old event, we are going to queue a new one any
|
||||
-- way
|
||||
find_target_events [si.id]:stop_processing ()
|
||||
end
|
||||
|
||||
find_target_events [si.id] = EventDispatcher.push_event {
|
||||
type = "find-target-si-and-link", priority = 10, properties = props,
|
||||
subject = si }
|
||||
end
|
||||
|
||||
function rescan ()
|
||||
Log.info ("rescanning..")
|
||||
for si in linkables_om:iterate () do
|
||||
handleLinkable (si)
|
||||
end
|
||||
end
|
||||
|
||||
function scheduleRescan ()
|
||||
Log.info ("schedule rescanning..")
|
||||
if self.scanning then
|
||||
self.pending_rescan = true
|
||||
return
|
||||
end
|
||||
|
||||
self.scanning = true
|
||||
rescan ()
|
||||
self.scanning = false
|
||||
|
||||
if self.pending_rescan then
|
||||
self.pending_rescan = false
|
||||
Core.sync (function ()
|
||||
scheduleRescan ()
|
||||
end)
|
||||
handleLinkable (si)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -627,7 +768,7 @@ function checkPending ()
|
|||
Log.debug ("pending linkables ready")
|
||||
self.events_skipped = false
|
||||
-- Event-Stack TBD: do we need to retain this call here?
|
||||
scheduleRescan ()
|
||||
rescan ()
|
||||
return true
|
||||
end
|
||||
|
||||
|
|
@ -663,174 +804,6 @@ function checkFollowDefault (si, si_target, has_node_defined_target)
|
|||
end
|
||||
end
|
||||
|
||||
function handleLinkable (si)
|
||||
if checkPending () then
|
||||
return
|
||||
end
|
||||
|
||||
local valid, si_props = checkLinkable (si)
|
||||
if not valid then
|
||||
return
|
||||
end
|
||||
|
||||
-- check if we need to link this node at all
|
||||
local autoconnect = parseBool (si_props ["node.autoconnect"])
|
||||
if not autoconnect then
|
||||
Log.debug (si, tostring (si_props ["node.name"]) .. " does not need to be autoconnected")
|
||||
return
|
||||
end
|
||||
|
||||
Log.info (si, string.format ("handling item: %s (%s)",
|
||||
tostring (si_props ["node.name"]), tostring (si_props ["node.id"])))
|
||||
|
||||
ensureSiFlags (si)
|
||||
|
||||
-- get other important node properties
|
||||
local reconnect = not parseBool (si_props ["node.dont-reconnect"])
|
||||
local exclusive = parseBool (si_props ["node.exclusive"])
|
||||
local si_must_passthrough = parseBool (si_props ["item.node.encoded-only"])
|
||||
|
||||
-- find defined target
|
||||
local si_target, has_defined_target, has_node_defined_target = findDefinedTarget (si_props)
|
||||
local can_passthrough = si_target and canPassthrough (si, si_target)
|
||||
|
||||
if si_target and si_must_passthrough and not can_passthrough then
|
||||
si_target = nil
|
||||
end
|
||||
|
||||
-- if the client has seen a target that we haven't yet prepared, schedule
|
||||
-- a rescan one more time and hope for the best
|
||||
local si_id = si.id
|
||||
if has_defined_target
|
||||
and not si_target
|
||||
and not si_flags [si_id].was_handled
|
||||
and not si_flags [si_id].done_waiting then
|
||||
Log.info (si, "... waiting for target")
|
||||
si_flags [si_id].done_waiting = true
|
||||
-- Event-Stack TBD: do we need to retain this call here?
|
||||
scheduleRescan ()
|
||||
return
|
||||
end
|
||||
|
||||
-- find fallback target
|
||||
if not si_target and (reconnect or not has_defined_target) then
|
||||
si_target, can_passthrough = findUndefinedTarget (si)
|
||||
end
|
||||
|
||||
-- Check if item is linked to proper target, otherwise re-link
|
||||
if si_flags [si_id].peer_id then
|
||||
if si_target and si_flags [si_id].peer_id == si_target.id then
|
||||
Log.debug (si, "... already linked to proper target")
|
||||
-- Check this also here, in case in default targets changed
|
||||
checkFollowDefault (si, si_target, has_node_defined_target)
|
||||
return
|
||||
end
|
||||
local link = lookupLink (si_id, si_flags [si_id].peer_id)
|
||||
if reconnect then
|
||||
if link ~= nil then
|
||||
-- remove old link
|
||||
if ((link:get_active_features() & Feature.SessionItem.ACTIVE) == 0) then
|
||||
-- Link not yet activated. We don't want to remove it now, as that
|
||||
-- may cause problems. Instead, give up for now. A rescan is scheduled
|
||||
-- once the link activates.
|
||||
Log.info (link, "Link to be moved was not activated, will wait for it.")
|
||||
return
|
||||
end
|
||||
si_flags [si_id].peer_id = nil
|
||||
link:remove ()
|
||||
Log.info (si, "... moving to new target")
|
||||
end
|
||||
else
|
||||
if link ~= nil then
|
||||
Log.info (si, "... dont-reconnect, not moving")
|
||||
return
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- if the stream has dont-reconnect and was already linked before,
|
||||
-- don't link it to a new target
|
||||
if not reconnect and si_flags [si.id].was_handled then
|
||||
si_target = nil
|
||||
end
|
||||
|
||||
-- check target's availability
|
||||
if si_target then
|
||||
local target_is_linked, target_is_exclusive = isLinked (si_target)
|
||||
if target_is_exclusive then
|
||||
Log.info (si, "... target is linked exclusively")
|
||||
si_target = nil
|
||||
end
|
||||
|
||||
if target_is_linked then
|
||||
if exclusive or si_must_passthrough then
|
||||
Log.info (si, "... target is already linked, cannot link exclusively")
|
||||
si_target = nil
|
||||
else
|
||||
-- disable passthrough, we can live without it
|
||||
can_passthrough = false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
if not si_target then
|
||||
Log.info (si, "... target not found, reconnect:" .. tostring (reconnect))
|
||||
|
||||
local node = si:get_associated_proxy ("node")
|
||||
if not reconnect then
|
||||
Log.info (si, "... destroy node")
|
||||
node:request_destroy ()
|
||||
elseif si_flags [si.id].was_handled then
|
||||
Log.info (si, "... waiting reconnect")
|
||||
return
|
||||
end
|
||||
|
||||
local client_id = node.properties ["client.id"]
|
||||
if client_id then
|
||||
local client = clients_om:lookup {
|
||||
Constraint { "bound-id", "=", client_id, type = "gobject" }
|
||||
}
|
||||
if client then
|
||||
client:send_error (node ["bound-id"], -2, "no node available")
|
||||
end
|
||||
end
|
||||
else
|
||||
createLink (si, si_target, can_passthrough, exclusive)
|
||||
si_flags [si.id].was_handled = true
|
||||
|
||||
checkFollowDefault (si, si_target, has_node_defined_target)
|
||||
end
|
||||
end
|
||||
|
||||
function unhandleLinkable (si)
|
||||
local valid, si_props = checkLinkable (si, true)
|
||||
if not valid then
|
||||
return
|
||||
end
|
||||
|
||||
Log.info (si, string.format ("unhandling item: %s (%s)",
|
||||
tostring (si_props ["node.name"]), tostring (si_props ["node.id"])))
|
||||
|
||||
-- remove any links associated with this item
|
||||
for silink in links_om:iterate () do
|
||||
local out_id = tonumber (silink.properties ["out.item.id"])
|
||||
local in_id = tonumber (silink.properties ["in.item.id"])
|
||||
if out_id == si.id or in_id == si.id then
|
||||
if out_id == si.id and
|
||||
si_flags [in_id] and si_flags [in_id].peer_id == out_id then
|
||||
si_flags [in_id].peer_id = nil
|
||||
elseif in_id == si.id and
|
||||
si_flags [out_id] and si_flags [out_id].peer_id == in_id then
|
||||
si_flags [out_id].peer_id = nil
|
||||
end
|
||||
silink:remove ()
|
||||
Log.info (silink, "... link removed")
|
||||
end
|
||||
end
|
||||
|
||||
si_flags [si.id] = nil
|
||||
end
|
||||
|
||||
default_nodes = Plugin.find ("default-nodes-api")
|
||||
|
||||
metadata_om = ObjectManager {
|
||||
|
|
@ -872,24 +845,6 @@ links_om = ObjectManager {
|
|||
}
|
||||
}
|
||||
|
||||
-- listen for default node changes if "follow" setting is enabled
|
||||
if follow and default_nodes ~= nil then
|
||||
default_nodes:connect ("changed", function ()
|
||||
scheduleRescan ()
|
||||
end)
|
||||
end
|
||||
|
||||
-- listen for target.node metadata changes if "move" setting is enabled
|
||||
if move then
|
||||
metadata_om:connect ("object-added", function (om, metadata)
|
||||
metadata:connect ("changed", function (m, subject, key, t, value)
|
||||
if key == "target.node" or key == "target.object" then
|
||||
scheduleRescan ()
|
||||
end
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
function findAssociatedLinkGroupNode (si)
|
||||
local si_props = si.properties
|
||||
local node = si:get_associated_proxy ("node")
|
||||
|
|
@ -1006,6 +961,22 @@ SimpleEventHook {
|
|||
end
|
||||
}:register ()
|
||||
|
||||
SimpleEventHook {
|
||||
name = "find-target-si-and-link@policy-node",
|
||||
type = "after-events-with-event",
|
||||
priority = "find-target-si",
|
||||
interests = {
|
||||
EventInterest {
|
||||
Constraint { "event.type", "=", "find-target-si-and-link" },
|
||||
Constraint { "event.subject.type", "=", "linkable" },
|
||||
},
|
||||
},
|
||||
execute = function (event)
|
||||
local si = event:get_subject ()
|
||||
findTargetSiAndLink (si)
|
||||
end
|
||||
}:register ()
|
||||
|
||||
SimpleEventHook {
|
||||
name = "linkable-removed@policy-node",
|
||||
type = "on-event",
|
||||
|
|
@ -1062,18 +1033,76 @@ SimpleEventHook {
|
|||
Constraint { "media.class", "#", "Video/*", type = "pw-global" },
|
||||
Constraint { "active-features", "!", 0, type = "gobject" },
|
||||
},
|
||||
-- on device addition
|
||||
-- on device Routes changed
|
||||
EventInterest {
|
||||
Constraint { "event.type", "=", "object-added" },
|
||||
Constraint { "event.type", "=", "params-changed" },
|
||||
Constraint { "event.subject.type", "=", "device" },
|
||||
Constraint { "event.subject.param-id", "=", "Route" },
|
||||
},
|
||||
|
||||
},
|
||||
execute = function ()
|
||||
scheduleRescan ()
|
||||
rescan ()
|
||||
end
|
||||
}:register ()
|
||||
|
||||
if follow then
|
||||
SimpleEventHook {
|
||||
name = "follow@policy-node",
|
||||
type = "after-events",
|
||||
priority = "rescan-policy",
|
||||
interests = {
|
||||
EventInterest {
|
||||
Constraint { "event.type", "=", "object-changed" },
|
||||
Constraint { "event.subject.type", "=", "metadata" },
|
||||
Constraint { "metadata.name", "=", "default" },
|
||||
Constraint { "event.subject.key", "=", "default.audio.source" },
|
||||
},
|
||||
EventInterest {
|
||||
Constraint { "event.type", "=", "object-changed" },
|
||||
Constraint { "event.subject.type", "=", "metadata" },
|
||||
Constraint { "metadata.name", "=", "default" },
|
||||
Constraint { "event.subject.key", "=", "default.audio.sink" },
|
||||
},
|
||||
EventInterest {
|
||||
Constraint { "event.type", "=", "object-changed" },
|
||||
Constraint { "event.subject.type", "=", "metadata" },
|
||||
Constraint { "metadata.name", "=", "default" },
|
||||
Constraint { "event.subject.key", "=", "default.video.source" },
|
||||
},
|
||||
},
|
||||
execute = function ()
|
||||
rescan ()
|
||||
end
|
||||
}:register ()
|
||||
end
|
||||
|
||||
if move then
|
||||
SimpleEventHook {
|
||||
name = "move@policy-node",
|
||||
type = "after-events",
|
||||
priority = "rescan-policy",
|
||||
interests = {
|
||||
EventInterest {
|
||||
Constraint { "event.type", "=", "object-changed" },
|
||||
Constraint { "event.subject.type", "=", "metadata" },
|
||||
Constraint { "metadata.name", "=", "default" },
|
||||
Constraint { "event.subject.key", "=", "target.node" },
|
||||
Constraint { "event.subject.value", "!", "-1" },
|
||||
},
|
||||
EventInterest {
|
||||
Constraint { "event.type", "=", "object-changed" },
|
||||
Constraint { "event.subject.type", "=", "metadata" },
|
||||
Constraint { "metadata.name", "=", "default" },
|
||||
Constraint { "event.subject.key", "=", "target.object" },
|
||||
Constraint { "event.subject.value", "!", "-1" },
|
||||
},
|
||||
},
|
||||
execute = function ()
|
||||
rescan ()
|
||||
end
|
||||
}:register ()
|
||||
end
|
||||
|
||||
metadata_om:activate ()
|
||||
endpoints_om:activate ()
|
||||
clients_om:activate ()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue