import collections.abc
import datetime
import logging
import random
import stix2
import stix2.parsing
import stix2.utils
import stix2generator
from stix2generator.exceptions import (
UnhandledPropertyValueType, SpecificationNotFoundError,
UnsupportedObjectStructureError, UnrecognizedSTIXTypeError,
InvalidRefPropertyValueError
)
import stix2generator.generation
import stix2generator.utils
# Property names which must never be used as a step of a generated object
# path.
_OBJECT_PATH_BLACKLIST = {"id", "type", "spec_version"}

# Operator groups shared by the type-to-operators table below.  Values which
# can be ordered accept every comparison operator; the rest only accept the
# (in)equality tests.
_EQUALITY_OPERATORS = ("=", "!=")
_ORDERING_OPERATORS = ("<", "<=", ">", ">=") + _EQUALITY_OPERATORS

# Maps the Python types we expect to find as property values of stix2 objects
# to the operators which may be used with them in a comparison expression.
_PYTHON_TYPE_TO_OPERATORS = {
    int: _ORDERING_OPERATORS,
    float: _ORDERING_OPERATORS,
    str: _ORDERING_OPERATORS,
    stix2.utils.STIXdatetime: _ORDERING_OPERATORS,
    bool: _EQUALITY_OPERATORS,
}

# stix2's AST has a separate class per comparison operator.  This table
# resolves an operator string to the AST class which represents it.
_OPERATOR_TO_AST_CLASS = {
    "<": stix2.LessThanComparisonExpression,
    "<=": stix2.LessThanEqualComparisonExpression,
    ">": stix2.GreaterThanComparisonExpression,
    ">=": stix2.GreaterThanEqualComparisonExpression,
    "=": stix2.EqualityComparisonExpression,
    # "!=" has no class of its own: it reuses EqualityComparisonExpression,
    # constructed differently (negated), so users of this table must treat
    # it as a special case.
    "!=": stix2.EqualityComparisonExpression,
}

# Sentinel which stands for a list index "star" ('*') step in an object path.
_INDEX_STAR_STEP = object()
class Config(stix2generator.generation.Config):
    """
    Settings which control random STIX pattern generation.

    min_pattern_size / max_pattern_size: Bounds on the size of a pattern,
        measured as the total number of simple comparison expressions over
        all of its observation expressions.

    min_repeat_count / max_repeat_count: Bounds on the count used with the
        REPEATS qualifier.

    min_within_count / max_within_count: Bounds on the number of seconds used
        with the WITHIN qualifier.

    probability_qualifier: Chance that any given observation expression
        (at any nesting level) is given a random qualifier.

    probability_continue_path_through_ref: While building a random object
        path, the chance that the path is continued through a reference
        property into another object.

    probability_index_star_step: While building a random object path, the
        chance that a list index step is written as '*'.
    """

    # Default value for each setting (presumably consumed by the base
    # Config class, which is defined outside this module).
    _DEFAULTS = dict(
        min_pattern_size=1,
        max_pattern_size=5,
        min_repeat_count=1,
        max_repeat_count=10,
        min_within_count=1,
        max_within_count=10,
        probability_qualifier=0.2,
        probability_continue_path_through_ref=0.8,
        probability_index_star_step=0.1,
    )
def _rand_series(n):
"""
Generate a random series (sum) which adds to 'n'. The generated series
takes the form of a sequence of positive integers which add to n. If
n <= 0, nothing is generated.
:param n: The desired sum, as an integer
"""
# With this algorithm, the probability distribution over series is not
# uniform: it favors shorter series over longer ones. Should I try to
# think of something better?
while n > 0:
k = random.randint(1, n)
yield k
n -= k
def _random_operator_for_type(type_):
    """
    Randomly choose a comparison operator which is legal for values of the
    given type, and express the choice in AST terms.

    Most comparison operators have their own AST class, so the choice is
    returned as a class rather than as an operator string.  "!=" is the
    exception: it is represented by the equality class together with a
    negation flag.  For that reason a boolean is returned alongside the
    class.

    :param type_: A type, e.g. int, str, etc
    :return: A 2-tuple of (1) an AST class and (2) a boolean which is True
        when the expression must be negated; or None if the type is not
        recognized, so that no set of legal operators could be determined.
    """
    operators = _PYTHON_TYPE_TO_OPERATORS.get(type_)
    if not operators:
        return None

    chosen = random.choice(operators)

    # Not-equal is the equality class plus negation; every other operator
    # maps directly to its class and is never negated.
    return _OPERATOR_TO_AST_CLASS[chosen], chosen == "!="
def _is_ref_path(path_elements):
"""
Determine whether the given object path, expressed as an element list
(see _element_list_to_object_path()), ends with a reference and is
therefore eligible for continuation through the reference. The given
object path is assumed to be "completed" down to a single STIX property
value. This means that a *_ref property will be the last component, and
*_refs will be second-to-last, because it requires a subsequent index step.
:param path_elements: An object path, as a list
:return: True if a continuable reference path; False if not
"""
result = False
if path_elements:
last_elt = path_elements[-1]
if isinstance(last_elt, str) and last_elt.endswith("_ref"):
result = True
elif len(path_elements) > 1:
# for _refs properties, the ref property itself must be
# second-to-last, and the last path element must be an index step,
# either "*" or an int. Maybe not necessary to check the index
# step; all we need is to check the second-to-last property.
second_last_elt = path_elements[-2]
if isinstance(second_last_elt, str) \
and second_last_elt.endswith("_refs"):
result = True
return result
def _element_list_to_object_path(object_type, path_elements):
    """
    Convert an object "path", given as a flat list, to an AST ObjectPath.

    The list may contain strings, ints and the special _INDEX_STAR_STEP
    object.  Strings are taken to be property names; ints and star steps are
    taken to be list indices applying to the property name just before them.

    :param object_type: The SCO type to use for the ObjectPath instance
    :param path_elements: The path elements as a list of
        strings/ints/_INDEX_STAR_STEPs
    :return: An ObjectPath instance
    :raises UnsupportedObjectStructureError: If an element which should be a
        property name is not a string (e.g. two index steps in a row, or a
        path which begins with an index step)
    """
    components = []
    count = len(path_elements)
    pos = 0

    while pos < count:
        prop_name = path_elements[pos]
        if not isinstance(prop_name, str):
            raise UnsupportedObjectStructureError(
                object_type, path_elements
            )

        # Peek at the following element: if it is an index step, it is
        # folded into this component and consumed together with the name.
        has_next = pos + 1 < count
        next_step = path_elements[pos + 1] if has_next else None

        if has_next and isinstance(next_step, int):
            components.append(
                stix2.ListObjectPathComponent(prop_name, next_step)
            )
            pos += 2
        elif has_next and next_step is _INDEX_STAR_STEP:
            components.append(
                stix2.ListObjectPathComponent(prop_name, "*")
            )
            pos += 2
        else:
            # ReferenceObjectPathComponent is deliberately not used here;
            # the pattern visitor does not appear to use it either.
            components.append(
                stix2.BasicObjectPathComponent(prop_name, False)
            )
            pos += 1

    return stix2.ObjectPath(object_type, components)
class PatternGenerator:
    """
    Instances of this class generate random STIX patterns.

    A pattern is built as a stix2 pattern AST.  Object paths and the values
    they are compared with are obtained by generating random STIX objects
    and walking random paths through them.
    """

    def __init__(self, object_generator, stix_version, config=None):
        """
        Initialize this PatternGenerator.  Patterns are generated by randomly
        walking through randomly generated objects, so a pattern generator
        relies on an object generator for its random STIX content.

        :param object_generator: A STIX object generator
        :param stix_version: The STIX version to generate patterns for.  (This
            should probably match up with the version of objects generated by
            object_generator!)
        :param config: A Config object with settings for pattern generation,
            or None to choose default settings
        """
        cls = self.__class__
        self.__log = logging.getLogger(
            cls.__module__ + "." + cls.__name__
        )

        self.__generator = object_generator
        self.__stix_version = stix_version
        self.__config = config or Config()

    def __random_sco_type(self):
        """
        Choose a random SCO type for this generator's STIX version, by
        delegating to stix2generator.utils.random_generatable_stix_type().

        :return: The chosen SCO type
        """
        return stix2generator.utils.random_generatable_stix_type(
            self.__generator,
            stix2generator.utils.STIXTypeClass.SCO,
            stix_version=self.__stix_version
        )

    def __generate_object_path(self, type_constraint=None):
        """
        Generate a random object path.  This is done by generating a random
        object, and then choosing a random path through it.  If
        type_constraint is given, an object of that type is generated.
        Otherwise, a random SCO type is chosen.  The value at the "end" of
        the path is also returned, for use in the pattern.

        :param type_constraint: An SCO type, or None
        :return: A 2-tuple consisting of (1) the ObjectPath instance, and (2)
            a value from the object.  This value will be taken from a stix2
            object, so that determines its type.  It could be a string,
            STIXdatetime instance, etc.
        :raises UnrecognizedSTIXTypeError: If the object generator has no
            specification for the chosen/given type
        :raises InvalidRefPropertyValueError: If the path is continued
            through a reference whose value does not contain "--"
        """
        if type_constraint:
            sco_type = type_constraint
        else:
            sco_type = self.__random_sco_type()

        try:
            obj_dict = self.__generator.generate(sco_type)
        except SpecificationNotFoundError as e:
            raise UnrecognizedSTIXTypeError(sco_type) from e

        obj = stix2.parse(obj_dict, allow_custom=True)

        # Descend through mappings and lists until a plain value is reached.
        path_elements = []
        while True:
            if isinstance(obj, collections.abc.Mapping):
                candidate_props = obj.keys() - _OBJECT_PATH_BLACKLIST
                element = stix2generator.utils.rand_iterable(candidate_props)
            elif isinstance(obj, list):
                element = random.randrange(len(obj))
            else:
                break

            # Let's have a chance to append an index "star" step when a
            # list is encountered, instead of the chosen index.  The walk
            # itself still descends through the chosen index either way.
            if isinstance(element, int) and \
                    random.random() < self.__config.probability_index_star_step:
                path_elements.append(_INDEX_STAR_STEP)
            else:
                path_elements.append(element)

            obj = obj[element]

        object_path = _element_list_to_object_path(sco_type, path_elements)

        if _is_ref_path(path_elements) and \
                random.random() < \
                self.__config.probability_continue_path_through_ref:
            # If a ref path, the value must be an ID.  Extract the object
            # type from the ID and generate a path of that type to
            # concatenate to our path.  In this way, we can continue the
            # path through references.
            dd_idx = obj.find("--")
            if dd_idx == -1:
                raise InvalidRefPropertyValueError(obj)

            id_type = obj[:dd_idx]

            try:
                path_continuation, obj = self.__generate_object_path(id_type)
            except UnrecognizedSTIXTypeError:
                # We couldn't generate an SCO of type id_type.  Reduce this
                # to a warning; we will simply not have a continued path in
                # this case (and the returned value stays the ID).
                self.__log.warning(
                    'Truncating object path due to unrecognized SCO type "%s"',
                    id_type
                )
            else:
                object_path.property_path.extend(
                    path_continuation.property_path
                )

        return object_path, obj

    def __generate_simple_comparison_expression(self, type_constraint=None):
        """
        Generate a "simple" <path> <op> <value> comparison expression.  If
        a type constraint is given, that will be the SCO type for the path.
        Otherwise, a random SCO type is chosen.

        :param type_constraint: An SCO type, or None
        :return: An AST instance for a simple comparison expression
        :raises UnhandledPropertyValueType: If no operators are known for
            the type of the value found at the end of the generated path
        """
        object_path, value = self.__generate_object_path(type_constraint)

        result = _random_operator_for_type(type(value))
        if result is None:
            raise UnhandledPropertyValueType(value)

        ast_class, negated = result
        ast_node = ast_class(object_path, value)
        # "!=" is the equality class with its negation flag switched on.
        ast_node.negated = negated

        return ast_node

    def __generate_simple_comparison_expression_list(
        self, size, type_constraint, is_and
    ):
        """
        Generate a list of the given size of "simple" comparison expressions,
        which honors the given type constraint, relative to the indicated
        boolean connective.  is_and indicates how the returned comparison
        expressions will be used.  They will be connected with 'AND' if
        is_and is True, else 'OR'.  Therefore, if a type constraint is given
        and is_and is True, all generated comparison expressions must be of
        the given type.  Otherwise, at least one must be of the given type.
        If no type constraint is given then is_and must be False, because
        AND'd comparison expressions require a constraint.  If is_and is
        False and no type constraint is given, all comparison expressions
        will be of randomly chosen types.

        :param size: The number of simple comparison expressions to generate
        :param type_constraint: An SCO type, or None
        :param is_and: True if the returned expressions will be connected via
            AND; False if they will be connected via OR.
        :return: The list of comparison expressions (ASTs).
        """
        assert size >= 0

        # If AND, all operands *must* be type-constrained.
        assert not is_and or type_constraint

        if type_constraint and not is_and:
            # In 'OR': at least one must be of the constraining type.
            if size == 0:
                return []

            # Create N-1 unconstrained exprs and 1 constrained expr...
            result = [
                self.__generate_simple_comparison_expression(None)
                for _ in range(size - 1)
            ]
            constrained_expr = self.__generate_simple_comparison_expression(
                type_constraint
            )

            # ...then insert the constrained one at a random location in
            # the list.
            result.insert(random.randint(0, len(result)), constrained_expr)
            return result

        # Remaining cases: an 'AND' (every expression must be of the
        # constraining type) or an unconstrained 'OR' (every expression is of
        # a random type).  Either way all expressions share one constraint.
        uniform_constraint = type_constraint or None
        return [
            self.__generate_simple_comparison_expression(uniform_constraint)
            for _ in range(size)
        ]

    def __generate_complex_comparison_expression(
        self, size, type_constraint=None
    ):
        """
        Generates a "complex" comparison expression, i.e. one which may
        consist of sub-expressions connected via AND or OR.  If a type
        constraint is given, the resulting expression will honor that
        constraint.

        :param size: The size of the desired complex comparison expression, in
            terms of the number of simple comparison expressions it must
            contain.  Must be positive.
        :param type_constraint: An SCO type, or None
        :return: The comparison expression AST: a simple comparison
            expression if size is 1, otherwise an AND/OR boolean expression
        """
        assert size > 0

        # This complex expression must be composed of N simple expressions.
        # This implementation builds the overall expression in two parts: a
        # left and right side.  The location of the split between left and
        # right is random.  A side is randomly chosen to just contain a
        # series of simple expressions, and the other side will have a nested
        # subexpression.
        #
        # One goal of the strategy is to avoid excessive nested parentheses.
        # Too many parentheses results in ugly crazy-looking patterns.  This
        # algorithm still can generate some silly patterns, but I hope it
        # helps a little.

        if size == 1:
            return self.__generate_simple_comparison_expression_list(
                1, type_constraint, False
            )[0]

        # Choose whether top-level operator will be AND or OR.
        # This will also determine how we handle the type constraint.
        is_and = random.random() < 0.5

        # If AND, all operands *must* be type-constrained.
        if is_and and not type_constraint:
            type_constraint = self.__random_sco_type()

        # In the following, if type_constraint is None, both left and right
        # constraints will be None.  No need for a special case.  If we
        # have a type constraint, for 'AND', the constraint must be
        # enforced on both sides.  For 'OR', we need only enforce it on one
        # side.
        if is_and:
            left_constraint = right_constraint = type_constraint
        else:
            left_constraint, right_constraint = type_constraint, None
            if random.random() < 0.5:
                left_constraint, right_constraint = \
                    right_constraint, left_constraint

        # Don't let either side be zero size here.  Avoids the case where
        # we have an OR, and randomly choose to enforce the type constraint
        # on the zero-length side.  That can result in an invalid pattern.
        lsize = random.randint(1, size - 1)
        rsize = size - lsize

        if random.random() < 0.5:
            # Parenthesize right case
            operands = self.__generate_simple_comparison_expression_list(
                lsize, left_constraint, is_and
            )
            operands.append(stix2.ParentheticalExpression(
                self.__generate_complex_comparison_expression(
                    rsize, right_constraint
                )
            ))
        else:
            # Parenthesize left case
            operands = [stix2.ParentheticalExpression(
                self.__generate_complex_comparison_expression(
                    lsize, left_constraint
                )
            )]
            operands.extend(
                self.__generate_simple_comparison_expression_list(
                    rsize, right_constraint, is_and
                )
            )

        if is_and:
            return stix2.AndBooleanExpression(operands)
        return stix2.OrBooleanExpression(operands)

    def __generate_random_qualifier(self):
        """
        Generate a random qualifier AST object: with equal chance a REPEATS,
        a WITHIN or a START/STOP qualifier.  Repeat and within counts are
        drawn from the configured bounds; start/stop timestamps are drawn
        from within a year of the current time.

        :return: The qualifier object
        """
        qual_type = random.randrange(3)

        if qual_type == 0:
            repeat_count = random.randint(
                self.__config.min_repeat_count,
                self.__config.max_repeat_count
            )
            return stix2.RepeatQualifier(repeat_count)

        if qual_type == 1:
            within_count = random.randint(
                self.__config.min_within_count,
                self.__config.max_within_count
            )
            return stix2.WithinQualifier(within_count)

        # Let's make the random timestamps near the current time
        # (within a year).
        one_year_us = 1000000 * 60 * 60 * 24 * 365
        dur1 = datetime.timedelta(microseconds=random.randrange(one_year_us))
        dur2 = datetime.timedelta(microseconds=random.randrange(one_year_us))

        if random.random() < 0.5:
            dur1 = -dur1

        if random.random() < 0.5:
            dur2 = -dur2

        # datetime.utcnow() is deprecated as of Python 3.12.  Take an aware
        # UTC "now" instead and drop the tzinfo, which yields the very same
        # naive UTC value utcnow() produced.
        now = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        dt1 = now + dur1
        dt2 = now + dur2

        # Order them as start=dt1, stop=dt2
        if dt1 > dt2:
            dt1, dt2 = dt2, dt1
        elif dt1 == dt2:
            # in the remote chance we get the same timestamp for both,
            # just nudge one ahead...
            dt2 += datetime.timedelta(seconds=1)

        # STIX 2.0 requires string constants and millisecond precision
        # here...
        if self.__stix_version == "2.0":
            dt1_str = stix2.utils.format_datetime(
                stix2.utils.STIXdatetime(dt1, precision="millisecond")
            )
            dt1 = stix2.patterns.StringConstant(dt1_str)

            dt2_str = stix2.utils.format_datetime(
                stix2.utils.STIXdatetime(dt2, precision="millisecond")
            )
            dt2 = stix2.patterns.StringConstant(dt2_str)

        return stix2.StartStopQualifier(dt1, dt2)

    def __generate_observation_expression(self, size):
        """
        Generate a random complex observation expression, which may consist
        of sub-expressions and qualifiers.

        :param size: The size of the desired observation expression, in terms
            of the number of simple comparison expressions it must contain.
            Must be positive.
        :return: The observation expression AST
        """
        assert size > 0

        # The generation strategy is similar to that for comparison
        # expressions (see __generate_complex_comparison_expression()).  It
        # is generated in two parts of random size; one side is constructed
        # as a sub-expression.

        if size == 1:
            obs_expr = stix2.ObservationExpression(
                self.__generate_complex_comparison_expression(1)
            )

        else:
            # Unlike for comparison expressions, either side may be empty
            # here; an empty side simply contributes no operands.
            lsize = random.randint(0, size)
            rsize = size - lsize

            if random.random() < 0.5:
                # Parenthesize right case
                obs_exprs = [
                    stix2.ObservationExpression(
                        self.__generate_complex_comparison_expression(sz)
                    )
                    for sz in _rand_series(lsize)
                ]

                if rsize > 0:
                    obs_exprs.append(stix2.ParentheticalExpression(
                        self.__generate_observation_expression(rsize)
                    ))

            else:
                # Parenthesize left case
                if lsize == 0:
                    obs_exprs = []
                else:
                    obs_exprs = [stix2.ParentheticalExpression(
                        self.__generate_observation_expression(lsize)
                    )]

                obs_exprs.extend(
                    stix2.ObservationExpression(
                        self.__generate_complex_comparison_expression(sz)
                    )
                    for sz in _rand_series(rsize)
                )

            ast_class = random.choice((
                stix2.AndObservationExpression,
                stix2.OrObservationExpression,
                stix2.FollowedByObservationExpression
            ))

            obs_expr = ast_class(obs_exprs)

        if random.random() < self.__config.probability_qualifier:
            qualifier = self.__generate_random_qualifier()
            obs_expr = stix2.QualifiedObservationExpression(
                obs_expr, qualifier
            )

        return obs_expr

    def generate_ast(self):
        """
        Generate a random STIX pattern as an AST.  The pattern size is drawn
        from the configured min/max_pattern_size bounds.

        :return: A pattern AST
        """
        size = random.randint(
            self.__config.min_pattern_size,
            self.__config.max_pattern_size
        )

        return self.__generate_observation_expression(size)

    def generate(self):
        """
        Generate a random STIX pattern.

        :return: A pattern string
        """
        pattern_ast = self.generate_ast()
        return str(pattern_ast)