execution: fold in dependency aware caching / Fix --cache-none with loops/lazy etc (#10368)

* execution: fold in dependency aware caching

This makes --cache-none compatible with lazy and expanded
subgraphs.

Currently the --cache-none option is powered by the
DependencyAwareCache. The cache attempts to maintain a parallel
copy of the execution list data structure; however, it is only
set up once at the start of execution and does not receive
meaningful updates as the execution list changes.

This causes multiple problems when --cache-none is used with lazy
and expanded subgraphs, because the DAC does not accurately update
its copy of the execution data structure.

The DAC makes an attempt to handle subgraphs (ensure_subcache),
but this does not accurately connect to nodes outside the subgraph.
The current semantics of the DAC are to free a node ASAP, once its
dependent nodes have executed.

This means that if a subgraph references such a node, it will be
requeued and re-executed by the execution_list, but the DAC will
no longer see it in its to-free lists and will leak memory.

Rather than trying to cover, from inside the cache, every case
where the execution list changes, move the whole problem to the
executor, which maintains an always up-to-date copy of the wanted
data structure.

The executor now has a fast-moving, run-local cache of its own.
Each _to node has its own mini cache, and that cache is
unconditionally primed at the time of add_strong_link.

add_strong_link is called for all of static workflows, lazy links
and expanded subgraphs, so it is the single source of truth for
output dependencies.

On a cache hit, the executor cache holds the non-None value (it
also respects updates should they somehow happen).

On a cache miss, the executor caches None and waits for a
notification to update the value when the node completes.
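
A minimal sketch of the mechanism, with invented names throughout
(the ExecutionList fields, lookup_output and notify_output are
hypothetical; only add_strong_link is named in the real code):

    # Hypothetical sketch, not the actual ComfyUI implementation.
    class ExecutionList:
        def __init__(self):
            # One mini cache per pending "_to" node:
            # {to_node_id: {(from_node_id, output_index): value}}
            self.mini_caches = {}

        def add_strong_link(self, from_node_id, output_index, to_node_id):
            mini = self.mini_caches.setdefault(to_node_id, {})
            # Prime unconditionally: a hit stores the non-None value,
            # a miss stores None until notify_output() fills it in.
            mini[(from_node_id, output_index)] = self.lookup_output(
                from_node_id, output_index)  # assumed helper

        def notify_output(self, from_node_id, output_index, value):
            # Called when from_node completes: update every mini cache
            # holding (or waiting on) this output.
            for mini in self.mini_caches.values():
                if (from_node_id, output_index) in mini:
                    mini[(from_node_id, output_index)] = value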

When a node completes execution, it simply releases its mini cache
and, in turn, its strong refs on its direct ancestor outputs,
allowing for ASAP freeing (same as the DependencyAwareCache but a
little more automatic).
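
In the same hypothetical sketch, completion is just a dictionary
drop:

    # On the sketched ExecutionList above:
    def complete_node(self, to_node_id):
        # Dropping the mini cache releases the executor's strong refs
        # on the node's direct ancestor outputs; with no other
        # referents left, they are freed immediately.
        self.mini_caches.pop(to_node_id, None)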

This now allows --cache-none to be re-implemented with no cache at
all. The dependency aware cache was also applying the dependency
semantics to the objects and UI caches, which is not accurate (this
entire logic was always outputs-specific).

This also prepares for more complex caching strategies (such as
RAM-pressure-based caching), where a cache can implement any
freeing strategy completely independently of the
dependency-awareness requirement.
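
As a purely hypothetical example of such a strategy (the class, its
threshold and its interface are all invented for illustration, and
psutil is assumed available):

    import psutil  # assumed available for this sketch

    class RAMPressureCache:
        # Evicts oldest entries first under system RAM pressure,
        # with no dependency bookkeeping at all.
        def __init__(self, high_water_pct=90.0):
            self.high_water_pct = high_water_pct
            self.store = {}  # dicts preserve insertion order

        def set(self, key, value):
            self.store[key] = value
            while self.store and psutil.virtual_memory().percent > self.high_water_pct:
                self.store.pop(next(iter(self.store)))  # evict oldest

        def get(self, key):
            return self.store.get(key)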

* main: re-implement --cache-none as no cache at all

The execution list now tracks dependency aware caching more
correctly than the DependencyAwareCache did.

Change it to a cache that does nothing.
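
In sketch form (method names are illustrative, not the project's
actual cache interface):

    class NullCache:
        # Every lookup misses and every store is a no-op; the
        # executor's run-local mini caches carry values for the
        # duration of a run.
        def get(self, key):
            return None

        def set(self, key, value):
            pass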

* test_execution: add --cache-none to the test suite

--cache-none is now expected to work universally. Run it through the
full unit test suite. Propagate the server parameterization for
whether or not the server is capable of caching, so that the
minority of tests that specifically check for cache hits can branch.
In the else branch, hard-assert that nothing was cached, to give
some coverage of --cache-none's expected behaviour of not actually
caching.
Author: rattus128
Date: 2025-10-18 06:55:15 +10:00
Committed by: GitHub
Parent: d8d60b5609
Commit: b1467da480
5 changed files with 101 additions and 190 deletions


@@ -152,12 +152,12 @@ class TestExecution:
     # Initialize server and client
     #
     @fixture(scope="class", autouse=True, params=[
-        # (use_lru, lru_size)
-        (False, 0),
-        (True, 0),
-        (True, 100),
+        { "extra_args" : [], "should_cache_results" : True },
+        { "extra_args" : ["--cache-lru", 0], "should_cache_results" : True },
+        { "extra_args" : ["--cache-lru", 100], "should_cache_results" : True },
+        { "extra_args" : ["--cache-none"], "should_cache_results" : False },
     ])
-    def _server(self, args_pytest, request):
+    def server(self, args_pytest, request):
         # Start server
         pargs = [
             'python','main.py',
@@ -167,12 +167,10 @@ class TestExecution:
             '--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml',
             '--cpu',
         ]
-        use_lru, lru_size = request.param
-        if use_lru:
-            pargs += ['--cache-lru', str(lru_size)]
+        pargs += [ str(param) for param in request.param["extra_args"] ]
         print("Running server with args:", pargs) # noqa: T201
         p = subprocess.Popen(pargs)
-        yield
+        yield request.param
         p.kill()
         torch.cuda.empty_cache()
@@ -193,7 +191,7 @@ class TestExecution:
         return comfy_client

     @fixture(scope="class", autouse=True)
-    def shared_client(self, args_pytest, _server):
+    def shared_client(self, args_pytest, server):
         client = self.start_client(args_pytest["listen"], args_pytest["port"])
         yield client
         del client
@@ -225,7 +223,7 @@ class TestExecution:
         assert result.did_run(mask)
         assert result.did_run(lazy_mix)

-    def test_full_cache(self, client: ComfyClient, builder: GraphBuilder):
+    def test_full_cache(self, client: ComfyClient, builder: GraphBuilder, server):
         g = builder
         input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
         input2 = g.node("StubImage", content="NOISE", height=512, width=512, batch_size=1)
@@ -237,9 +235,12 @@ class TestExecution:
         client.run(g)
         result2 = client.run(g)
         for node_id, node in g.nodes.items():
-            assert not result2.did_run(node), f"Node {node_id} ran, but should have been cached"
+            if server["should_cache_results"]:
+                assert not result2.did_run(node), f"Node {node_id} ran, but should have been cached"
+            else:
+                assert result2.did_run(node), f"Node {node_id} was cached, but should have been run"

-    def test_partial_cache(self, client: ComfyClient, builder: GraphBuilder):
+    def test_partial_cache(self, client: ComfyClient, builder: GraphBuilder, server):
         g = builder
         input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
         input2 = g.node("StubImage", content="NOISE", height=512, width=512, batch_size=1)
@@ -251,8 +252,12 @@ class TestExecution:
         client.run(g)
         mask.inputs['value'] = 0.4
         result2 = client.run(g)
-        assert not result2.did_run(input1), "Input1 should have been cached"
-        assert not result2.did_run(input2), "Input2 should have been cached"
+        if server["should_cache_results"]:
+            assert not result2.did_run(input1), "Input1 should have been cached"
+            assert not result2.did_run(input2), "Input2 should have been cached"
+        else:
+            assert result2.did_run(input1), "Input1 should have been rerun"
+            assert result2.did_run(input2), "Input2 should have been rerun"

     def test_error(self, client: ComfyClient, builder: GraphBuilder):
         g = builder
@@ -411,7 +416,7 @@ class TestExecution:
         input2 = g.node("StubImage", id="removeme", content="WHITE", height=512, width=512, batch_size=1)
         client.run(g)

-    def test_custom_is_changed(self, client: ComfyClient, builder: GraphBuilder):
+    def test_custom_is_changed(self, client: ComfyClient, builder: GraphBuilder, server):
         g = builder
         # Creating the nodes in this specific order previously caused a bug
         save = g.node("SaveImage")
@@ -427,7 +432,10 @@ class TestExecution:
         result3 = client.run(g)
         result4 = client.run(g)
         assert result1.did_run(is_changed), "is_changed should have been run"
-        assert not result2.did_run(is_changed), "is_changed should have been cached"
+        if server["should_cache_results"]:
+            assert not result2.did_run(is_changed), "is_changed should have been cached"
+        else:
+            assert result2.did_run(is_changed), "is_changed should have been re-run"
         assert result3.did_run(is_changed), "is_changed should have been re-run"
         assert result4.did_run(is_changed), "is_changed should not have been cached"
@@ -514,7 +522,7 @@ class TestExecution:
         assert len(images2) == 1, "Should have 1 image"

     # This tests that only constant outputs are used in the call to `IS_CHANGED`
-    def test_is_changed_with_outputs(self, client: ComfyClient, builder: GraphBuilder):
+    def test_is_changed_with_outputs(self, client: ComfyClient, builder: GraphBuilder, server):
         g = builder
         input1 = g.node("StubConstantImage", value=0.5, height=512, width=512, batch_size=1)
         test_node = g.node("TestIsChangedWithConstants", image=input1.out(0), value=0.5)
@@ -530,7 +538,11 @@ class TestExecution:
         images = result.get_images(output)
         assert len(images) == 1, "Should have 1 image"
         assert numpy.array(images[0]).min() == 63 and numpy.array(images[0]).max() == 63, "Image should have value 0.25"
-        assert not result.did_run(test_node), "The execution should have been cached"
+        if server["should_cache_results"]:
+            assert not result.did_run(test_node), "The execution should have been cached"
+        else:
+            assert result.did_run(test_node), "The execution should have been re-run"

     def test_parallel_sleep_nodes(self, client: ComfyClient, builder: GraphBuilder, skip_timing_checks):
         # Warmup execution to ensure server is fully initialized