Anomaly Detection Using Sigma Rules: Build Your Own Spark Streaming Detections | by Jean-Claude Cote | Jun, 2023


Easily deploy Sigma rules in Spark streaming pipelines: a future-proof solution supporting the upcoming Sigma 2 specification

Photo by Dana Walker on Unsplash

In our previous articles we elaborated and designed a stateful function named flux-capacitor.

The flux-capacitor is a stateful function that can remember parent-child (and ancestor) relationships between log events. It can also remember events occurring on the same host within a certain window of time; the Sigma specification refers to this as temporal proximity correlation.

For a deep dive into the design of the flux-capacitor, refer to part 1, part 2, part 3, part 4, and part 5. However, you don’t need to understand the implementation of the function to use it.

In this article we first show a Spark streaming job which performs discrete detections. A discrete detection is a Sigma rule which uses the features and values of a single log line (a single event).

Then we leverage the flux-capacitor function to handle stateful parent-child relationships between log events. The flux-capacitor is also able to detect a number of events occurring on the same host within a certain window of time; these are called temporal proximity correlations in the upcoming Sigma specification. A complete demo of these Spark streaming jobs is available in our git repo.

Performing discrete tests is fairly straightforward, thanks to all the built-in functions that come out-of-the-box in Spark. Spark has support for reading streaming sources, writing to sinks, checkpointing, stream-stream joins, windowed aggregations and many more. For a complete list of the possible functionalities, see the comprehensive Spark Structured Streaming Programming Guide.

Here’s a high-level diagram showing a Spark streaming job that consumes events from an Iceberg table of “start-process” Windows events (1). A classic example of such events is found in Windows Security Logs (Event ID 4688).

Topology for discrete detections

The source table (1) is named process_telemetry_table. The Spark job reads all events, detects anomalous events, tags these events and writes them to table (3) named tagged_telemetry_table. Events deemed anomalous are also written to a table (4) containing alerts.

Periodically we poll a git repository (5) containing the SQL auto-generated from the Sigma rules we want to apply. If the SQL statements change, we restart the streaming job to add these new detections to the pipeline.
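One way to decide whether a restart is needed is to fingerprint the generated SQL on each poll and compare it with the fingerprint recorded when the job started. The sketch below is only an illustration of that idea: the function names and the in-memory dictionary standing in for the checked-out repository are hypothetical and are not taken from the demo repo.

```python
import hashlib


def fingerprint(sql_files):
    """Return a stable digest of a {filename: sql_text} mapping."""
    digest = hashlib.sha256()
    for name in sorted(sql_files):  # sort so that file ordering never matters
        digest.update(name.encode("utf-8"))
        digest.update(b"\0")
        digest.update(sql_files[name].encode("utf-8"))
        digest.update(b"\0")
    return digest.hexdigest()


def needs_restart(running_fingerprint, polled_sql_files):
    """True when the polled SQL differs from what the running job uses."""
    return fingerprint(polled_sql_files) != running_fingerprint


# Hypothetical contents of the polled repository.
deployed = {
    "pattern_match.sql": "select 1",
    "eval_final_condition.sql": "select 2",
}
running = fingerprint(deployed)

unchanged = dict(deployed)
changed = {**deployed, "pattern_match.sql": "select 1 -- new rule"}

restart_when_unchanged = needs_restart(running, unchanged)
restart_when_changed = needs_restart(running, changed)
```

Comparing digests rather than timestamps means that a commit which does not touch the generated SQL would not trigger a restart.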

Let’s take this Sigma rule as an example:

screenshot from proc_creation_win_rundll32_sys.yml at Sigma HQ

The detection section is the heart of the Sigma rule and consists of a condition and one or more named tests. Here, selection1 and selection2 are named boolean tests. The author of the Sigma rule can give meaningful names to these tests. The condition is where the author combines the tests into a final evaluation. See the Sigma specification for more details on writing a Sigma rule.

From now on we will refer to these named boolean tests as tags.

The inner workings of the Spark streaming job are broken down into 4 logical steps:

  • read the source table process_telemetry_table
  • perform pattern matching
  • evaluate final condition
  • write the results

The Pattern Match step consists of evaluating the tags found in the Sigma rule, and the Eval final condition step evaluates the condition.

On the right of this diagram we show what the row would look like at this stage of processing. The columns in blue represent values read from the source table. The Pattern Match step adds a column named Sigma tags which is a map of all the tests performed and whether the test passed or failed. The gray column contains the final Sigma rule evaluations. Finally, the brown columns are added in the foreachBatch function. A GUID is generated, the rule names that are true are extracted from the Sigma tags map and the detection action is retrieved from a lookup map of rule-name to rule-type. This gives context to the alerts produced.

This diagram depicts how attributes of the event are combined into tags, final evaluation and finally contextual information.
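The contextual columns added in the last step can be pictured with a small plain-Python sketch. It is not the code from the demo repo: the field names (fired_rules, alert_id and so on) and the contents of the lookup map are hypothetical, chosen only to illustrate generating a GUID, keeping the names of the rules that fired, and looking up the detection action for each of them.

```python
import uuid

# Hypothetical lookup map of rule name -> detection action (rule type).
RULE_ACTIONS = {"rule0": "discrete", "rule1": "parent"}


def build_alert(row):
    """Return the contextual alert record for a row, or None if no rule fired."""
    fired = row["fired_rules"]
    if not fired:
        return None  # the row is still written to the tagged telemetry table
    return {
        "alert_id": str(uuid.uuid4()),  # GUID identifying this alert
        "event_id": row["id"],
        "rule_names": list(fired),
        "detection_actions": [RULE_ACTIONS.get(name, "unknown") for name in fired],
    }


quiet_row = {"id": 11, "fired_rules": []}
noisy_row = {"id": 203, "fired_rules": ["rule0"]}

no_alert = build_alert(quiet_row)
alert = build_alert(noisy_row)
```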

Let’s now look at the actual PySpark code. First, we connect Spark to the source table using the readStream function, specifying the name of the Iceberg table to read from. The load function returns a dataframe, which we use to create a view named process_telemetry_view.

(
    spark
    .readStream
    .format("iceberg")
    # start streaming from snapshots committed at or after this timestamp
    .option("stream-from-timestamp", ts)
    # ignore snapshots that only delete or overwrite data
    .option("streaming-skip-delete-snapshots", True)
    .option("streaming-skip-overwrite-snapshots", True)
    .load(constants.process_telemetry_table)
    .createOrReplaceTempView("process_telemetry_view")
)

The data in the process_telemetry_view looks like this:

+-------------------+---+---------+---------------------+
|timestamp          |id |parent_id|Commandline          |
+-------------------+---+---------+---------------------+
|2022-12-25 00:00:01|11 |0        |                     |
|2022-12-25 00:00:02|2  |0        |c:winnotepad.exe     |
|2022-12-25 00:00:03|12 |11       |                     |
|2022-12-25 00:00:08|201|200      |cmdline and args     |
|2022-12-25 00:00:09|202|201      |                     |
|2022-12-25 00:00:10|203|202      |c:test.exe           |
+-------------------+---+---------+---------------------+

On this view we apply a Pattern Matching step which consists of an auto-generated SQL statement produced by the Sigma compiler. The pattern_match.sql file looks like this:

select
    *,
    -- regroup each rule's tags in a map (ruleName -> Tags)
    map(
        'rule0',
        map(
            'selection1', (CommandLine LIKE '%rundll32.exe%'),
            'selection2', (CommandLine LIKE '%.sys,%' OR CommandLine LIKE '%.sys %')
        )
    ) as sigma
from
    process_telemetry_view

We use spark.sql() to apply this statement to the process_telemetry_view view.

df = spark.sql(render_file("pattern_match.sql"))
df.createOrReplaceTempView("pattern_match_view")

Notice that the results of each tag found in the Sigma rule are stored in a map of boolean values. The sigma column holds the results of each tag found in each Sigma rule. By using a MapType we can easily introduce new Sigma rules without affecting the schema of the table. Adding a new rule simply adds a new entry in the sigma column (a MapType).

+---+---------+---------------------+----------------------------------+
|id |parent_id|Commandline |sigma
+---+---------+---------------------+----------------------------------+
|11 |0 | |{rule0 -> {
selection1 -> false,
selection2 -> false
},
}

Similarly, the Eval final condition step applies the conditions from the Sigma rules. The conditions are compiled into an SQL statement, which uses map, map_filter, and map_keys to build a column named sigma_final. This column holds the names of all the rules whose condition evaluates to true.


select
    *,
    map_keys( -- only keep the rule names of rules that evaluated to true
        map_filter( -- filter map entries keeping only rules that evaluated to true
            map( -- store the result of the condition of each rule in a map
                'rule0',
                -- rule 0 -> condition: all of selection*
                sigma.rule0.selection1 AND sigma.rule0.selection2
            ),
            (k, v) -> v = TRUE
        )
    ) as sigma_final
from
    pattern_match_view

The auto-generated statement is applied using spark.sql().

df = spark.sql(render_file("eval_final_condition.sql"))

Here are the results with the newly added sigma_final column, an array of the rules that fired.

+---+---------+-------------------------------------+-------------+
|id |parent_id|sigma | sigma_final |
+---+---------+-------------------------------------+-------------+
|11 |0 |{rule0 -> { | [] |
selection1 -> false,
selection2 -> false
}
}
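To make the two SQL steps easier to follow, here is a plain-Python sketch that mimics them for rule0 on a single event. It is an illustration rather than the generated code: substring checks stand in for the LIKE patterns, a list comprehension stands in for map_filter and map_keys, and the suspicious command line is a made-up example.

```python
def pattern_match(event):
    """Evaluate every named test (tag) of every rule for a single event."""
    cmd = event.get("CommandLine") or ""
    return {
        "rule0": {
            # CommandLine LIKE '%rundll32.exe%'
            "selection1": "rundll32.exe" in cmd,
            # CommandLine LIKE '%.sys,%' OR CommandLine LIKE '%.sys %'
            "selection2": ".sys," in cmd or ".sys " in cmd,
        }
    }


def eval_final_condition(sigma):
    """Return the names of the rules whose condition evaluates to true."""
    conditions = {
        # rule0 -> condition: all of selection*
        "rule0": sigma["rule0"]["selection1"] and sigma["rule0"]["selection2"],
    }
    return [name for name, fired in conditions.items() if fired]


benign = {"id": 203, "CommandLine": "c:test.exe"}
# Hypothetical command line, invented for this example.
suspicious = {"id": 204, "CommandLine": "rundll32.exe driver.sys,Entry"}

benign_tags = pattern_match(benign)
benign_final = eval_final_condition(benign_tags)
suspicious_final = eval_final_condition(pattern_match(suspicious))
```

Because the tags live in a nested map, adding a rule only adds another key to the dictionaries above, which mirrors how the MapType column avoids schema changes.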

We are now ready to start the streaming job for our dataframe. Notice that we pass the callback function foreach_batch_function to foreachBatch.

streaming_query = (
df
.writeStream
.queryName("detections")
.trigger(processingTime=f"{trigger} seconds")
.option("checkpointLocation", get_checkpoint_location(constants.tagged_telemetry_table) )
.foreachBatch(foreach_batch_function)
.start()
)

streaming_query.awaitTermination()

The foreach_batch_function is called at every micro-batch and is given the evaluated batchdf dataframe. It writes the entirety of batchdf to the tagged_telemetry_table and also writes alerts for any of the Sigma rules that evaluated to true.

def foreach_batch_function(batchdf, epoch_id):
    # Transform and write batchDF
    batchdf.persist()
    batchdf.createOrReplaceGlobalTempView("eval_condition_view")
    run("insert_into_tagged_telemetry")
    run("publish_suspected_anomalies")
    spark.catalog.clearCache()

The details of insert_into_tagged_telemetry.sql and publish_suspected_anomalies.sql can be found in our git repo.

As mentioned above, writing a streaming anomaly detection job that handles discrete tests is relatively straightforward using the built-in functionality found in Spark.

Thus far we showed how to detect events with discrete Sigma rules. In this section we leverage the flux-capacitor function to enable caching tags and testing tags of past events. As discussed in our previous articles, the flux-capacitor lets us detect parent-child relationships and also sequences of arbitrary features of past events.

These types of Sigma rules need to simultaneously consider the tags of the current event and of past events. In order to perform the final rule evaluation, we introduce a Time travel tags step to retrieve all the past tags for an event and merge them with the current event. This is what the flux-capacitor function is designed to do: it caches and retrieves past tags. Now that past tags and current tags are on the same row, the Eval final condition can be evaluated just like we did in our discrete example above.

The detection now looks like this:

The flux-capacitor is given the Sigma tags produced by the Pattern Match step. The flux-capacitor stores these tags for later retrieval. The column in red has the same schema as the Sigma tags column we used before. However, it combines current and past tags, which the flux-capacitor retrieved from its internal state.

Adding caching and retrieval of past tags is easy thanks to the flux-capacitor function. Here’s how we applied the flux-capacitor function in our Spark anomaly detection. First, pass the dataframe produced by the Pattern Match step to the flux_stateful_function and the function returns another dataframe, which contains past tags.

from pyspark.sql import DataFrame

flux_update_spec = read_flux_update_spec()
bloom_capacity = 200000
# reference the scala code
flux_stateful_function = spark._sc._jvm.cccs.fluxcapacitor.FluxCapacitor.invoke
# group logs by host_id
jdf = flux_stateful_function(
    pattern_match_df._jdf,
    "host_id",
    bloom_capacity,
    flux_update_spec)
# wrap the returned Java dataframe in a PySpark DataFrame
output_df = DataFrame(jdf, spark)

To control the behavior of the flux_stateful_function we pass in a flux_update_spec. The flux-capacitor specification is a YAML file produced by the Sigma compiler. The specification details which tags should be cached and retrieved and how they should be handled. The action attribute can be set to parent, ancestor or temporal.

Let’s use a concrete example from Sigma HQ: proc_creation_win_rundll32_executable_invalid_extension.yml

screenshot from Sigma HQ github

Again, the heart of the detection consists of tags and of a final condition which puts all these tags together. Note however that this rule (which we will refer to as Rule 1) involves tests against CommandLine and also tests on the parent process field ParentImage. ParentImage is not a field found in the start-process logs. Rather, it refers to the Image field of the parent process.

As seen before, this Sigma rule will be compiled into SQL to evaluate the tags and to combine them into a final condition.

In order to propagate the parent tags, the Sigma compiler also produces a flux-capacitor specification. Rule 1 is a parent rule, and thus the specification must state which fields identify the parent and the child. In our logs these correspond to id and parent_id.

The specification also specifies which tags should be cached and retrieved by the flux-capacitor function. Here is the auto-generated specification:

rules:
  - rulename: rule1
    description: proc_creation_win_run_executable_invalid_extension
    action: parent
    tags:
      - name: filter_iexplorer
      - name: filter_edge_update
      - name: filter_msiexec_system32
    parent: parent_id
    child: id

Note that Rule 0 is not included in the flux-capacitor specification since it has no temporal tags.

In order to better understand what the flux-capacitor does, you can use the function outside a streaming analytic. Here we show a simple ancestor example. We want to propagate the tag pf. For example pf might represent a CommandLine containing rundll32.exe.

spec = """
rules:
  - rulename: rule2
    action: ancestor
    child: pid
    parent: parent_pid
    tags:
      - name: pf
"""

df_input = spark.sql("""
select
    *
from
    values
    (TIMESTAMP '2022-12-30 00:00:05', 'host1', 'pid500', '', map('rule2', map('pf', true, 'cf', false))),
    (TIMESTAMP '2022-12-30 00:00:06', 'host1', 'pid600', 'pid500', map('rule2', map('pf', false, 'cf', false))),
    (TIMESTAMP '2022-12-30 00:00:07', 'host1', 'pid700', 'pid600', map('rule2', map('pf', false, 'cf', true)))
    t(timestamp, host_id, pid, parent_pid, sigma)
""")

Printing the dataframe df_input, we see that pid500 started and had a CommandLine with the pf feature. Then pid500 started pid600, and later pid600 started pid700, which had the child feature cf.

+-------------------+------+----------+--------------+-------------------------------------+
|timestamp          |pid   |parent_pid|human_readable|sigma                                |
+-------------------+------+----------+--------------+-------------------------------------+
|2022-12-30 00:00:05|pid500|          |[pf]          |{rule2 -> {pf -> true, cf -> false}} |
|2022-12-30 00:00:06|pid600|pid500    |[]            |{rule2 -> {pf -> false, cf -> false}}|
|2022-12-30 00:00:07|pid700|pid600    |[cf]          |{rule2 -> {pf -> false, cf -> true}} |
+-------------------+------+----------+--------------+-------------------------------------+

The Sigma rule is a combination of both pf and cf. In order to bring the pf tag back on the current row, we need to apply time-travel to the pf tag. To do so, we apply the flux-capacitor function to the df_input dataframe:

jdf = flux_stateful_function(df_input._jdf, "host_id", bloom_capacity, spec, True)
df_output = DataFrame(jdf, spark)

We obtain the df_output dataframe. Notice how the pf tag is propagated through time.

+-------------------+------+----------+--------------+------------------------------------+
|timestamp          |pid   |parent_pid|human_readable|sigma                               |
+-------------------+------+----------+--------------+------------------------------------+
|2022-12-30 00:00:05|pid500|          |[pf]          |{rule2 -> {pf -> true, cf -> false}}|
|2022-12-30 00:00:06|pid600|pid500    |[pf]          |{rule2 -> {pf -> true, cf -> false}}|
|2022-12-30 00:00:07|pid700|pid600    |[pf, cf]      |{rule2 -> {pf -> true, cf -> true}} |
+-------------------+------+----------+--------------+------------------------------------+

This notebook TagPropagationIllustration.ipynb contains more examples like this for parent-child and temporal proximity.
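For readers without a Spark cluster at hand, the ancestor propagation shown above can be approximated in a few lines of plain Python. This is only a sketch of the idea, not the flux-capacitor implementation: a dictionary keyed by process id plays the role of the state (the real function is written in Scala, groups events by host and stores tags in bloom filters), and the function name is hypothetical.

```python
def propagate_ancestor_tags(events, rule, tags, child="pid", parent="parent_pid"):
    """Carry `tags` from each process down to all of its descendants.

    `events` must be sorted by timestamp and belong to a single host.
    """
    remembered = {}  # process id -> tags seen on that process or its ancestors
    output = []
    for event in events:
        inherited = remembered.get(event[parent], set())
        merged = dict(event["sigma"][rule])
        for tag in tags:
            # a tag is true if set on this event or on any ancestor
            merged[tag] = merged[tag] or tag in inherited
        remembered[event[child]] = {tag for tag in tags if merged[tag]}
        output.append({**event, "sigma": {**event["sigma"], rule: merged}})
    return output


events = [
    {"pid": "pid500", "parent_pid": "",
     "sigma": {"rule2": {"pf": True, "cf": False}}},
    {"pid": "pid600", "parent_pid": "pid500",
     "sigma": {"rule2": {"pf": False, "cf": False}}},
    {"pid": "pid700", "parent_pid": "pid600",
     "sigma": {"rule2": {"pf": False, "cf": True}}},
]

result = propagate_ancestor_tags(events, rule="rule2", tags=["pf"])
propagated = [event["sigma"]["rule2"] for event in result]
```

Only the tags listed in the specification (here pf) are propagated; cf is left untouched, so it is true only on the row where it was observed.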

The flux-capacitor function caches all the past tags. In order to conserve memory, it caches these tags using bloom filter segments. Bloom filters have an extremely small memory footprint and are quick to query and to update. However, they can produce false positives. It is thus possible that one of our detections is in fact a false positive. In order to remedy this, we put the suspected anomalies in a queue (4) for re-evaluation.

To eliminate false positives, a second Spark streaming job, named the Alert Builder, reads the suspected anomalies (5) and retrieves the events (6) that are required to re-evaluate the rule.
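To see why a bloom filter can answer "maybe present" for something it never stored, consider this minimal sketch. It is a generic textbook bloom filter, not the segmented implementation used by the flux-capacitor, and the key format in the example is a hypothetical one.

```python
import hashlib


class BloomFilter:
    """A tiny bloom filter backed by a Python int used as a bit array."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, item):
        # derive num_hashes bit positions by salting the item with a seed
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for position in self._positions(item):
            self.bits |= 1 << position

    def might_contain(self, item):
        # True means "possibly stored"; False means "definitely not stored"
        return all((self.bits >> position) & 1 for position in self._positions(item))


# Hypothetical keys: one per cached tag of a process.
keys = [f"host1.pid{n}.rule2.pf" for n in range(100)]

tag_cache = BloomFilter()
for key in keys:
    tag_cache.add(key)

all_found = all(tag_cache.might_contain(key) for key in keys)
empty_filter_hit = BloomFilter().might_contain("host1.pid500.rule2.pf")
```

A stored key is always found, so a real parent tag is never missed. A key that was never stored may still be reported as present when its bit positions happen to have been set by other keys, which is the source of the false positives discussed here.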

For example in the case of a parent-child Sigma rule, the Alert Builder will read the suspected anomaly (5) retrieving a child process event. Next, in (6) it will retrieve the parent process of this child event. Then using these two events it re-evaluates the Sigma rule. However, this time the flux-capacitor is configured to store tags in a hash map, rather than in bloom filters. This eliminates false positives and as a bonus we have all the events involved in this detection. We store this alert along with the rows of evidence (parent and child events) into an alert table (7).
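The re-evaluation of a parent-child detection can be sketched as follows. This is a simplified illustration with hypothetical names and tags: a dictionary of events keyed by id stands in for the lookup into the tagged telemetry table, whereas the actual Alert Builder re-runs the flux-capacitor configured with a hash map.

```python
def rebuild_alert(suspect, events_by_id, condition):
    """Re-evaluate a suspected parent-child detection using exact lookups.

    Returns the alert with its rows of evidence, or None when the
    suspected anomaly turns out to be a false positive.
    """
    parent = events_by_id.get(suspect["parent_id"])
    if parent is None:
        return None  # no parent event on record: nothing to confirm
    if not condition(parent["tags"], suspect["tags"]):
        return None
    return {"rule": suspect["rule"], "evidence": [parent, suspect]}


def rule_condition(parent_tags, child_tags):
    # Hypothetical condition: parent has feature pf and child has feature cf.
    return parent_tags["pf"] and child_tags["cf"]


events_by_id = {
    201: {"id": 201, "parent_id": 200, "tags": {"pf": True, "cf": False}},
    301: {"id": 301, "parent_id": 300, "tags": {"pf": False, "cf": False}},
}

true_suspect = {"id": 202, "parent_id": 201, "rule": "rule1",
                "tags": {"pf": False, "cf": True}}
false_suspect = {"id": 302, "parent_id": 301, "rule": "rule1",
                 "tags": {"pf": False, "cf": True}}

confirmed = rebuild_alert(true_suspect, events_by_id, rule_condition)
rejected = rebuild_alert(false_suspect, events_by_id, rule_condition)
```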

Topology with stateful detections (temporal)

The Alert Builder handles a fraction of the volume processed by the Streaming Detections (2). Thanks to the low volume read in (5), historical searches into the tagged telemetry (6) are possible.

For a more in-depth look, see the Spark jobs for the Streaming Detections, streaming_detections.py, and the Alert Builder, streaming_alert_builder.py.

To evaluate the performance of this proof of concept we ran tests on machines with 16 CPUs and 64 GB of RAM. We wrote a simple data producer that creates 5,000 synthetic events per second and ran the experiment for 30 days.

The Spark Streaming Detections job runs on one machine. The job is configured to trigger every minute. Each micro-batch (trigger) reads 300,000 events and takes on average 20 seconds to complete. The job can easily keep up with the incoming event rate.
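The figures above can be checked with a little arithmetic. The headroom estimate at the end assumes that processing time grows linearly with batch size, which is a simplification and was not measured in this experiment.

```python
events_per_second = 5_000
trigger_seconds = 60   # the job triggers every minute
batch_seconds = 20     # average time to process one micro-batch

# events accumulated between two triggers
events_per_batch = events_per_second * trigger_seconds

# fraction of each trigger interval spent processing
busy_fraction = batch_seconds / trigger_seconds

# rough ceiling on the event rate if processing time scaled linearly
estimated_max_rate = events_per_batch / batch_seconds
```

Here events_per_batch is 300,000, the job is busy for about a third of each interval, and the linear estimate suggests room for roughly three times the tested event rate.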

Spark Streaming Detections

The Spark Alert Builder also runs on a single machine and is configured to trigger every minute. This job takes between 30 and 50 seconds to complete. It is very sensitive to the organization of the tagged_telemetry_table. Here we see the effect of the maintenance job, which organizes and sorts the latest data every hour. Thus, every hour, the Spark Alert Builder’s micro-batch execution time drops back to 30 seconds.

Spark Streaming Alert Builder

Our Spark streaming jobs trigger every minute and thus produce small data files every minute. In order to allow for fast searches and retrieval in this table, it’s important to compact and sort the data periodically. Fortunately Iceberg comes with built-in procedures to organize and maintain your tables.

For example this script maintenance.py runs every hour to sort and compact the newly added files of the Iceberg tagged_telemetry_table.

CALL catalog.system.rewrite_data_files(
    table => 'catalog.jc_sched.tagged_telemetry_table',
    strategy => 'sort',
    sort_order => 'host_id, has_temporal_proximity_tags',
    options => map('min-input-files', '100',
                   'max-concurrent-file-group-rewrites', '30',
                   'partial-progress.enabled', 'true'),
    where => "timestamp >= TIMESTAMP '2023-05-06 00:00:00'"
)

At the end of the day we also re-sort this table, yielding maximum search performance over long search periods (months of data).

CALL catalog.system.rewrite_data_files(
    table => 'catalog.jc_sched.tagged_telemetry_table',
    strategy => 'sort',
    sort_order => 'host_id, has_temporal_proximity_tags',
    options => map('min-input-files', '100',
                   'max-concurrent-file-group-rewrites', '30',
                   'partial-progress.enabled', 'true',
                   'rewrite-all', 'true'),
    where => "timestamp >= TIMESTAMP '2023-05-05 00:00:00' AND timestamp < TIMESTAMP '2023-05-06 00:00:00'"
)
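Since the timestamps in these statements change with every run, a scheduled script has to generate them. The helper below is a hypothetical sketch of how that could be done (it is not the code of maintenance.py): it builds the hourly statement when only a start is given and the end-of-day statement when an end and rewrite_all are supplied. The where filter is wrapped in double quotes so that the single-quoted timestamp literals can appear inside it.

```python
from datetime import datetime, timedelta


def rewrite_statement(table, start, end=None, rewrite_all=False):
    """Build a rewrite_data_files CALL for the given time window."""
    options = {
        "min-input-files": "100",
        "max-concurrent-file-group-rewrites": "30",
        "partial-progress.enabled": "true",
    }
    if rewrite_all:
        options["rewrite-all"] = "true"
    option_args = ", ".join(f"'{key}', '{value}'" for key, value in options.items())

    where = f"timestamp >= TIMESTAMP '{start:%Y-%m-%d %H:%M:%S}'"
    if end is not None:
        where += f" AND timestamp < TIMESTAMP '{end:%Y-%m-%d %H:%M:%S}'"

    return (
        "CALL catalog.system.rewrite_data_files(\n"
        f"    table => '{table}',\n"
        "    strategy => 'sort',\n"
        "    sort_order => 'host_id, has_temporal_proximity_tags',\n"
        f"    options => map({option_args}),\n"
        f"    where => \"{where}\"\n"
        ")"
    )


table = "catalog.jc_sched.tagged_telemetry_table"
today = datetime(2023, 5, 6)

hourly = rewrite_statement(table, start=today)
daily = rewrite_statement(
    table, start=today - timedelta(days=1), end=today, rewrite_all=True
)
```

Each resulting string could then be passed to spark.sql() by the scheduled job.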

Another maintenance task we do is deleting old data from the streaming tables. These tables are only used as buffers between producers and consumers. Thus every day we age off the streaming tables keeping 7 days of data.

delete from catalog.jc_sched.process_telemetry_table
where
timestamp < current_timestamp() - interval 7 days

Finally, every day we perform standard Iceberg table maintenance tasks, like expiring snapshots and removing orphan files. We run these maintenance jobs on all of our tables and schedule these jobs on Airflow.

In this article we showed how to build a Spark streaming anomaly detection framework that generically applies Sigma rules. New Sigma rules can easily be added to the system.

This proof of concept was extensively tested on synthetic data to evaluate its stability and scalability. It shows great promise and further evaluation will be performed on a production system.

All images unless otherwise noted are by the author


