From 3832e14c1c9559e270102b036d3a7fc04f152622 Mon Sep 17 00:00:00 2001 From: Ashok Sidipotu Date: Mon, 15 Aug 2022 07:11:54 +0530 Subject: [PATCH] policy-node.lua: First round of cleanup - remove scheduling rescan via core APIs instead use the event stack for proper scheduling. - The rescan & handlelinkable hooks will push new findTargetSiAndLink events one per session item. - Register a new "after-events-with-event" hook for findTargetSiAndLink event, it runs with lower priority than the rescan hook and so rescan hook can cancel unneeded findTargetSiAndLink events. - Register hooks for move and follow properties. --- src/scripts/policy-node.lua | 451 +++++++++++++++++++----------------- 1 file changed, 240 insertions(+), 211 deletions(-) diff --git a/src/scripts/policy-node.lua b/src/scripts/policy-node.lua index 7ba3d10c..7f31f0c3 100644 --- a/src/scripts/policy-node.lua +++ b/src/scripts/policy-node.lua @@ -25,30 +25,171 @@ self.scanning = false self.pending_rescan = false self.events_skipped = false self.pending_error_timer = nil +find_target_events = {} + +function findTargetSiAndLink (si) + + si_props = si.properties + + Log.info (si, string.format ("handling item: %s (%s)", + tostring (si_props ["node.name"]), tostring (si_props ["node.id"]))) + + ensureSiFlags (si) + + -- get other important node properties + local reconnect = not parseBool (si_props ["node.dont-reconnect"]) + local exclusive = parseBool (si_props ["node.exclusive"]) + local si_must_passthrough = parseBool (si_props ["item.node.encoded-only"]) + + -- find defined target + local si_target, has_defined_target, has_node_defined_target = findDefinedTarget (si_props) + local can_passthrough = si_target and canPassthrough (si, si_target) + + if si_target and si_must_passthrough and not can_passthrough then + si_target = nil + end + + -- if the client has seen a target that we haven't yet prepared, schedule + -- a rescan one more time and hope for the best + local si_id = si.id + if has_defined_target + and not si_target + and not si_flags [si_id].was_handled + and not si_flags [si_id].done_waiting then + Log.info (si, "... waiting for target") + si_flags [si_id].done_waiting = true + -- Event-Stack TBD: do we need to retain this call here? + rescan () + return + end + + -- find fallback target + if not si_target and (reconnect or not has_defined_target) then + si_target, can_passthrough = findUndefinedTarget (si) + end + + -- Check if item is linked to proper target, otherwise re-link + if si_flags [si_id].peer_id then + if si_target and si_flags [si_id].peer_id == si_target.id then + Log.debug (si, "... already linked to proper target") + -- Check this also here, in case in default targets changed + checkFollowDefault (si, si_target, has_node_defined_target) + return + end + local link = lookupLink (si_id, si_flags [si_id].peer_id) + if reconnect then + if link ~= nil then + -- remove old link if active, otherwise schedule rescan + if ((link:get_active_features () & Feature.SessionItem.ACTIVE) ~= 0) then + si_flags [si_id].peer_id = nil + link:remove () + Log.info (si, "... moving to new target") + else + -- Event-Stack TBD: do we need to retain this call here? + rescan () + Log.info (si, "... scheduled rescan") + return + end + end + else + if link ~= nil then + Log.info (si, "... dont-reconnect, not moving") + return + end + end + end + + -- if the stream has dont-reconnect and was already linked before, + -- don't link it to a new target + if not reconnect and si_flags [si.id].was_handled then + si_target = nil + end + + -- check target's availability + if si_target then + local target_is_linked, target_is_exclusive = isLinked (si_target) + if target_is_exclusive then + Log.info (si, "... target is linked exclusively") + si_target = nil + end + + if target_is_linked then + if exclusive or si_must_passthrough then + Log.info (si, "... target is already linked, cannot link exclusively") + si_target = nil + else + -- disable passthrough, we can live without it + can_passthrough = false + end + end + end + + if not si_target then + Log.info (si, "... target not found, reconnect:" .. tostring (reconnect)) + + local node = si:get_associated_proxy ("node") + if not reconnect then + Log.info (si, "... destroy node") + node:request_destroy () + elseif si_flags [si.id].was_handled then + Log.info (si, "... waiting reconnect") + return + end + + local client_id = node.properties ["client.id"] + if client_id then + local client = clients_om:lookup { + Constraint { "bound-id", "=", client_id, type = "gobject" } + } + if client then + client:send_error (node ["bound-id"], -2, "no node available") + end + end + else + createLink (si, si_target, can_passthrough, exclusive) + si_flags [si.id].was_handled = true + + checkFollowDefault (si, si_target, has_node_defined_target) + end +end + +function handleLinkable (si) + if checkPending () then + return + end + + local valid, si_props = checkLinkable (si) + if not valid then + return + end + + props = {} + props ["event.subject.type"] = "linkable" + + -- check if we need to link this node at all + local autoconnect = parseBool (si_props ["node.autoconnect"]) + if not autoconnect then + Log.debug (si, tostring (si_props ["node.name"]) .. " does not need to be autoconnected") + return + end + + if not find_target_events [si.id] then + find_target_events [si.id] = {} + else + -- stop the processing of the old event, we are going to queue a new one any + -- way + find_target_events [si.id]:stop_processing () + end + + find_target_events [si.id] = EventDispatcher.push_event { + type = "find-target-si-and-link", priority = 10, properties = props, + subject = si } +end function rescan () Log.info ("rescanning..") for si in linkables_om:iterate () do - handleLinkable (si) - end -end - -function scheduleRescan () - Log.info ("schedule rescanning..") - if self.scanning then - self.pending_rescan = true - return - end - - self.scanning = true - rescan () - self.scanning = false - - if self.pending_rescan then - self.pending_rescan = false - Core.sync (function () - scheduleRescan () - end) + handleLinkable (si) end end @@ -627,7 +768,7 @@ function checkPending () Log.debug ("pending linkables ready") self.events_skipped = false -- Event-Stack TBD: do we need to retain this call here? - scheduleRescan () + rescan () return true end @@ -663,174 +804,6 @@ function checkFollowDefault (si, si_target, has_node_defined_target) end end -function handleLinkable (si) - if checkPending () then - return - end - - local valid, si_props = checkLinkable (si) - if not valid then - return - end - - -- check if we need to link this node at all - local autoconnect = parseBool (si_props ["node.autoconnect"]) - if not autoconnect then - Log.debug (si, tostring (si_props ["node.name"]) .. " does not need to be autoconnected") - return - end - - Log.info (si, string.format ("handling item: %s (%s)", - tostring (si_props ["node.name"]), tostring (si_props ["node.id"]))) - - ensureSiFlags (si) - - -- get other important node properties - local reconnect = not parseBool (si_props ["node.dont-reconnect"]) - local exclusive = parseBool (si_props ["node.exclusive"]) - local si_must_passthrough = parseBool (si_props ["item.node.encoded-only"]) - - -- find defined target - local si_target, has_defined_target, has_node_defined_target = findDefinedTarget (si_props) - local can_passthrough = si_target and canPassthrough (si, si_target) - - if si_target and si_must_passthrough and not can_passthrough then - si_target = nil - end - - -- if the client has seen a target that we haven't yet prepared, schedule - -- a rescan one more time and hope for the best - local si_id = si.id - if has_defined_target - and not si_target - and not si_flags [si_id].was_handled - and not si_flags [si_id].done_waiting then - Log.info (si, "... waiting for target") - si_flags [si_id].done_waiting = true - -- Event-Stack TBD: do we need to retain this call here? - scheduleRescan () - return - end - - -- find fallback target - if not si_target and (reconnect or not has_defined_target) then - si_target, can_passthrough = findUndefinedTarget (si) - end - - -- Check if item is linked to proper target, otherwise re-link - if si_flags [si_id].peer_id then - if si_target and si_flags [si_id].peer_id == si_target.id then - Log.debug (si, "... already linked to proper target") - -- Check this also here, in case in default targets changed - checkFollowDefault (si, si_target, has_node_defined_target) - return - end - local link = lookupLink (si_id, si_flags [si_id].peer_id) - if reconnect then - if link ~= nil then - -- remove old link - if ((link:get_active_features() & Feature.SessionItem.ACTIVE) == 0) then - -- Link not yet activated. We don't want to remove it now, as that - -- may cause problems. Instead, give up for now. A rescan is scheduled - -- once the link activates. - Log.info (link, "Link to be moved was not activated, will wait for it.") - return - end - si_flags [si_id].peer_id = nil - link:remove () - Log.info (si, "... moving to new target") - end - else - if link ~= nil then - Log.info (si, "... dont-reconnect, not moving") - return - end - end - end - - -- if the stream has dont-reconnect and was already linked before, - -- don't link it to a new target - if not reconnect and si_flags [si.id].was_handled then - si_target = nil - end - - -- check target's availability - if si_target then - local target_is_linked, target_is_exclusive = isLinked (si_target) - if target_is_exclusive then - Log.info (si, "... target is linked exclusively") - si_target = nil - end - - if target_is_linked then - if exclusive or si_must_passthrough then - Log.info (si, "... target is already linked, cannot link exclusively") - si_target = nil - else - -- disable passthrough, we can live without it - can_passthrough = false - end - end - end - - if not si_target then - Log.info (si, "... target not found, reconnect:" .. tostring (reconnect)) - - local node = si:get_associated_proxy ("node") - if not reconnect then - Log.info (si, "... destroy node") - node:request_destroy () - elseif si_flags [si.id].was_handled then - Log.info (si, "... waiting reconnect") - return - end - - local client_id = node.properties ["client.id"] - if client_id then - local client = clients_om:lookup { - Constraint { "bound-id", "=", client_id, type = "gobject" } - } - if client then - client:send_error (node ["bound-id"], -2, "no node available") - end - end - else - createLink (si, si_target, can_passthrough, exclusive) - si_flags [si.id].was_handled = true - - checkFollowDefault (si, si_target, has_node_defined_target) - end -end - -function unhandleLinkable (si) - local valid, si_props = checkLinkable (si, true) - if not valid then - return - end - - Log.info (si, string.format ("unhandling item: %s (%s)", - tostring (si_props ["node.name"]), tostring (si_props ["node.id"]))) - - -- remove any links associated with this item - for silink in links_om:iterate () do - local out_id = tonumber (silink.properties ["out.item.id"]) - local in_id = tonumber (silink.properties ["in.item.id"]) - if out_id == si.id or in_id == si.id then - if out_id == si.id and - si_flags [in_id] and si_flags [in_id].peer_id == out_id then - si_flags [in_id].peer_id = nil - elseif in_id == si.id and - si_flags [out_id] and si_flags [out_id].peer_id == in_id then - si_flags [out_id].peer_id = nil - end - silink:remove () - Log.info (silink, "... link removed") - end - end - - si_flags [si.id] = nil -end - default_nodes = Plugin.find ("default-nodes-api") metadata_om = ObjectManager { @@ -872,24 +845,6 @@ links_om = ObjectManager { } } --- listen for default node changes if "follow" setting is enabled -if follow and default_nodes ~= nil then - default_nodes:connect ("changed", function () - scheduleRescan () - end) -end - --- listen for target.node metadata changes if "move" setting is enabled -if move then - metadata_om:connect ("object-added", function (om, metadata) - metadata:connect ("changed", function (m, subject, key, t, value) - if key == "target.node" or key == "target.object" then - scheduleRescan () - end - end) - end) -end - function findAssociatedLinkGroupNode (si) local si_props = si.properties local node = si:get_associated_proxy ("node") @@ -1006,6 +961,22 @@ SimpleEventHook { end }:register () +SimpleEventHook { + name = "find-target-si-and-link@policy-node", + type = "after-events-with-event", + priority = "find-target-si", + interests = { + EventInterest { + Constraint { "event.type", "=", "find-target-si-and-link" }, + Constraint { "event.subject.type", "=", "linkable" }, + }, + }, + execute = function (event) + local si = event:get_subject () + findTargetSiAndLink (si) + end +}:register () + SimpleEventHook { name = "linkable-removed@policy-node", type = "on-event", @@ -1062,18 +1033,76 @@ SimpleEventHook { Constraint { "media.class", "#", "Video/*", type = "pw-global" }, Constraint { "active-features", "!", 0, type = "gobject" }, }, - -- on device addition + -- on device Routes changed EventInterest { - Constraint { "event.type", "=", "object-added" }, + Constraint { "event.type", "=", "params-changed" }, Constraint { "event.subject.type", "=", "device" }, + Constraint { "event.subject.param-id", "=", "Route" }, }, - }, execute = function () - scheduleRescan () + rescan () end }:register () +if follow then + SimpleEventHook { + name = "follow@policy-node", + type = "after-events", + priority = "rescan-policy", + interests = { + EventInterest { + Constraint { "event.type", "=", "object-changed" }, + Constraint { "event.subject.type", "=", "metadata" }, + Constraint { "metadata.name", "=", "default" }, + Constraint { "event.subject.key", "=", "default.audio.source" }, + }, + EventInterest { + Constraint { "event.type", "=", "object-changed" }, + Constraint { "event.subject.type", "=", "metadata" }, + Constraint { "metadata.name", "=", "default" }, + Constraint { "event.subject.key", "=", "default.audio.sink" }, + }, + EventInterest { + Constraint { "event.type", "=", "object-changed" }, + Constraint { "event.subject.type", "=", "metadata" }, + Constraint { "metadata.name", "=", "default" }, + Constraint { "event.subject.key", "=", "default.video.source" }, + }, + }, + execute = function () + rescan () + end + }:register () +end + +if move then + SimpleEventHook { + name = "move@policy-node", + type = "after-events", + priority = "rescan-policy", + interests = { + EventInterest { + Constraint { "event.type", "=", "object-changed" }, + Constraint { "event.subject.type", "=", "metadata" }, + Constraint { "metadata.name", "=", "default" }, + Constraint { "event.subject.key", "=", "target.node" }, + Constraint { "event.subject.value", "!", "-1" }, + }, + EventInterest { + Constraint { "event.type", "=", "object-changed" }, + Constraint { "event.subject.type", "=", "metadata" }, + Constraint { "metadata.name", "=", "default" }, + Constraint { "event.subject.key", "=", "target.object" }, + Constraint { "event.subject.value", "!", "-1" }, + }, + }, + execute = function () + rescan () + end + }:register () +end + metadata_om:activate () endpoints_om:activate () clients_om:activate ()