# Source code for stix2generator.language.builder

import lark
import logging
import pprint
import stix2

from stix2generator.exceptions import (
    CircularVariableDependenciesError, LanguageError, RedeclaredVariableError,
    UndeclaredVariableError
)
from stix2generator.utils import (
    is_token, is_tree
)
import stix2generator.generation.semantics
import stix2generator.generation.object_generator
import stix2generator.logging


# Lark grammar for the prototyping language handled by this module.  A
# specification is one or more statements, each terminated by "."; a
# statement is a variable declaration, a graph statement, or a sighting.
# The text below is consumed verbatim by lark.Lark(), so the "//" lines
# inside the string are grammar comments, not Python comments.
_grammar = r"""

// can we use the Lark-included string literal token?
%import common.ESCAPED_STRING

graph_spec : (statement ".")+

statement : variable_declaration_statement
  | graph_statement
  | sighting

variable_declaration_statement : variable_declaration ("," variable_declaration)* ":" SDO_TYPE_NAME

variable_declaration : count? VARIABLE_NAME property_block?

graph_statement : (sdo_ref | sdo_list) relationship?

relationship : count? RELATIONSHIP_NAME graph_statement

sdo_list : "(" sdo_ref+ ")"

sdo_ref : sdo_inline | VARIABLE_NAME

sdo_inline : count? SDO_TYPE_NAME property_block?

property_block : "{" [property_assignment ("," property_assignment)*] "}"

property_assignment : PROPERTY_NAME ":" (graph_statement | ESCAPED_STRING | string_array)

count : POSITIVE_INT

string_array : "[" [ESCAPED_STRING ("," ESCAPED_STRING)*] "]"

// This allows both the property block and "of" clause to be omitted
// (which will result in an error since "sighting_of_ref" is a required
// property), but it's simpler than expressing "at least one".
// Increase rule priority, since "Sighting of ..." is ambiguous w.r.t.
// graph_statement, with (supposed) sdo "Sighting" and relationship "of".
sighting.2 : "Sighting" property_block? ["of" graph_statement]

POSITIVE_INT : /[1-9][0-9]*/

SDO_TYPE_NAME : /[A-Z][A-Za-z0-9_]*/

// These definitions conflict; hopefully we can rely on Lark's
// terminal collision resolution.
RELATIONSHIP_NAME : /[a-z][a-z0-9-]*/
PROPERTY_NAME : /[a-z][a-z0-9_]*/
VARIABLE_NAME : /[a-z][a-z0-9_-]*/

WS : /[ \t\r\n\u000B\u000C\u0085\u00a0\u1680\u2000\u2001\u2002\u2003\u2004\u2005\u2006\u2007\u2008\u2009\u200a\u2028\u2029\u202f\u205f\u3000]+/

%ignore WS

"""


# Module-level parser built once from _grammar; parsing starts at the
# "graph_spec" rule.  Position propagation is switched on so that the
# callbacks which request tree metadata (graph_statement and sighting) can
# hand it to LanguageError when they report a problem.
_parser = lark.Lark(
    _grammar,
    start="graph_spec",
    propagate_positions=True,
    # debug=True
)


# Module logger; lazy-initialized by _get_logger() on first use.
_log = None


# A custom property to use when embedding variable names in objects
_VARIABLE_NAME_PROPERTY = "x_viz_variable_name"


def _get_logger():
    """
    Get the logger for this module.  It is created the first time this
    function is called and cached in the module-level ``_log`` afterwards.

    :return: The module's logger
    """
    global _log

    if _log is not None:
        return _log

    _log = logging.getLogger(__name__)
    return _log


def _make_object(object_generator, type_name, overlay_props=None):
    """
    Generate one object and optionally overlay some property values on it.

    :param object_generator: The generator to use; its ``generate()`` method
        is called with type_name
    :param type_name: The name of a spec, which the generator should recognize
    :param overlay_props: A dict of property names and values to write onto
        the generated object, replacing any generated values for the same
        properties.  Nothing is overlaid if this is None or empty.
    :return: The object, as a dict
    """
    generated = object_generator.generate(type_name)

    if not overlay_props:
        return generated

    generated.update(overlay_props)
    return generated


def _make_sro(object_generator, source_id, rel_type, target_id):
    """
    Generate a "relationship" object and point it at the given endpoints.

    :param object_generator: The generator to use; its ``generate()`` method
        is called with "relationship"
    :param source_id: The ID of the source object (string)
    :param rel_type: The relationship type (string)
    :param target_id: The ID of the target object (string)
    :return: The SRO, as a dict
    """
    sro = object_generator.generate("relationship")

    # Overwrite whatever endpoints/type the generator came up with.
    sro.update({
        "source_ref": source_id,
        "target_ref": target_id,
        "relationship_type": rel_type,
    })

    return sro


def _make_sighting(object_generator, sighting_of, overlay_props=None):
    """
    Generate a "sighting" object.

    :param object_generator: The generator to use; its ``generate()`` method
        is called with "sighting"
    :param sighting_of: A value for the sighting_of_ref property, or None to
        leave that property as generated.
    :param overlay_props: A dict of property names and values written onto
        the generated object last, so they win over both generated values
        and sighting_of.  Nothing is overlaid if this is None or empty.
    :return: The sighting, as a dict
    """
    sighting = object_generator.generate("sighting")

    if sighting_of:
        sighting.update(sighting_of_ref=sighting_of)

    if overlay_props:
        sighting.update(overlay_props)

    return sighting


def _print_parse_tree(logger, tree, indent=0):
    """
    Log a parse tree, one node per line, indenting each level by one more
    space.  Tokens are shown as their text followed by their type in
    parentheses; subtrees are shown by their ``data`` value, followed by
    their children.  Everything is logged at the custom extra-verbose level.

    :param logger: The logger to use
    :param tree: The parse tree (or a token, when recursing)
    :param indent: An indent amount
    """
    padding = " " * indent

    if is_token(tree):
        logger.log(
            stix2generator.logging.EXTRA_VERBOSE,
            padding + tree + " (" + tree.type + ")"
        )
        return

    logger.log(stix2generator.logging.EXTRA_VERBOSE, padding + tree.data)

    for subtree in tree.children:
        _print_parse_tree(logger, subtree, indent + 1)


def _string_token_value(tok):
    """
    Recover the string an ESCAPED_STRING token stands for: drop the
    surrounding quote characters, then undo escaped double quotes and
    escaped backslashes (in that order).  Other escape sequences are left
    untouched.

    :param tok: an ESCAPED_STRING token
    :return: The string value
    """
    # Strip the first and last characters (the enclosing quotes).
    inner = tok.value[1:-1]

    unquoted = inner.replace('\\"', '"')
    return unquoted.replace("\\\\", "\\")


def _topo_sort_dfs(dependencies, start_var, visited_vars, sorted_vars,
                   search_path):
    """
    Recursive worker for the topological sort.  Walks depth-first from
    start_var and appends each newly discovered dependency to sorted_vars
    once everything it depends on has been appended (post-order).  The
    start variable itself is not appended; that is left to the caller.

    :param dependencies: The dependency data
    :param start_var: The variable to start searching from
    :param visited_vars: Set of variables already seen; updated in place so
        no variable is processed twice
    :param sorted_vars: The list the sort order is accumulated in
    :param search_path: Stack of variables on the current DFS path; used to
        detect cycles, and restored before returning normally
    :raises CircularVariableDependenciesError: If a dependency is found
        which is already on the current search path
    """

    visited_vars.add(start_var)
    search_path.append(start_var)

    # Variables with no entry in the mapping simply have nothing to walk.
    for dep_var in dependencies.get(start_var, ()):

        if dep_var in search_path:
            # Report the loop: from the first occurrence of dep_var on the
            # path back around to dep_var again.
            loop_start = search_path.index(dep_var)
            raise CircularVariableDependenciesError(
                search_path[loop_start:] + [dep_var]
            )

        if dep_var in visited_vars:
            continue

        _topo_sort_dfs(
            dependencies, dep_var, visited_vars, sorted_vars, search_path
        )
        sorted_vars.append(dep_var)

    search_path.pop()


def _topo_sort_dependencies(dependencies):
    """
    Produce an ordering of the variables in which every variable comes after
    all of the variables it depends on, so values can be created in that
    order.

    :param dependencies: The dependency data, as a mapping from var name to
        a set of dependencies.  (This is basically an adjacency list
        representation of a directed graph.)
    :return: A list of variable names in sorted order
    :raises CircularVariableDependenciesError: If circular dependencies are
        encountered
    """
    seen = set()
    ordering = []

    for root in dependencies:
        if root in seen:
            continue

        # The DFS appends everything root depends on; root itself goes last.
        _topo_sort_dfs(dependencies, root, seen, ordering, [])
        ordering.append(root)

    return ordering


def _remove_variable_declaration_statements(parse_tree):
    """
    Strip every variable declaration statement out of the parse tree.  The
    tree's existing list of children is edited in place (no new list object
    is attached to the tree).

    :param parse_tree: The whole parse tree (a subtree won't work)
    """

    # Slice assignment keeps the same list object while replacing its
    # contents with just the statements we want to retain.
    parse_tree.children[:] = [
        statement for statement in parse_tree.children
        if statement.children[0].data != "variable_declaration_statement"
    ]


class _VariableSpec:
    """
    Placeholder for a variable whose value can't be created yet because its
    property block refers to other variables.  It just records what is
    needed to create the value later on: the variable's name, count, type,
    and the (unprocessed) property block subtree.
    """
    def __init__(self, var_name, var_count, var_type, var_props_tree):
        self.var_name = var_name
        self.var_count = var_count
        self.var_type = var_type
        self.var_props_tree = var_props_tree

    def __str__(self):
        return (
            f"Placeholder for '{self.var_name}', "
            f"count={self.var_count}, type={self.var_type}"
        )


class _VariableDependencyCollector(lark.Visitor):
    """
    A visitor meant to be run *only* over the property block of a variable
    declaration.  It gathers every variable name referenced anywhere in the
    block; those names are the variable's dependencies.  There is no need to
    chase indirect dependencies: a variable may only have a property block
    where it is declared, so a variable referenced inside a block can never
    bring a block of its own along.  Grabbing every variable name in sight
    is therefore sufficient.
    """

    def __init__(self):
        # The collected VARIABLE_NAME tokens
        self.variables = set()

    def sdo_ref(self, tree):
        ref = tree.children[0]
        if is_token(ref, "VARIABLE_NAME"):
            self.variables.add(ref)


class _VariableProcessor(lark.Transformer):
    """
    A transformer which only acts on the variable declarations in the parse
    tree.  A variable whose property block references no other variables
    gets its STIX object(s) created right away.  A variable which does
    reference other variables can't be created during this traversal, so a
    placeholder is recorded for it instead, together with the set of
    variables it depends on; its objects are created later.
    """

    def __init__(self, object_generator):
        """
        Initialize the instance.

        :param object_generator: The object generator which will be used to
            generate values of variables.
        """
        super().__init__()

        self.__object_generator = object_generator

        # var name -> list of STIX IDs, or a _VariableSpec placeholder
        self.__variables = {}
        # STIX ID -> object dict
        self.__objects = {}
        # var name -> set of names of the variables it depends on
        self.__dependencies = {}

    def graph_spec(self, children):
        """
        Callback for the top-level rule in the grammar.  The result of the
        whole transformation is the 3-tuple (variables, objects,
        dependencies): variables maps variable name to a list of STIX
        object IDs or a placeholder, objects maps ID to STIX object dict,
        and dependencies maps variable name to the set of names of the
        variables it depends on.
        """
        return self.__variables, self.__objects, self.__dependencies

    def variable_declaration_statement(self, children):
        """
        Records every variable declared in the statement in the
        variable/object/dependency maps.

        :raises RedeclaredVariableError: If a variable name was already
            declared
        """
        # SDO type is always last; everything before it is a
        # (count, name, property block) triple.
        var_type = children[-1]
        declarations = children[:-1]

        for var_count, var_name, var_props in declarations:

            if var_name in self.__variables:
                raise RedeclaredVariableError(var_name)

            # Find out which variables (if any) the property block uses.
            var_dependencies = None
            if var_props:
                collector = _VariableDependencyCollector()
                collector.visit(var_props)
                var_dependencies = collector.variables

            if var_dependencies:
                # Can't create the value yet: park a placeholder in the
                # variables map (to be swapped for real STIX objects
                # later), and remember what it depends on.
                self.__variables[var_name] = _VariableSpec(
                    var_name, var_count, var_type, var_props
                )
                self.__dependencies[var_name] = var_dependencies
                continue

            # No dependencies, so the object(s) can be created directly.
            overlay_props = None
            if var_props:
                overlay_props = _GraphBuilder(
                    self.__object_generator,
                    self.__variables,
                    self.__objects
                ).transform(var_props)

            new_ids = []
            for _ in range(var_count):
                new_obj = _make_object(
                    self.__object_generator, var_type, overlay_props
                )
                new_id = new_obj["id"]
                self.__objects[new_id] = new_obj
                new_ids.append(new_id)

            self.__variables[var_name] = new_ids

        # Nobody uses this result, but callbacks are treated as returning
        # a value, so return one explicitly.
        return None

    def variable_declaration(self, children):
        """
        Produces count/varname/propblock 3-tuples.  The count defaults to 1
        when not given, and the last value (prop block) is None if no
        property block was given.
        """
        first = children[0]
        if is_tree(first, "count"):
            var_count = int(first.children[0])
            var_name = children[1]
        else:
            var_count = 1
            var_name = first

        has_props = (
            len(children) > 1 and is_tree(children[-1], "property_block")
        )
        var_props = children[-1] if has_props else None

        # The variable's type isn't known at this level, so nothing more
        # can be done here.  Pass the unprocessed property block up along
        # with the name and count, to be handled together with the type.
        return var_count, var_name, var_props


class _GraphBuilder(lark.Transformer):
    """
    Selectively processes just the graph statements in a parse tree, and
    produces all of the objects described therein.  No variables are created;
    references to undeclared variables will produce errors.

    The callbacks are written to cope with two Lark conventions which differ
    between releases (NOTE(review): confirm against the Lark version in use):

    - An unmatched ``[...]`` grammar item may be reported as a ``None``
      placeholder child, or not reported at all.  ``None`` children are
      skipped wherever such items occur (property blocks, string arrays,
      sightings).
    - Callbacks decorated with ``v_args(meta=True)`` may receive their two
      arguments as (children, meta) or as (meta, children).
    """

    def __init__(self, object_generator, variables, objects):
        """
        Initialize this transformer.

        :param object_generator: The object generator to use for making new
            objects.
        :param variables: A mapping from variable name to list of STIX IDs,
            which defines all variables usable in the graph statements.
        :param objects: A mapping from STIX ID to STIX object as dict, which
            contains values for the variables in the variables mapping, and any
            other related objects.  All new objects this transformer creates
            will also be placed in this mapping.
        """
        super().__init__()

        self.__object_generator = object_generator
        self.__variables = variables
        self.__objects = objects

    @staticmethod
    def _children_and_meta(first, second):
        """
        Sort out the two arguments of a ``v_args(meta=True)`` callback.
        Transformed children are always passed as a list, so whichever
        argument is the list is the children; the other is the meta object.

        :param first: The callback's first positional argument
        :param second: The callback's second positional argument
        :return: A (children, meta) 2-tuple
        """
        if isinstance(first, list):
            return first, second

        return second, first

    @lark.v_args(meta=True)
    def graph_statement(self, children, meta):
        """
        Create some objects, optionally connected via a relationship to
        other objects.  Produces the source object IDs only.

        :raises LanguageError: If the "on" relationship is given a count
            greater than 1
        """
        children, meta = self._children_and_meta(children, meta)

        # The below code is simpler if we normalize to a list here, but we
        # still must remember whether this function should propagate a single
        # or list.  So create a new variable instead of modifying source_ids.
        source_ids = children[0]
        if isinstance(source_ids, list):
            source_ids_list = source_ids
        else:
            source_ids_list = [source_ids]

        if len(children) > 1:
            rel_count, rel_type, target_ids = children[1]

            # Same normalization for target IDs.  At least we have the freedom
            # to replace the value of target_ids this time.
            if not isinstance(target_ids, list):
                target_ids = [target_ids]

            for source_id in source_ids_list:

                # Hardcoded special handling of "on": this results in an
                # embedded relationship on the source object whose value is
                # *all* targets, not an SRO per target.  For this reason, it
                # doesn't make sense for the relationship count to be anything
                # but 1.
                if rel_type == "on":
                    if rel_count > 1:
                        raise LanguageError(
                            "Relationship 'on' must have count 1", meta
                        )
                    source_obj = self.__objects[source_id]
                    source_obj["object_refs"] = target_ids

                else:
                    for target_id in target_ids:
                        for _ in range(rel_count):
                            rel = _make_sro(
                                self.__object_generator,
                                source_id,
                                rel_type.value,
                                target_id
                            )
                            self.__objects[rel["id"]] = rel

        return source_ids

    def relationship(self, children):
        """
        Produce a rel_count, rel, target_ids triple representing most of a
        relationship (it doesn't include the source object(s)).  The count
        defaults to 1 when the statement doesn't give one.
        """

        if isinstance(children[0], int):
            rel_count, rel_type, target_ids = children
        else:
            rel_count = 1
            rel_type, target_ids = children

        return rel_count, rel_type, target_ids

    def sdo_list(self, children):
        """
        Produces the concatenation of all STIX object IDs in child nodes
        in a single list.
        """
        all_children = []
        for child in children:
            # A child is either a single ID or a list of IDs
            if isinstance(child, list):
                all_children.extend(child)
            else:
                all_children.append(child)

        return all_children

    def sdo_ref(self, children):
        """
        Produces a single or list of STIX object IDs, either from a
        referenced variable, or from newly created objects.

        :raises UndeclaredVariableError: If the reference is to a variable
            which isn't in the variables mapping
        """
        if is_token(children[0], "VARIABLE_NAME"):
            # ref is a variable
            var_name = children[0]
            if var_name in self.__variables:
                ids = self.__variables[var_name]
            else:
                raise UndeclaredVariableError(var_name)
        else:
            # ref is to inline SDO(s)
            ids = children[0]

        # A variable or inline SDO with count=1 will produce a single value;
        # count > 1 produces a list.  We need to propagate the right thing,
        # so that STIX properties which expect single IDs don't get lists.
        # (One wishes that such properties would accept a length-1 list, but
        # they don't.)
        if len(ids) == 1:
            ids = ids[0]

        return ids

    def sdo_inline(self, children):
        """
        Creates the inline object(s) and produces a list of the new STIX
        object IDs.  The count defaults to 1 when not given.
        """
        if isinstance(children[0], int):
            count = children[0]
            sdo_type = children[1]
        else:
            count = 1
            sdo_type = children[0]

        # A property block, if present, has already been turned into a dict
        # by property_block().
        overlay_props = None
        if len(children) > 1 and isinstance(children[-1], dict):
            overlay_props = children[-1]

        ids = []
        for _ in range(count):
            obj = _make_object(self.__object_generator, sdo_type, overlay_props)
            self.__objects[obj["id"]] = obj
            ids.append(obj["id"])

        return ids

    def count(self, children):
        """Produces the count, as an int"""
        return int(children[0])

    def property_block(self, children):
        """
        Produces a mapping from prop name to ID(s) or other string literals,
        from the property block.  An empty block produces an empty mapping.
        """

        overlay_props = {}
        for assignment in children:
            # Placeholder for the unmatched optional content of an empty
            # block ("{}"); there is nothing to assign.
            if assignment is None:
                continue

            prop_name, value = assignment
            if is_token(value, "ESCAPED_STRING"):
                value = _string_token_value(value)

            overlay_props[prop_name] = value

        return overlay_props

    def property_assignment(self, children):
        """
        Propagates its children.  This will produce a 2-item list including
        the property name and a single or list of STIX IDs which are the "top"
        of the object graph which is the property value.
        """
        return children

    def string_array(self, children):
        """
        Produces a list of strings from the array.  An empty array produces
        an empty list.
        """
        # Skip the placeholder an empty array ("[]") may be reported with.
        values = [
            _string_token_value(tok)
            for tok in children
            if tok is not None
        ]

        return values

    @lark.v_args(meta=True)
    def sighting(self, children, meta):
        """
        Creates a sighting object (and all other related objects).

        :raises LanguageError: If an "of" clause is given and the property
            block also sets sighting_of_ref
        """
        children, meta = self._children_and_meta(children, meta)

        # Drop placeholders for an omitted "of" clause, so only the parts
        # actually present remain: a property block (dict) and/or the
        # result of the "of" graph statement, in that order.
        children = [child for child in children if child is not None]

        sighting_of = overlay_props = None

        if len(children) == 1:
            if isinstance(children[0], dict):
                overlay_props = children[0]
            else:
                sighting_of = children[0]

        elif len(children) > 1:
            overlay_props = children[0]
            sighting_of = children[1]

        if sighting_of and overlay_props and "sighting_of_ref" in overlay_props:
            raise LanguageError(
                "property 'sighting_of_ref' can't both be given explicitly and "
                "in the property block",
                meta
            )

        obj = _make_sighting(
            self.__object_generator, sighting_of, overlay_props
        )
        self.__objects[obj["id"]] = obj

        # A formality; no one uses the return value.  But I should probably
        # return something, since all these callbacks are treated as returning
        # a value.
        return None


class LanguageProcessor:
    """
    Instances process stix prototyping language, and produce stix objects.
    """

    def __init__(self, object_generator, stix_version="2.1"):
        """
        Initialize this processor.  This only stores the given settings.

        :param object_generator: The object generator to use
        :param stix_version: Which version of STIX to use.  It is passed to
            stix2.parse() when the generated objects are parsed.
        """
        self.__object_generator = object_generator
        self.__stix_version = stix_version

    def build_graph(
        self, graph_spec, return_variable_bindings=False,
        embed_variable_names=False
    ):
        """
        Build STIX objects from the given specification, expressed with the
        STIX prototyping language.

        :param graph_spec: The graph specification
        :param return_variable_bindings: Whether the caller wants the
            variable bindings returned
        :param embed_variable_names: Whether variable names should be
            embedded in generated objects bound to a variable, using a
            custom property
        :return: If return_variable_bindings is False, a list of STIX
            objects.  Otherwise, return a 2-tuple where the first item is
            the list of generated objects, and the second is a mapping from
            variable name to list of STIX IDs.  This latter value
            represents the variable bindings.
        :raises stix2generator.exceptions.LanguageError: If there are any
            errors processing the specification itself
        :raises stix2.exceptions.STIXError: If the objects and/or
            relationships expressed in the language, aren't valid STIX

        Errors raised by the parser or by the object generator are not
        caught here, so they reach the caller unchanged.
        """
        logger = _get_logger()

        parse_tree = _parser.parse(graph_spec)
        if logger.isEnabledFor(stix2generator.logging.EXTRA_VERBOSE):
            logger.log(stix2generator.logging.EXTRA_VERBOSE, "Parse tree:")
            _print_parse_tree(logger, parse_tree)

        variables, objects, dependencies = \
            _VariableProcessor(self.__object_generator).transform(parse_tree)

        if dependencies:
            self.__process_variable_dependencies(
                variables, objects, dependencies
            )

        if variables and logger.isEnabledFor(logging.DEBUG):
            logger.debug("Variables:")
            for var, ids in variables.items():
                logger.debug("%s = %s", var, ids)

        # This is necessary so that GraphBuilder doesn't re-visit property
        # blocks in variable declarations, and erroneously create extra
        # objects.
        _remove_variable_declaration_statements(parse_tree)

        graph_builder = _GraphBuilder(
            self.__object_generator, variables, objects
        )
        graph_builder.transform(parse_tree)

        # Embed variable names if requested
        if embed_variable_names:
            for var, ids in variables.items():
                for id_ in ids:
                    objects[id_][_VARIABLE_NAME_PROPERTY] = var.value

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Objects:")
            for obj in objects.values():
                logger.debug(pprint.pformat(obj))

        stix_objs = [
            stix2.parse(
                stix_dict, version=self.__stix_version, allow_custom=True
            )
            for stix_dict in objects.values()
        ]

        if return_variable_bindings:
            # The internal variables mapping is keyed by tokens; I don't
            # think that makes sense for users.  So make a new map keyed by
            # plain string variable names.
            variables_plain_names = {
                var.value: ids
                for var, ids in variables.items()
            }

            ret = stix_objs, variables_plain_names

        else:
            ret = stix_objs

        return ret

    def __process_variable_dependencies(
        self, variables, objects, dependencies
    ):
        """
        Create values for all variables with dependencies.

        :param variables: A mapping with any already-initialized variables,
            and all of the VariableSpec placeholders which were created.
            This will be updated with newly created variables.
        :param objects: A mapping to store all newly created objects in.
        :param dependencies: The dependency data, as a mapping from var
            name to a set of dependencies.  (This is basically an adjacency
            list representation of a directed graph.)
        :raises UndeclaredVariableError: If there are any references to
            undeclared variables
        :raises CircularVariableDependenciesError: If circular dependencies
            are encountered
        """
        logger = _get_logger()

        sorted_var_names = _topo_sort_dependencies(dependencies)

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "Variable creation order: %s",
                ", ".join(tok.value for tok in sorted_var_names)
            )

        for var_name in sorted_var_names:
            if var_name not in variables:
                raise UndeclaredVariableError(var_name)

            var_spec = variables[var_name]

            # Our "leaf node" variables will be regular ones, not
            # VariableSpec's; we don't need to do anything for those.
            if not isinstance(var_spec, _VariableSpec):
                continue

            # Everything this variable depends on comes earlier in the
            # sorted order, so its property block can be evaluated now.
            builder = _GraphBuilder(
                self.__object_generator, variables, objects
            )
            overlay_props = builder.transform(var_spec.var_props_tree)

            ids = []
            for _ in range(var_spec.var_count):
                obj = _make_object(
                    self.__object_generator, var_spec.var_type, overlay_props
                )
                ids.append(obj["id"])
                objects[obj["id"]] = obj

            # Now we can replace the spec with real IDs
            variables[var_name] = ids