This guide provides a comprehensive reference for developers creating correlation rules to detect security threats in UTMStack v11. Rules are YAML files used by the analysis plugin to generate alerts when specific conditions are met.
Developer Reference: This page is designed as a practical guide for implementing security detection logic through correlation rules.

What are Correlation Rules?

Correlation rules define how to analyze events to detect security threats. When an event matches a rule’s conditions, an alert is generated.

Key Capabilities

  • Real-time threat detection based on event patterns
  • Multi-event correlation across time windows
  • Complex conditional logic using CEL expressions
  • Alert deduplication to prevent fatigue
  • Threat intelligence integration

Rule Structure

A complete rule consists of several components:
- id: 1                           # Unique identifier
  dataTypes:                      # Event types this rule applies to
    - google
  name: Hello                     # Rule name
  impact:                         # Impact scoring
    confidentiality: 0            # 0-5 scale
    integrity: 0                  # 0-5 scale
    availability: 3               # 0-5 scale
  category: Testing Category      # Rule category
  technique: Testing Technique    # Attack technique
  adversary: origin               # Adversary side (origin or target)
  references:                     # External references
    - https://quantfall.com
  description: This is a testing rule.  # Rule description
  where: safe(origin.geolocation.country, "") == "United States"  # Main condition
  afterEvents:                    # Additional correlation searches
    - indexPattern: v11-log-*
      with:
        - field: origin.ip.keyword
          operator: filter_term
          value: '{{origin.ip}}'
      within: now-12h
      count: 1
  deduplicateBy:                  # Deduplication fields
    - adversary.ip
    - adversary.country

Rule Fields Reference

Basic Metadata

id
Type: Integer
Required: Yes
Unique identifier for the rule across the entire system.
id: 1001
IDs must be unique. Duplicate IDs will cause conflicts.
dataTypes
Type: Array of strings
Required: Yes
Specifies which event types this rule applies to. The rule evaluates only events with a matching data type.
dataTypes:
  - windows
  - linux
  - apache
Common data types:
  • windows, linux, macos - OS logs
  • apache, nginx, iis - Web server logs
  • cisco, fortigate, paloalto - Network device logs
  • aws, azure, gcp - Cloud provider logs
name
Type: String
Required: Yes
Human-readable name for the rule. Displayed in alerts and UI.
name: Brute Force Login Attempt
description
Type: String
Required: Yes
Detailed description explaining what the rule detects and why it’s important.
description: Detects multiple failed login attempts from the same source IP within a short time window, indicating a potential brute force attack.

Threat Classification

impact
Type: Object
Required: Yes
Defines the potential impact of the detected threat on each component of the CIA triad (confidentiality, integrity, availability), each scored from 0 to 5.
impact:
  confidentiality: 4  # Data exposure risk
  integrity: 2        # Data modification risk
  availability: 3     # Service disruption risk
Scoring Guide:
  • 0: No impact
  • 1-2: Low impact
  • 3: Medium impact
  • 4: High impact
  • 5: Critical impact
category
Type: String
Required: Yes
Classification category for the threat.
category: Authentication
Common categories:
  • Authentication, Authorization
  • Network Attack, Web Attack
  • Malware, Ransomware
  • Data Exfiltration
  • Insider Threat
  • Policy Violation
technique
Type: String
Required: Yes
Specific technique used by the threat (often mapped to MITRE ATT&CK).
technique: Brute Force - T1110
adversary
Type: String
Required: Yes
Values: origin or target
Identifies which side is considered the adversary.
adversary: origin  # Attacker is the source
# or
adversary: target  # Attacker is the destination
references
Type: Array of strings
Required: No
External references for more information (MITRE ATT&CK, CVEs, articles).
references:
  - https://attack.mitre.org/techniques/T1110/
  - https://cwe.mitre.org/data/definitions/307.html
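The field constraints above can be checked mechanically before a rule is deployed. The following is a minimal sketch, not UTMStack's own validation: it assumes each rule has already been loaded into a Python dict (for example by a YAML parser, not shown), and the names validate_rule and duplicate_ids are hypothetical helpers introduced for illustration.

```python
from collections import Counter

# Fields marked "Required: Yes" in the reference above.
REQUIRED_FIELDS = ("id", "dataTypes", "name", "description",
                   "impact", "category", "technique", "adversary")
IMPACT_KEYS = ("confidentiality", "integrity", "availability")


def validate_rule(rule: dict) -> list:
    """Return a list of problems found in one rule; an empty list means none."""
    problems = [f"missing required field: {name}"
                for name in REQUIRED_FIELDS if name not in rule]
    if "id" in rule and not isinstance(rule["id"], int):
        problems.append("id must be an integer")
    if "dataTypes" in rule:
        data_types = rule["dataTypes"]
        if not (isinstance(data_types, list)
                and all(isinstance(item, str) for item in data_types)):
            problems.append("dataTypes must be a list of strings")
    if "impact" in rule:
        for key in IMPACT_KEYS:
            score = rule["impact"].get(key)
            if not isinstance(score, int) or not 0 <= score <= 5:
                problems.append(f"impact.{key} must be an integer from 0 to 5")
    if "adversary" in rule and rule["adversary"] not in ("origin", "target"):
        problems.append("adversary must be 'origin' or 'target'")
    return problems


def duplicate_ids(rules: list) -> list:
    """Return the ids that appear in more than one rule, sorted."""
    counts = Counter(rule["id"] for rule in rules if "id" in rule)
    return sorted(rule_id for rule_id, seen in counts.items() if seen > 1)
```

Running validate_rule on every rule and duplicate_ids on the whole set catches missing fields, out-of-range scores, and ID collisions before deployment.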

Conditional Logic: The where Field

The where field defines the main condition using Common Expression Language (CEL).

Basic Syntax

where: field == "value"

Supported Operators

CEL operators fall into four groups: comparison, logical, string operations, and field checks. Comparison examples:
# Equality
where: statusCode == 200
where: action != "denied"

# Numeric comparison
where: bytesReceived > 1000000
where: failedLogins >= 5
where: severity <= 3

Complex Expression Examples

# Multiple conditions with field existence checks
where: has(origin.country) && 
       !(origin.country in ["US", "CA", "GB"]) && 
       safe(origin.user, "").startsWith("admin")

# Numeric range with string condition
where: safe(bytesReceived, 0) > 10000000 && 
       safe(bytesReceived, 0) < 100000000 && 
       target.ip.startsWith("192.168.")

# Time-based with status check
# (a 22:00-06:00 window wraps past midnight, so the hour test uses ||)
where: has(deviceTime) && 
       (time.getHours(deviceTime) >= 22 || 
        time.getHours(deviceTime) <= 6) && 
       actionResult == "failure"
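An overnight window such as 22:00 to 06:00 wraps past midnight, so no single hour can be both at or after 22 and at or before 6; the two bounds have to be joined with a logical OR. The Python sketch below illustrates the reasoning only. The function in_hour_window is a hypothetical helper, not part of CEL or UTMStack.

```python
def in_hour_window(hour: int, start: int = 22, end: int = 6) -> bool:
    """True when hour lies in the inclusive window [start, end], which may wrap midnight."""
    if start <= end:
        # Same-day window, e.g. 09:00 to 17:00: both bounds must hold.
        return start <= hour <= end
    # Overnight window, e.g. 22:00 to 06:00: either bound is enough.
    return hour >= start or hour <= end


# Joining the overnight bounds with AND matches no hour of the day.
and_matches = [h for h in range(24) if h >= 22 and h <= 6]
or_matches = [h for h in range(24) if in_hour_window(h)]
```

Here and_matches is empty, while or_matches contains hours 0 through 6 plus 22 and 23.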

Event Correlation: The afterEvents Field

The afterEvents field enables multi-event correlation by searching for additional events within specified time windows.

Basic Structure

afterEvents:
  - indexPattern: v11-log-*     # Index to search
    with:                        # Search conditions
      - field: origin.ip.keyword
        operator: filter_term
        value: '{{origin.ip}}'
    within: now-12h              # Time window
    count: 5                     # Minimum matches

Search Operators

filter_term

Exact match using term search (keyword fields)
Best for: IPs, usernames, exact strings

filter_match

Full-text match using text search
Best for: Messages, descriptions, analyzed text

must_not_term

Not equal using term search
Best for: Excluding specific values

must_not_match

Not matching using text search
Best for: Excluding text patterns
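One way to reason about the four operators is as clauses of an Elasticsearch bool query. The sketch below is an assumption based on the operator names, not a description of how the analysis plugin actually builds its searches: it maps the filter_ prefix to the bool filter clause, the must_not_ prefix to must_not, and the _term or _match suffix to the query type. The function build_bool_query and the @timestamp field name are hypothetical.

```python
# Assumed mapping: operator name -> (bool clause, query type).
OPERATOR_MAP = {
    "filter_term": ("filter", "term"),
    "filter_match": ("filter", "match"),
    "must_not_term": ("must_not", "term"),
    "must_not_match": ("must_not", "match"),
}


def build_bool_query(conditions: list, within: str,
                     time_field: str = "@timestamp") -> dict:
    """Translate a list of `with` conditions into a bool query body."""
    # The time window becomes a lower bound on the (assumed) timestamp field.
    clauses = {"filter": [{"range": {time_field: {"gte": within}}}],
               "must_not": []}
    for condition in conditions:
        clause, query_type = OPERATOR_MAP[condition["operator"]]
        clauses[clause].append(
            {query_type: {condition["field"]: condition["value"]}})
    return {"query": {"bool": clauses}}


query = build_bool_query(
    [
        {"field": "origin.user.keyword", "operator": "filter_term", "value": "alice"},
        {"field": "origin.ip.keyword", "operator": "must_not_term", "value": "203.0.113.7"},
    ],
    within="now-24h",
)
```

Under this reading, a search for the same user from a different IP places the user in the filter clause and the original IP in must_not.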

Dynamic Values

Use {{field.path}} syntax to reference values from the triggering event:
afterEvents:
  - indexPattern: v11-log-*
    with:
      - field: origin.user.keyword
        operator: filter_term
        value: '{{origin.user}}'       # Same user
      - field: origin.ip.keyword
        operator: must_not_term
        value: '{{origin.ip}}'          # Different IP
    within: now-24h
    count: 3
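Placeholder substitution of this kind can be sketched in a few lines of Python. This is an illustration of the idea only, assuming the triggering event is available as a nested dict; the names lookup and render are hypothetical, and the behavior for a missing field (raising KeyError here) is an assumption rather than documented UTMStack behavior.

```python
import re

# Matches {{field.path}} with optional spaces inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(event: dict, path: str):
    """Follow a dotted path such as 'origin.ip' through nested dicts."""
    current = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            raise KeyError(path)
        current = current[key]
    return current


def render(template: str, event: dict) -> str:
    """Replace every {{field.path}} placeholder with the event's value."""
    return PLACEHOLDER.sub(lambda m: str(lookup(event, m.group(1))), template)


event = {"origin": {"user": "alice", "ip": "203.0.113.7"}}
rendered_user = render("{{origin.user}}", event)
rendered_ip = render("{{origin.ip}}", event)
```

With the sample event, the two `with` values resolve to alice and 203.0.113.7 before the search runs.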

Time Windows

Time windows use Elasticsearch date math syntax:
within: now-1h      # Last hour
within: now-24h     # Last 24 hours
within: now-7d      # Last 7 days
within: now-30d     # Last 30 days
within: now-1h/h    # One hour ago, rounded down to the start of that hour
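To check what a given expression resolves to, the arithmetic can be reproduced locally. The sketch below handles only a small subset of date math (subtraction in seconds, minutes, hours, days, or weeks, plus optional rounding down to the minute, hour, or day); the function resolve is a hypothetical helper, and the search backend remains the authority on the full syntax. Note that rounding is applied after the subtraction.

```python
import re
from datetime import datetime, timedelta, timezone

UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}
PATTERN = re.compile(r"^now(?:-(\d+)([smhdw]))?(?:/([mhd]))?$")


def resolve(expression: str, now: datetime) -> datetime:
    """Resolve a date math expression such as 'now-12h' or 'now-1h/h'."""
    match = PATTERN.match(expression)
    if match is None:
        raise ValueError(f"unsupported expression: {expression}")
    amount, unit, rounding = match.groups()
    result = now
    if amount is not None:
        result -= timedelta(**{UNITS[unit]: int(amount)})
    # Round down to the start of the requested unit.
    if rounding == "m":
        result = result.replace(second=0, microsecond=0)
    elif rounding == "h":
        result = result.replace(minute=0, second=0, microsecond=0)
    elif rounding == "d":
        result = result.replace(hour=0, minute=0, second=0, microsecond=0)
    return result


now = datetime(2024, 1, 1, 12, 34, 56, tzinfo=timezone.utc)
one_hour_ago = resolve("now-1h", now)        # 11:34:56
rounded_hour = resolve("now-1h/h", now)      # 11:00:00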

Multiple Searches

Correlate across different indices:
afterEvents:
  - indexPattern: v11-log-auth
    with:
      - field: origin.ip.keyword
        operator: filter_term
        value: '{{origin.ip}}'
    within: now-1h
    count: 5
  - indexPattern: v11-log-network
    with:
      - field: source.ip.keyword
        operator: filter_term
        value: '{{origin.ip}}'
    within: now-6h
    count: 10

Nested OR Logic

Use the or field to define alternative correlation paths:
afterEvents:
  - indexPattern: v11-log-*
    with:
      - field: origin.ip.keyword
        operator: filter_term
        value: '{{origin.ip}}'
    within: now-12h
    count: 1
    or:
      - indexPattern: v11-alert-*
        with:
          - field: adversary.ip.keyword
            operator: filter_term
            value: '{{origin.ip}}'
        within: now-24h
        count: 2
Count Limit: Maximum count value is 50 to prevent performance issues.
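Because or branches nest, a count above the limit can hide several levels down. The sketch below walks an afterEvents list that has been loaded into Python dicts and reports every offending count. The helper count_violations is hypothetical, and the lower bound of 1 is an assumption; only the maximum of 50 comes from this guide.

```python
MAX_COUNT = 50  # limit stated in this guide


def count_violations(searches: list, path: str = "afterEvents") -> list:
    """Report counts outside 1..MAX_COUNT, including those in nested `or` branches."""
    violations = []
    for index, search in enumerate(searches):
        here = f"{path}[{index}]"
        count = search.get("count")
        if not isinstance(count, int) or not 1 <= count <= MAX_COUNT:
            violations.append(f"{here}.count={count!r}")
        # Recurse into alternative correlation paths, if any.
        violations.extend(count_violations(search.get("or", []), f"{here}.or"))
    return violations


after_events = [
    {"indexPattern": "v11-log-*", "within": "now-12h", "count": 1,
     "or": [{"indexPattern": "v11-alert-*", "within": "now-24h", "count": 51}]},
]
found = count_violations(after_events)
```

For the sample above, found contains a single entry pointing at the nested branch.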

Alert Deduplication

Use deduplicateBy to prevent duplicate alerts for the same threat:
deduplicateBy:
  - adversary.ip
  - adversary.country
  - target.ip

How It Works

  1. An alert is generated based on the rule conditions
  2. The system creates a hash from the specified fields
  3. If the hash matches a recent alert (within the deduplication window), the new alert is suppressed
  4. Otherwise, the alert is created
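The steps above can be sketched as a small in-memory deduplicator. This is a simplified illustration, not UTMStack's implementation: the class Deduplicator, the use of SHA-256, the inclusion of the rule ID in the hash, and the window length are all assumptions, since the guide does not specify them.

```python
import hashlib
import time


def get_path(alert: dict, path: str) -> str:
    """Return the value at a dotted path as text, or '' when it is missing."""
    current = alert
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return ""
        current = current[key]
    return str(current)


class Deduplicator:
    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._last_emitted = {}  # fingerprint -> time of last emitted alert

    @staticmethod
    def fingerprint(rule_id: int, alert: dict, fields: list) -> str:
        """Hash the rule id together with the deduplicateBy field values."""
        parts = [str(rule_id)] + [f"{f}={get_path(alert, f)}" for f in fields]
        return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()

    def should_emit(self, rule_id: int, alert: dict, fields: list, now=None) -> bool:
        """Return False when an identical alert was emitted inside the window."""
        now = time.time() if now is None else now
        key = self.fingerprint(rule_id, alert, fields)
        last = self._last_emitted.get(key)
        if last is not None and now - last < self.window_seconds:
            return False
        self._last_emitted[key] = now
        return True
```

In this sketch a suppressed alert does not extend the window, so a persistent threat resurfaces once per window instead of staying silent indefinitely; the real engine may behave differently.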

Best Practices

  • Include fields that uniquely identify the threat
  • Balance between deduplication and alert visibility
  • Common fields: IPs, usernames, hostnames, attack types
  • Avoid time-based fields that change frequently

Rule Evaluation Process

  1. Event Received: EventProcessor receives and parses the event
  2. Data Type Match: The system identifies rules matching the event’s data type
  3. Where Condition: Evaluates the where expression using event fields
  4. AfterEvents Search: If where evaluates to true, performs the correlation searches
  5. Count Validation: Checks whether the required event counts are met
  6. Alert Generation: Creates an alert if all conditions are satisfied
  7. Deduplication: Checks for recent duplicate alerts
  8. Alert Delivery: Delivers the alert to the correlation engine and UI
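The evaluation order can be sketched as a short pipeline. This is a simplified model under stated assumptions, not the engine's code: the where condition is a plain Python callable standing in for a CEL expression, count_matches is a caller-supplied stand-in for the correlation search, and deduplication is reduced to a set keyed by rule ID and origin IP. All names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Search:
    index_pattern: str
    count: int  # minimum number of matching events


@dataclass
class Rule:
    id: int
    data_types: list
    where: Callable[[dict], bool]
    after_events: list = field(default_factory=list)


def evaluate(event: dict, rules: list, count_matches: Callable, seen: set) -> list:
    """Return the ids of rules that raise a new alert for this event."""
    alerts = []
    for rule in rules:
        if event.get("dataType") not in rule.data_types:
            continue  # data type does not match
        if not rule.where(event):
            continue  # where condition is false
        if not all(count_matches(s, event) >= s.count for s in rule.after_events):
            continue  # a correlation search fell short of its count
        key = (rule.id, event.get("origin", {}).get("ip"))
        if key in seen:
            continue  # duplicate of a recent alert
        seen.add(key)
        alerts.append(rule.id)
    return alerts
```

Each stage only runs when the previous one passes, which is why narrowing dataTypes and the where condition reduces the number of correlation searches.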

Real-World Examples

Example 1: Brute Force Detection

- id: 1001
  dataTypes:
    - linux
    - windows
  name: Brute Force Authentication Attack
  impact:
    confidentiality: 4
    integrity: 3
    availability: 2
  category: Authentication
  technique: Brute Force - T1110
  adversary: origin
  references:
    - https://attack.mitre.org/techniques/T1110/
  description: |
    Detects multiple failed authentication attempts from the same source IP
    within a short time window, indicating a potential brute force attack.
  where: has(origin.ip) && actionResult == "failure" && action == "login"
  afterEvents:
    - indexPattern: v11-log-*
      with:
        - field: origin.ip.keyword
          operator: filter_term
          value: '{{origin.ip}}'
        - field: actionResult.keyword
          operator: filter_term
          value: 'failure'
      within: now-1h
      count: 10
  deduplicateBy:
    - origin.ip
    - target.host

Example 2: Data Exfiltration

- id: 1002
  dataTypes:
    - network
    - firewall
  name: Large Data Transfer to External Destination
  impact:
    confidentiality: 5
    integrity: 2
    availability: 1
  category: Exfiltration
  technique: Exfiltration Over Alternative Protocol - T1048
  adversary: origin
  references:
    - https://attack.mitre.org/techniques/T1048/
  description: |
    Detects unusually large data transfers to external IP addresses,
    which may indicate data exfiltration attempts.
  where: |
    has(origin.ip) && 
    has(target.ip) && 
    has(bytesSent) && 
    safe(bytesSent, 0) > 50000000 &&
    !(target.ip.startsWith("10.") || 
      target.ip.startsWith("192.168.") || 
      target.ip.startsWith("172.16."))
  afterEvents:
    - indexPattern: v11-log-network-*
      with:
        - field: origin.ip.keyword
          operator: filter_term
          value: '{{origin.ip}}'
        - field: target.ip.keyword
          operator: filter_term
          value: '{{target.ip}}'
      within: now-24h
      count: 1
  deduplicateBy:
    - origin.ip
    - target.ip
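String prefixes only approximate the private address ranges. In particular, the RFC 1918 block 172.16.0.0/12 runs from 172.16.x.x through 172.31.x.x, so a single 172.16. prefix leaves most of it classified as external. When tuning the exclusion list, sample addresses can be checked offline with Python's standard ipaddress module. The helper is_rfc1918 below is a hypothetical name used for illustration.

```python
import ipaddress

# The three private IPv4 blocks defined by RFC 1918.
RFC1918_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def is_rfc1918(ip: str) -> bool:
    """True when ip belongs to one of the RFC 1918 private ranges."""
    address = ipaddress.ip_address(ip)
    return any(address in network for network in RFC1918_NETWORKS)


samples = ["10.1.2.3", "172.20.1.1", "172.32.0.1", "110.1.2.3", "192.168.5.9"]
classified = {ip: is_rfc1918(ip) for ip in samples}
```

In the sample set, 172.20.1.1 is private even though it does not start with 172.16., while 110.1.2.3 is public even though it contains the substring 10.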

Example 3: Lateral Movement

- id: 1003
  dataTypes:
    - windows
  name: Lateral Movement via Remote Execution
  impact:
    confidentiality: 4
    integrity: 4
    availability: 3
  category: Lateral Movement
  technique: Remote Services - T1021
  adversary: origin
  references:
    - https://attack.mitre.org/techniques/T1021/
  description: |
    Detects use of remote execution tools like PsExec, WinRM, or WMI
    from a single source to multiple targets, indicating lateral movement.
  where: |
    has(origin.ip) && 
    has(action) &&
    action in ["psexec", "winrm", "wmi"] &&
    actionResult == "success"
  afterEvents:
    - indexPattern: v11-log-windows-*
      with:
        - field: origin.ip.keyword
          operator: filter_term
          value: '{{origin.ip}}'
        - field: action.keyword
          operator: filter_term
          value: '{{action}}'
        - field: target.ip.keyword
          operator: must_not_term
          value: '{{target.ip}}'
      within: now-30m
      count: 3
  deduplicateBy:
    - origin.ip
    - action

Example 4: Suspicious Time Activity

- id: 1004
  dataTypes:
    - database
    - file_access
  name: Sensitive Activity Outside Business Hours
  impact:
    confidentiality: 3
    integrity: 4
    availability: 2
  category: Insider Threat
  technique: Valid Accounts - T1078
  adversary: origin
  references:
    - https://attack.mitre.org/techniques/T1078/
  description: |
    Detects access to sensitive resources (databases, confidential files)
    outside of normal business hours by internal users.
  where: |
    has(origin.user) &&
    has(deviceTime) &&
    !(origin.user.startsWith("system_") || origin.user.startsWith("service_")) &&
    (time.getHours(deviceTime) < 6 || time.getHours(deviceTime) > 20) &&
    (action in ["database_query", "file_download", "export_data"])
  afterEvents:
    - indexPattern: v11-log-*
      with:
        - field: origin.user.keyword
          operator: filter_term
          value: '{{origin.user}}'
        - field: action.keyword
          operator: filter_term
          value: '{{action}}'
      within: now-7d
      count: 1
  deduplicateBy:
    - origin.user
    - action

Development Workflow

  1. Identify Threat: Determine which security threat you want to detect
     • Review security requirements
     • Analyze threat intelligence
     • Consider the MITRE ATT&CK framework
  2. Analyze Event Data: Examine sample events that indicate this threat
# Query sample events
GET v11-log-*/_search
{
  "query": {
    "match": { "dataType": "windows" }
  },
  "size": 10
}
  3. Create Rule File: Create a YAML file with the basic structure
- id: [next_available_id]
  dataTypes:
    - [event_type]
  name: [rule_name]
  # ... other fields
  4. Define Metadata: Set impact scores, category, technique, and references
  5. Write Where Condition: Create a CEL expression to identify suspicious events
     • Start simple, refine iteratively
     • Test expressions with sample data
     • Handle missing fields with safe() and has()
  6. Add Correlation Logic: Define afterEvents if multi-event correlation is needed
     • Choose appropriate index patterns
     • Set reasonable time windows
     • Use dynamic values for correlation
  7. Configure Deduplication: Specify fields to prevent alert fatigue
  8. Test Rule: Deploy to a development environment and test
     • Use test events that should trigger
     • Use test events that should NOT trigger
     • Verify alert content and format
  9. Refine and Optimize: Adjust based on testing results
     • Tune thresholds
     • Optimize performance
     • Reduce false positives
  10. Document and Deploy: Add a comprehensive description and deploy to production

Best Practices

Rule Design

Start Simple
  • Begin with basic conditions
  • Add complexity incrementally
  • Test each addition
Be Specific
  • Target specific data types
  • Use precise field matches
  • Avoid overly broad conditions
Handle Missing Data
  • Always use has() or safe() for optional fields
  • Provide sensible defaults
  • Test with incomplete events
Consider Performance
  • Limit data type scope
  • Use efficient operators
  • Minimize afterEvents searches
  • Set reasonable time windows
Prevent False Positives
  • Test with diverse datasets
  • Include normal activity patterns
  • Use whitelisting where appropriate
  • Refine based on feedback

Documentation

  • Write clear, detailed descriptions
  • Include relevant references (MITRE ATT&CK, CVEs)
  • Document expected event formats
  • Explain complex logic
  • Note any limitations or caveats

Testing

  • Test with real production data samples
  • Test edge cases and missing fields
  • Verify deduplication works correctly
  • Monitor performance impact
  • Conduct regular reviews and updates

Troubleshooting

Common Issues

Rule does not trigger
Possible Causes:
  • Event doesn’t match dataTypes
  • where condition evaluates to false
  • afterEvents count not met
  • Field names don’t match event structure
Solutions:
  • Verify event has correct dataType field
  • Test where expression with sample data
  • Check field names match exactly (case-sensitive)
  • Review afterEvents search results
  • Check for typos in field names
Too many false positives
Possible Causes:
  • Condition too broad
  • Normal activity matches pattern
  • Missing exclusions
Solutions:
  • Add more specific conditions
  • Include whitelist of known-good patterns
  • Tighten thresholds (raise counts, shorten time windows)
  • Add additional correlation requirements
Slow rule evaluation
Possible Causes:
  • Complex expressions
  • Large time windows in afterEvents
  • Too many correlation searches
Solutions:
  • Simplify expressions
  • Reduce time window durations
  • Limit number of afterEvents searches
  • Narrow indexPattern scope
  • Add more specific dataTypes
Field access errors
Possible Causes:
  • Field doesn’t exist in all events
  • Field name typo
  • Data type mismatch
Solutions:
  • Use has() to check field existence
  • Use safe() with defaults
  • Verify field names against events
  • Check field types match expected

Debugging Tips

# Add field existence checks
where: has(origin.ip) && has(origin.user) && origin.user != ""

# Use safe() with defaults
where: safe(failedLogins, 0) > 5

# Test expressions incrementally
# Start with: where: has(origin.ip)
# Then add: where: has(origin.ip) && actionResult == "failure"
# Finally: where: has(origin.ip) && actionResult == "failure" && safe(count, 0) > 5

# Log correlation searches (in development)
# Add temporary rule with broader conditions to see what events exist
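The incremental approach can be rehearsed offline against sample events before a rule is deployed. The sketch below models has() and safe() as this guide describes them (existence check and value with default); the Python versions are stand-ins written for illustration, and the exact semantics of the real CEL functions are an assumption. The helper first_failing_step is hypothetical.

```python
_MISSING = object()


def _get(event: dict, path: str):
    """Follow a dotted path; return _MISSING when any segment is absent."""
    current = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def has(event: dict, path: str) -> bool:
    return _get(event, path) is not _MISSING


def safe(event: dict, path: str, default):
    value = _get(event, path)
    return default if value is _MISSING else value


# The three incremental conditions from the tips above, in order.
steps = [
    lambda e: has(e, "origin.ip"),
    lambda e: has(e, "origin.ip") and safe(e, "actionResult", "") == "failure",
    lambda e: (has(e, "origin.ip") and safe(e, "actionResult", "") == "failure"
               and safe(e, "count", 0) > 5),
]


def first_failing_step(event: dict):
    """Return the 1-based index of the first condition that is false, or None."""
    for number, step in enumerate(steps, start=1):
        if not step(event):
            return number
    return None


sample = {"origin": {"ip": "203.0.113.7"}, "actionResult": "failure"}
failing = first_failing_step(sample)
```

For the sample event, the first two conditions pass and the third fails because count is missing and defaults to 0, which points directly at the clause to investigate.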