import itertools
import pytest
import stix2.base
import stix2generator
import stix2generator.exceptions
import stix2generator.test.utils
import stix2generator.utils
import stix2generator.generation.object_generator
import stix2generator.generation.stix_generator
@pytest.mark.parametrize(
    "seed_type", [
        "identity",
        stix2generator.utils.STIXTypeClass.SDO,
        stix2generator.utils.STIXTypeClass.SCO,
        stix2generator.utils.STIXTypeClass.SRO
    ]
)
def test_seeds(num_trials, seed_type, stix21_generator):
    """
    Check that seeding generation with a type (or type class) produces a
    graph containing at least one object matching that seed.

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    :param seed_type: The seed passed to generate(): a STIX type name or a
        STIXTypeClass member
    :param stix21_generator: The generator under test (presumably a fixture
        defined outside this file)
    """
    for _ in range(num_trials):
        generated = stix21_generator.generate(seed_type)

        # The seed itself must show up somewhere in the generated content.
        seed_present = any(
            stix2generator.utils.is_stix_type(
                candidate, seed_type, stix_version="2.1"
            )
            for candidate in generated.values()
        )

        assert seed_present
def test_bad_seed(stix21_generator):
    """
    Check that seeding generation with an unknown type name ("foo") raises
    GeneratableSTIXTypeNotFoundError.

    :param stix21_generator: The generator under test (presumably a fixture
        defined outside this file)
    """
    expected_error = \
        stix2generator.exceptions.GeneratableSTIXTypeNotFoundError

    with pytest.raises(expected_error):
        stix21_generator.generate("foo")
def _count_relationships(graph):
    """
    Count the SROs (plain relationships and sightings) in the graph.

    :param graph: The STIX graph as a mapping from ID to object
    :return: The number of graph values for which
        stix2generator.utils.is_sro() is true
    """
    return sum(
        1 for candidate in graph.values()
        if stix2generator.utils.is_sro(candidate)
    )
def test_relationship_count(num_trials):
    """
    Check that the number of SROs in each generated graph stays within the
    configured min_relationships/max_relationships bounds.

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    """
    # Name the bounds once so the config and the assertion cannot drift.
    lower_bound, upper_bound = 2, 5

    config = stix2generator.generation.stix_generator.Config(
        min_relationships=lower_bound,
        max_relationships=upper_bound
    )
    generator = stix2generator.create_stix_generator(
        stix_generator_config=config
    )

    for _ in range(num_trials):
        rel_count = _count_relationships(generator.generate())
        assert lower_bound <= rel_count <= upper_bound
def test_complete_ref_properties_true(num_trials):
    """
    Check that with complete_ref_properties=True, generated graphs never
    contain dangling references.

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    """
    config = stix2generator.generation.stix_generator.Config(
        complete_ref_properties=True
    )
    generator = stix2generator.create_stix_generator(
        stix_generator_config=config
    )

    for _ in range(num_trials):
        generated = generator.generate()
        dangling = stix2generator.test.utils.has_dangling_references(
            generated
        )
        assert not dangling
def test_complete_ref_properties_false(num_trials):
    """
    Check the dangling-reference behavior of graphs generated with
    complete_ref_properties=False.

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    """
    config = stix2generator.generation.stix_generator.Config(
        complete_ref_properties=False
    )
    generator = stix2generator.create_stix_generator(
        stix_generator_config=config
    )

    for _ in range(num_trials):
        graph = generator.generate()

        # I think that if complete_ref_properties is False, non-relationship
        # ref properties would always have to be dangling.  If there are no
        # non-relationship ref properties, none would be dangling, because it
        # would be an error for a relationship to refer to a non-existent
        # object.

        # Does any non-SRO object carry at least one reference?  Only the
        # first reference of each object needs to be looked at.
        has_non_relationship_ref_props = any(
            next(stix2generator.utils.find_references(obj), None) is not None
            for obj in graph.values()
            if not stix2generator.utils.is_sro(obj)
        )

        dangling = stix2generator.test.utils.has_dangling_references(graph)

        if has_non_relationship_ref_props:
            assert dangling
        else:
            assert not dangling
def test_probability_sighting(num_trials):
    """
    Check that with probability_sighting=0, generated graphs contain no
    objects of type "sighting".

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    """
    config = stix2generator.generation.stix_generator.Config(
        probability_sighting=0
    )
    generator = stix2generator.create_stix_generator(
        stix_generator_config=config
    )

    for _ in range(num_trials):
        generated = generator.generate()

        sighting_free = all(
            obj["type"] != "sighting"
            for obj in generated.values()
        )

        assert sighting_free

    # can't test that probability_sighting=1 results in *only*
    # sightings, because STIX graph generation can't guarantee that.
def test_connectedness(num_trials, stix21_generator):
    """
    Check that every generated graph is connected, as judged by
    stix2generator.test.utils.is_connected().

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    :param stix21_generator: The generator under test (presumably a fixture
        defined outside this file)
    """
    for _ in range(num_trials):
        generated = stix21_generator.generate()
        connected = stix2generator.test.utils.is_connected(generated)
        assert connected
class _FilterFirst:
"""
Instances act as a predicate which is used to filter out only the first
occurrence of some value.
"""
def __init__(self, filter_value):
"""
Initialize this predicate object.
:param filter_value: The value whose first occurrence this predicate
should filter out.
"""
self.filter_value = filter_value
self.found = False
def __call__(self, value):
"""
Check whether the given value should pass this filter.
:param value: A value
:return: True if the value passes this filter; False if not
"""
passes = True
if value == self.filter_value and not self.found:
self.found = True
passes = False
return passes
def _sro_relates(sro, id_):
"""
Determine whether the given SRO relates an object with the given ID to
any other object.
:param sro: An SRO
:param id_: A STIX ID
:return: True if SRO relates id_ to something; False if not
"""
sro_type = sro["type"]
if sro_type == "relationship":
relates = id_ in (sro["source_ref"], sro["target_ref"])
else:
# sightings
relates = id_ == sro["sighting_of_ref"] \
or id_ in sro.get("observed_data_refs", []) \
or id_ in sro.get("where_sighted_refs", [])
return relates
def _get_sro_other_ends(sro, this_end_id):
    """
    Given an SRO and the ID of an object it relates, find all the IDs of
    other objects it relates the given ID to.  This is a generator which
    yields STIX IDs.

    :param sro: An SRO
    :param this_end_id: A STIX ID which the SRO relates to other things
    """
    if sro["type"] == "relationship":
        source_id = sro["source_ref"]
        # Whichever endpoint is not "this end" is the other end.
        yield sro["target_ref"] if this_end_id == source_id else source_id
        return

    # sightings
    ref_groups = (
        sro.get("observed_data_refs"),
        sro.get("where_sighted_refs"),
        (sro["sighting_of_ref"],),  # always one of these
    )

    # We assume this_end_id exists in some relevant ref property for the
    # sighting: that's one "end" of it.  All other ref IDs are the other
    # "ends".  In fact, this_end_id could occur multiple times.  We don't
    # care where it occurs, but one of those is this end, and all others are
    # other ends.  The net result is that we want all ref IDs from the
    # relevant ref properties, minus a single ID matching this_end_id.

    # Chain together all of the ref IDs in the sighting, skipping ref
    # properties which are absent or empty.
    every_ref = itertools.chain.from_iterable(
        group for group in ref_groups if group
    )

    # Drop only the first occurrence of this_end_id: that one is "this end".
    yield from filter(_FilterFirst(this_end_id), every_ref)
def _sro_cycle_undirected_dfs(
    graph, curr_id, visited_ids=None, search_stack=None
):
    """
    Do a depth-first-search starting from curr_id in the given graph, and look
    for cycles.  This treats SROs as edges, and SROs' "endpoints" as graph
    nodes.  SRO directionality is ignored (sightings don't have a "direction"
    anyway).

    The search is recursive.  The search stack holds the IDs of the nodes on
    the current path *and* the IDs of the SROs used to travel between them;
    a cycle is reported when the search arrives at a node ID which is
    already on the stack.  Each call scans the whole graph to find the SROs
    touching curr_id.

    :param graph: The STIX graph as a mapping from ID to object
    :param curr_id: A start object ID for the search.  Must be an SDO or SCO
        ID (a type usable as an SRO endpoint).
    :param visited_ids: A set of IDs of objects we've already seen.  Prevents
        re-traversing the same graph regions multiple times.  Mutated in
        place; a new set is created if None is given.
    :param search_stack: A search stack which builds up a path from the
        start node to other nodes.  This is used to detect the cycles.
        Mutated in place, but restored to its incoming contents before
        returning; a new list is created if None is given.
    :return: True if a cycle is detected; False if not
    """
    if visited_ids is None:
        visited_ids = set()

    if search_stack is None:
        search_stack = []

    if curr_id in search_stack:
        # We arrived back at a node on the current path: that's a cycle.
        result = True

    elif curr_id in visited_ids:
        # Already fully explored via some other path; nothing new here.
        result = False

    elif curr_id not in graph:
        # dangling reference
        result = False

    else:
        visited_ids.add(curr_id)
        search_stack.append(curr_id)

        for id_, obj in graph.items():

            # Need to add the SROs to the stack too, because we don't want to
            # reuse them in a cycle.  Cycles require distinct objects *and*
            # distinct SROs.
            if stix2generator.utils.is_sro(obj) \
                    and id_ not in search_stack \
                    and _sro_relates(obj, curr_id):

                search_stack.append(id_)

                for other_end_id in _get_sro_other_ends(obj, curr_id):

                    result = _sro_cycle_undirected_dfs(
                        graph, other_end_id, visited_ids, search_stack
                    )

                    if result:
                        # Cycle found below; stop exploring this SRO.
                        break

                else:
                    # No cycle through this SRO: take it off the stack and
                    # move on to the next SRO touching curr_id.
                    search_stack.pop()
                    continue

                # Only reached when the inner loop broke out, i.e. a cycle
                # was found: take the SRO off the stack and stop searching.
                search_stack.pop()
                break

        else:
            # Every SRO touching curr_id was explored without finding a
            # cycle (or there were none).
            result = False

        # Take curr_id back off the stack.
        search_stack.pop()

    return result
def _has_sro_cycle_undirected(graph):
    """
    Determine whether the given graph has an SRO-based cycle.  SRO
    directionality is ignored (sightings don't have a "direction" anyway).

    Every SRO-connectable object (SDO or SCO) which has not already been
    reached is tried as a search start node.  Searching from a single start
    node only examines what is reachable from it by following SROs, so parts
    of the graph joined to it in some other way (e.g. only through embedded
    reference properties) would otherwise never be checked.

    :param graph: The STIX graph as a mapping from ID to object
    :return: True if a cycle is detected; False if not
    :raises AssertionError: If no cycle was found and the graph contains no
        SDO/SCO at all
    """
    # Shared across all of the searches below, so that each region of the
    # graph is traversed at most once.
    visited_ids = set()
    found_start_node = False

    for id_, obj in graph.items():
        if not stix2generator.utils.is_stix_type(
            obj,
            stix2generator.utils.STIXTypeClass.SDO,
            stix2generator.utils.STIXTypeClass.SCO,
            stix_version="2.1"
        ):
            # Not usable as a start node, i.e. not an SRO endpoint type.
            continue

        found_start_node = True

        if id_ in visited_ids:
            # Already covered by a search from an earlier start node.
            continue

        if _sro_cycle_undirected_dfs(graph, id_, visited_ids):
            return True

    # Should not happen: it would mean the graph is empty or contains no
    # "normal" graph nodes (SDO/SCOs)!
    assert found_start_node

    return False
def test_probability_reuse(num_trials):
    """
    Check that with probability_reuse=0, generated graphs contain no
    SRO-based cycles.

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    """
    # There shouldn't be any "cycles" if probability_reuse=0, since every
    # SRO addition results in all new objects.  I don't think there's any
    # invariant we can test when probability_reuse=1...
    no_reuse_config = stix2generator.generation.stix_generator.Config(
        probability_reuse=0
    )
    generator = stix2generator.create_stix_generator(
        stix_generator_config=no_reuse_config,
        stix_version="2.1"
    )

    for _ in range(num_trials):
        generated = generator.generate()
        has_cycle = _has_sro_cycle_undirected(generated)
        assert not has_cycle
@pytest.mark.parametrize(
    "seed_type", [
        "marking-definition",
        "relationship",
        "sighting"
    ]
)
def test_non_sro_connectable(num_trials, stix21_generator, seed_type):
    """
    Smoke test: seeding generation with each of the parametrized types must
    not raise.  Nothing is asserted about the generated content.

    :param num_trials: How many times to run generation (presumably a
        fixture defined outside this file)
    :param stix21_generator: The generator under test (presumably a fixture
        defined outside this file)
    :param seed_type: The STIX type name used as the generation seed
    """
    trials_left = num_trials
    while trials_left > 0:
        stix21_generator.generate(seed_type)
        trials_left -= 1
def _observable_container_has_dangling_references(observable_container):
    """
    Check all reference properties of all SCOs in the container, and determine
    whether they reference objects which are also in the container.

    :param observable_container: A mapping whose keys are the identifiers
        the contained objects may reference, and whose values are those
        objects
    :return: True if any references are dangling; False if not
    """
    # This is almost a copy-paste of
    # stix2generator.test.utils.has_dangling_references(), but changed to work
    # on an observable-container, which is not a full STIX object.
    return any(
        ref_id not in observable_container
        for contained_obj in observable_container.values()
        for _, ref_id in stix2generator.utils.recurse_references(
            contained_obj
        )
    )
def test_observed_data_observable_container(num_trials):
    """
    Because of observed-data special-casing which occurs in the codebase,
    this test is intended to ensure that SDO in particular isn't getting messed
    up: the "objects" container of every generated observed-data must have
    no dangling references within itself.

    :param num_trials: How many graphs to generate and check (presumably a
        fixture defined outside this file)
    """
    # To induce observed-data SDOs to have an "objects" property (as opposed to
    # the new "object_refs" property), configure the object generator to
    # minimize properties.  This will inhibit "object_refs" (since that's a ref
    # property) and force "objects".
    minimal_refs_config = stix2generator.generation.object_generator.Config(
        minimize_ref_properties=True
    )
    generator = stix2generator.create_stix_generator(
        object_generator_config=minimal_refs_config
    )

    for _ in range(num_trials):
        generated = generator.generate("observed-data")

        for obj in generated.values():
            if obj["type"] != "observed-data":
                continue

            container = obj["objects"]
            assert not _observable_container_has_dangling_references(
                container
            )
def test_preexisting_objects(stix21_generator):
    """
    Check that a graph passed as preexisting_objects is carried over into
    the newly generated graph, and that every resulting object is a parsed
    stix2 object.

    :param stix21_generator: The generator under test (presumably a fixture
        defined outside this file)
    """
    first_graph = stix21_generator.generate()
    second_graph = stix21_generator.generate(preexisting_objects=first_graph)

    # ensure second_graph absorbed first_graph
    missing_ids = [
        id_ for id_ in first_graph if id_ not in second_graph
    ]
    assert not missing_ids

    # ensure all objects got parsed ok
    unparsed = [
        obj for obj in second_graph.values()
        if not isinstance(obj, stix2.base._STIXBase)
    ]
    assert not unparsed
def test_stix2_parsing(stix21_generator):
    """
    Check that a plain-dict preexisting object is carried over into the
    generated graph and is still a dict there.

    :param stix21_generator: The generator under test (presumably a fixture
        defined outside this file)
    """
    identity_id = "identity--74fa9f1b-897e-40dc-8f1c-d2f531c956bb"

    preexisting = {
        identity_id: {
            "id": identity_id,
            "type": "identity",
            "spec_version": "2.1"
            # Omit the required "name" property.
            # Should be ok since the property is not used by any generators,
            # and we don't expect this dict to be parsed and produce any
            # validation errors.
        }
    }

    generated = stix21_generator.generate(preexisting_objects=preexisting)

    # ensure the generated graph absorbed the preexisting one
    missing_ids = [id_ for id_ in preexisting if id_ not in generated]
    assert not missing_ids

    # ensure our preexisting identity is still a dict
    assert isinstance(generated[identity_id], dict)