google / secops-wrapper
A helper SDK to wrap the Google SecOps API for common security use cases
Google SecOps SDK for Python
A Python SDK for interacting with Google Security Operations products, currently supporting Chronicle/SecOps SIEM.
This wraps the API for common use cases, including UDM searches, entity lookups, IoCs, alert management, case management, and detection rule management.
Prerequisites
Follow these steps to ensure your environment is properly configured:
1. Configure a Google Cloud Project for Google SecOps
   - Your Google Cloud project must be linked to your Google SecOps instance.
   - The Chronicle API needs to be enabled in your Google Cloud project.
   - The project used for authentication must be the same project that was set up during your SecOps onboarding.
   - For detailed instructions, see Configure a Google Cloud project for Google SecOps.
2. Set up IAM Permissions
   - The service account or user credentials you use must have appropriate permissions.
   - The recommended predefined role is Chronicle API Admin (roles/chronicle.admin).
   - For more granular access control, you can create custom roles with specific permissions.
   - See Access control using IAM for detailed permission information.
3. Required Information
   - Your Chronicle instance ID (customer_id)
   - Your Google Cloud project number (project_id)
   - Your preferred region (e.g., "us", "europe", "asia")
Note: Using a Google Cloud project that is not linked to your SecOps instance will result in authentication failures, even if the service account/user has the correct IAM roles assigned.
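These three values are usually kept out of source code. A minimal sketch of loading them from environment variables before initializing the client (the variable names below are illustrative, not an SDK convention):

```python
import os

# Gather the values needed to initialize the Chronicle client.
# The SECOPS_* environment variable names are illustrative only.
def load_secops_config(env=None):
    env = os.environ if env is None else env
    required = ("SECOPS_CUSTOMER_ID", "SECOPS_PROJECT_ID", "SECOPS_REGION")
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise ValueError(f"Missing configuration: {', '.join(missing)}")
    return {
        "customer_id": env["SECOPS_CUSTOMER_ID"],
        "project_id": env["SECOPS_PROJECT_ID"],
        "region": env["SECOPS_REGION"],
    }
```

The returned dictionary can later be unpacked into the Chronicle client initialization, e.g. `client.chronicle(**load_secops_config())`.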
Installation
pip install secops
Command Line Interface
The SDK also provides a comprehensive command-line interface (CLI) that makes it easy to interact with Google Security Operations products from your terminal:
# Save your credentials
secops config set --customer-id "your-instance-id" --project-id "your-project-id" --region "us"
# Now use commands without specifying credentials each time
secops search --query "metadata.event_type = \"NETWORK_CONNECTION\""
For detailed CLI documentation and examples, see the CLI Documentation.
Authentication
The SDK supports two main authentication methods:
1. Application Default Credentials (ADC)
The simplest and recommended way to authenticate the SDK. Application Default Credentials provide a consistent authentication method that works across different Google Cloud environments and local development.
There are several ways to use ADC:
a. Using gcloud CLI (Recommended for Local Development)
# Login and set up application-default credentials
gcloud auth application-default login
Then in your code:
from secops import SecOpsClient
# Initialize with default credentials - no explicit configuration needed
client = SecOpsClient()
b. Using Environment Variable
Set the environment variable pointing to your service account key:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
Then in your code:
from secops import SecOpsClient
# Initialize with default credentials - will automatically use the credentials file
client = SecOpsClient()
c. Google Cloud Environment (Automatic)
When running on Google Cloud services (Compute Engine, Cloud Functions, Cloud Run, etc.), ADC works automatically without any configuration:
from secops import SecOpsClient
# Initialize with default credentials - will automatically use the service account
# assigned to your Google Cloud resource
client = SecOpsClient()
ADC will automatically try these authentication methods in order:
- Environment variable GOOGLE_APPLICATION_CREDENTIALS
- Google Cloud SDK credentials (set by gcloud auth application-default login)
- Google Cloud-provided service account credentials
- Local service account impersonation credentials
2. Service Account Authentication
For more explicit control, you can authenticate using a service account that is created in the Google Cloud project associated with Google SecOps.
Important Note on Permissions:
- This service account must be granted an appropriate Identity and Access Management (IAM) role to interact with the Google SecOps (Chronicle) API. The recommended predefined role is Chronicle API Admin (roles/chronicle.admin).
- Alternatively, if your security policies require more granular control, you can create a custom IAM role with the specific permissions needed for the operations you intend to use (e.g., chronicle.instances.get, chronicle.events.create, chronicle.rules.list, etc.).
Once the service account is properly permissioned, you can authenticate using it in two ways:
a. Using a Service Account JSON File
from secops import SecOpsClient
# Initialize with service account JSON file
client = SecOpsClient(service_account_path="/path/to/service-account.json")
b. Using Service Account Info Dictionary
If you prefer to manage credentials programmatically without a file, you can create a dictionary containing the service account key's contents.
from secops import SecOpsClient
# Service account details as a dictionary
service_account_info = {
"type": "service_account",
"project_id": "your-project-id",
"private_key_id": "key-id",
"private_key": "-----BEGIN PRIVATE KEY-----\n...",
"client_email": "[email protected]",
"client_id": "client-id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/..."
}
# Initialize with service account info
client = SecOpsClient(service_account_info=service_account_info)
Impersonate Service Account
Both Application Default Credentials and Service Account Authentication support impersonating a service account via the impersonate_service_account parameter:
from secops import SecOpsClient
# Initialize with default credentials and impersonate service account
client = SecOpsClient(impersonate_service_account="[email protected]")
Retry Configuration
The SDK provides built-in retry functionality that automatically handles transient errors such as rate limiting (429), server errors (500, 502, 503, 504), and network issues. You can customize the retry behavior when initializing the client:
from secops import SecOpsClient
from secops.auth import RetryConfig
# Define retry configurations
retry_config = RetryConfig(
total=3, # Maximum number of retries (default: 5)
retry_status_codes=[429, 500, 502, 503, 504], # HTTP status codes to retry
allowed_methods=["GET", "DELETE"], # HTTP methods to retry
backoff_factor=0.5 # Backoff factor (default: 0.3)
)
# Initialize with custom retry config
client = SecOpsClient(retry_config=retry_config)
# Disable retry completely by marking retry config as False
client = SecOpsClient(retry_config=False)
Using the Chronicle API
Initializing the Chronicle Client
After creating a SecOpsClient, you need to initialize the Chronicle-specific client:
# Initialize Chronicle client
chronicle = client.chronicle(
customer_id="your-chronicle-instance-id", # Your Chronicle instance ID
project_id="your-project-id", # Your GCP project ID
region="us" # Chronicle API region
)
API Version Control
The SDK supports flexible API version selection:
- Default Version: Set default_api_version during client initialization (default is v1alpha)
- Per-Method Override: Many methods accept an api_version parameter to override the default for specific calls
Supported API versions:
- v1 - Stable production API
- v1beta - Beta API with newer features
- v1alpha - Alpha API with experimental features
Example with per-method version override:
from secops.chronicle.models import APIVersion
# Client defaults to v1alpha
chronicle = client.chronicle(
customer_id="your-chronicle-instance-id",
project_id="your-project-id",
region="us",
default_api_version="v1alpha"
)
# Use v1 for a specific rule operation
rule = chronicle.get_rule(
rule_id="ru_12345678-1234-1234-1234-123456789abc",
api_version=APIVersion.V1 # Override to use v1 for this call
)
Log Ingestion
Ingest raw logs directly into Chronicle:
from datetime import datetime, timezone
import json
# Create a sample log (this is an OKTA log)
current_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
okta_log = {
"actor": {
"alternateId": "[email protected]",
"displayName": "Mark Taylor",
"id": "00u4j7xcb5N6zfiRP5d8",
"type": "User"
},
"client": {
"userAgent": {
"rawUserAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"os": "Windows 10",
"browser": "CHROME"
},
"ipAddress": "96.6.127.53",
"geographicalContext": {
"city": "New York",
"state": "New York",
"country": "United States",
"postalCode": "10118",
"geolocation": {"lat": 40.7123, "lon": -74.0068}
}
},
"displayMessage": "Max sign in attempts exceeded",
"eventType": "user.account.lock",
"outcome": {"result": "FAILURE", "reason": "LOCKED_OUT"},
"published": "2025-06-19T21:51:50.116Z",
"securityContext": {
"asNumber": 20940,
"asOrg": "akamai technologies inc.",
"isp": "akamai international b.v.",
"domain": "akamaitechnologies.com",
"isProxy": false
},
"severity": "DEBUG",
"legacyEventType": "core.user_auth.account_locked",
"uuid": "5b90a94a-d7ba-11ea-834a-85c24a1b2121",
"version": "0"
# ... additional OKTA log fields may be included
}
# Ingest a single log using the default forwarder
result = chronicle.ingest_log(
log_type="OKTA", # Chronicle log type
log_message=json.dumps(okta_log) # JSON string of the log
)
print(f"Operation: {result.get('operation')}")
# Batch ingestion: Ingest multiple logs in a single request
batch_logs = [
json.dumps({"actor": {"displayName": "User 1"}, "eventType": "user.session.start"}),
json.dumps({"actor": {"displayName": "User 2"}, "eventType": "user.session.start"}),
json.dumps({"actor": {"displayName": "User 3"}, "eventType": "user.session.start"})
]
# Ingest multiple logs in a single API call
batch_result = chronicle.ingest_log(
log_type="OKTA",
log_message=batch_logs # List of log message strings
)
print(f"Batch operation: {batch_result.get('operation')}")
# Add custom labels to your logs
labeled_result = chronicle.ingest_log(
log_type="OKTA",
log_message=json.dumps(okta_log),
labels={"environment": "production", "app": "web-portal", "team": "security"}
)
The SDK also supports non-JSON log formats. Here's an example with XML for Windows Event logs:
# Create a Windows Event XML log
xml_content = """<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>
<System>
<Provider Name='Microsoft-Windows-Security-Auditing' Guid='{54849625-5478-4994-A5BA-3E3B0328C30D}'/>
<EventID>4624</EventID>
<Version>1</Version>
<Level>0</Level>
<Task>12544</Task>
<Opcode>0</Opcode>
<Keywords>0x8020000000000000</Keywords>
<TimeCreated SystemTime='2024-05-10T14:30:00Z'/>
<EventRecordID>202117513</EventRecordID>
<Correlation/>
<Execution ProcessID='656' ThreadID='700'/>
<Channel>Security</Channel>
<Computer>WIN-SERVER.xyz.net</Computer>
<Security/>
</System>
<EventData>
<Data Name='SubjectUserSid'>S-1-0-0</Data>
<Data Name='SubjectUserName'>-</Data>
<Data Name='TargetUserName'>svcUser</Data>
<Data Name='WorkstationName'>CLIENT-PC</Data>
<Data Name='LogonType'>3</Data>
</EventData>
</Event>"""
# Ingest the XML log - no json.dumps() needed for XML
result = chronicle.ingest_log(
log_type="WINEVTLOG_XML", # Windows Event Log XML format
log_message=xml_content # Raw XML content
)
print(f"Operation: {result.get('operation')}")
The SDK supports all log types available in Chronicle. You can:
1. View available log types:
# Get all available log types
log_types = chronicle.get_all_log_types()
for lt in log_types[:5]:  # Show first 5
    print(f"{lt.id}: {lt.description}")
# Fetch only the first 50 log types (single page)
log_types_page = chronicle.get_all_log_types(page_size=50)
# Fetch a specific page using a token
log_types_next = chronicle.get_all_log_types(
    page_size=50,
    page_token="next_page_token"
)
2. Search for specific log types:
# Search for log types related to firewalls
firewall_types = chronicle.search_log_types("firewall")
for lt in firewall_types:
    print(f"{lt.id}: {lt.description}")
3. Validate log types:
# Check if a log type is valid
if chronicle.is_valid_log_type("OKTA"):
    print("Valid log type")
else:
    print("Invalid log type")
4. Classify logs to predict log type:
# Classify a raw log to determine its type
okta_log = '{"eventType": "user.session.start", "actor": {"alternateId": "[email protected]"}}'
predictions = chronicle.classify_logs(log_data=okta_log)
# Display predictions sorted by confidence score
for prediction in predictions:
    print(f"Log Type: {prediction['logType']}, Score: {prediction['score']}")
> **Note:** Confidence scores are provided by the API as guidance only and may not always accurately reflect classification certainty. Use scores for relative ranking rather than absolute confidence.
5. Use custom forwarders:
# Create or get a custom forwarder
forwarder = chronicle.get_or_create_forwarder(display_name="MyCustomForwarder")
forwarder_id = forwarder["name"].split("/")[-1]
# Use the custom forwarder for log ingestion
result = chronicle.ingest_log(
log_type="WINDOWS",
log_message=json.dumps(windows_log),
forwarder_id=forwarder_id
)
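Per the note on classification above, confidence scores are best used for relative ranking. A self-contained sketch of ranking predictions locally (the predictions list is illustrative sample data, not real API output):

```python
# Rank classify_logs() predictions by confidence score, highest first.
# Sample data shaped like the API's prediction entries.
predictions = [
    {"logType": "AZURE_AD", "score": 0.12},
    {"logType": "OKTA", "score": 0.91},
    {"logType": "JSON", "score": 0.34},
]

ranked = sorted(predictions, key=lambda p: p["score"], reverse=True)
best_guess = ranked[0]["logType"]

for p in ranked:
    print(f"Log Type: {p['logType']}, Score: {p['score']:.2f}")
```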
Forwarder Management
Chronicle log forwarders are essential for handling log ingestion with specific configurations. The SDK provides comprehensive methods for creating and managing forwarders:
Create a new forwarder
# Create a basic forwarder with just a display name
forwarder = chronicle.create_forwarder(display_name="MyAppForwarder")
# Create a forwarder with optional configuration
forwarder = chronicle.create_forwarder(
display_name="ProductionForwarder",
metadata={"labels": {"env": "prod"}},
upload_compression=True, # Enable upload compression for efficiency
enable_server=False,  # Server functionality disabled
http_settings={
"port":8080,
"host":"192.168.0.100",
"routeSettings":{
"availableStatusCode": 200,
"readyStatusCode": 200,
"unreadyStatusCode": 500
}
}
)
print(f"Created forwarder with ID: {forwarder['name'].split('/')[-1]}")
List all forwarders
Retrieve all forwarders in your Chronicle environment with pagination support:
# Get the default page size (50)
forwarders = chronicle.list_forwarders()
# Get forwarders with custom page size
forwarders = chronicle.list_forwarders(page_size=100)
# Process the forwarders
for forwarder in forwarders.get("forwarders", []):
forwarder_id = forwarder.get("name", "").split("/")[-1]
display_name = forwarder.get("displayName", "")
create_time = forwarder.get("createTime", "")
print(f"Forwarder ID: {forwarder_id}, Name: {display_name}, Created: {create_time}")
Get forwarder details
Retrieve details about a specific forwarder using its ID:
# Get a specific forwarder using its ID
forwarder_id = "1234567890"
forwarder = chronicle.get_forwarder(forwarder_id=forwarder_id)
# Access forwarder properties
display_name = forwarder.get("displayName", "")
metadata = forwarder.get("metadata", {})
server_enabled = forwarder.get("enableServer", False)
print(f"Forwarder {display_name} details:")
print(f" Metadata: {metadata}")
print(f" Server enabled: {server_enabled}")
Get or create a forwarder
Retrieve an existing forwarder by display name or create a new one if it doesn't exist:
# Try to find a forwarder with the specified display name
# If not found, create a new one with that display name
forwarder = chronicle.get_or_create_forwarder(display_name="ApplicationLogForwarder")
# Extract the forwarder ID for use in log ingestion
forwarder_id = forwarder["name"].split("/")[-1]
Update a forwarder
Update an existing forwarder's configuration with specific properties:
# Update a forwarder with new properties
forwarder = chronicle.update_forwarder(
forwarder_id="1234567890",
display_name="UpdatedForwarderName",
metadata={"labels": {"env": "prod"}},
upload_compression=True
)
# Update specific fields using update mask
forwarder = chronicle.update_forwarder(
forwarder_id="1234567890",
display_name="ProdForwarder",
update_mask=["display_name"]
)
print(f"Updated forwarder: {forwarder['name']}")
Delete a forwarder
Delete an existing forwarder by its ID:
# Delete a forwarder by ID
chronicle.delete_forwarder(forwarder_id="1234567890")
print("Forwarder deleted successfully")
Log Processing Pipelines
Chronicle log processing pipelines allow you to transform, filter, and enrich log data before it is stored in Chronicle. Common use cases include removing empty key-value pairs, redacting sensitive data, adding ingestion labels, filtering logs by field values, and extracting host information. Pipelines can be associated with log types (with optional collector IDs) and feeds, providing flexible control over your data ingestion workflow.
The SDK provides comprehensive methods for managing pipelines, associating streams, testing configurations, and fetching sample logs.
List pipelines
Retrieve all log processing pipelines in your Chronicle instance:
# Get all pipelines
result = chronicle.list_log_processing_pipelines()
pipelines = result.get("logProcessingPipelines", [])
for pipeline in pipelines:
pipeline_id = pipeline["name"].split("/")[-1]
print(f"Pipeline: {pipeline['displayName']} (ID: {pipeline_id})")
# List with pagination
result = chronicle.list_log_processing_pipelines(
page_size=50,
page_token="next_page_token"
)
Get pipeline details
Retrieve details about a specific pipeline:
# Get pipeline by ID
pipeline_id = "1234567890"
pipeline = chronicle.get_log_processing_pipeline(pipeline_id)
print(f"Name: {pipeline['displayName']}")
print(f"Description: {pipeline.get('description', 'N/A')}")
print(f"Processors: {len(pipeline.get('processors', []))}")
Create a pipeline
Create a new log processing pipeline with processors:
# Define pipeline configuration
pipeline_config = {
"displayName": "My Custom Pipeline",
"description": "Filters and transforms application logs",
"processors": [
{
"filterProcessor": {
"include": {
"logMatchType": "REGEXP",
"logBodies": [".*error.*", ".*warning.*"],
},
"errorMode": "IGNORE",
}
}
],
"customMetadata": [
{"key": "environment", "value": "production"},
{"key": "team", "value": "security"}
]
}
# Create the pipeline (server generates ID)
created_pipeline = chronicle.create_log_processing_pipeline(
pipeline=pipeline_config
)
pipeline_id = created_pipeline["name"].split("/")[-1]
print(f"Created pipeline with ID: {pipeline_id}")
Update a pipeline
Update an existing pipeline's configuration:
# Get the existing pipeline first
pipeline = chronicle.get_log_processing_pipeline(pipeline_id)
# Update specific fields
updated_config = {
"name": pipeline["name"],
"description": "Updated description",
"processors": pipeline["processors"]
}
# Patch with update mask
updated_pipeline = chronicle.update_log_processing_pipeline(
pipeline_id=pipeline_id,
pipeline=updated_config,
update_mask="description"
)
print(f"Updated: {updated_pipeline['displayName']}")
Delete a pipeline
Delete an existing pipeline:
# Delete by ID
chronicle.delete_log_processing_pipeline(pipeline_id)
print("Pipeline deleted successfully")
# Delete with etag for concurrency control
chronicle.delete_log_processing_pipeline(
pipeline_id=pipeline_id,
etag="etag_value"
)
Associate streams with a pipeline
Associate log streams (by log type or feed) with a pipeline:
# Associate by log type
streams = [
{"logType": "WINEVTLOG"},
{"logType": "LINUX"}
]
chronicle.associate_streams(
pipeline_id=pipeline_id,
streams=streams
)
print("Streams associated successfully")
# Associate by feed ID
feed_streams = [
{"feed": "feed-uuid-1"},
{"feed": "feed-uuid-2"}
]
chronicle.associate_streams(
pipeline_id=pipeline_id,
streams=feed_streams
)
Dissociate streams from a pipeline
Remove stream associations from a pipeline:
# Dissociate streams
streams = [{"logType": "WINEVTLOG"}]
chronicle.dissociate_streams(
pipeline_id=pipeline_id,
streams=streams
)
print("Streams dissociated successfully")
Fetch associated pipeline
Find which pipeline is associated with a specific stream:
# Find pipeline for a log type
stream_query = {"logType": "WINEVTLOG"}
associated = chronicle.fetch_associated_pipeline(stream=stream_query)
if associated:
print(f"Associated pipeline: {associated['name']}")
else:
print("No pipeline associated with this stream")
# Find pipeline for a feed
feed_query = {"feed": "feed-uuid"}
associated = chronicle.fetch_associated_pipeline(stream=feed_query)
Fetch sample logs
Retrieve sample logs for specific streams:
# Fetch sample logs for log types
streams = [
{"logType": "WINEVTLOG"},
{"logType": "LINUX"}
]
result = chronicle.fetch_sample_logs_by_streams(
streams=streams,
sample_logs_count=10
)
for log in result.get("logs", []):
print(f"Log: {log}")
Test a pipeline
Test a pipeline configuration against sample logs before deployment:
import base64
from datetime import datetime, timezone
# Define pipeline to test
pipeline_config = {
"displayName": "Test Pipeline",
"processors": [
{
"filterProcessor": {
"include": {
"logMatchType": "REGEXP",
"logBodies": [".*"],
},
"errorMode": "IGNORE",
}
}
]
}
# Create test logs with base64-encoded data
current_time = datetime.now(timezone.utc).isoformat()
log_data = base64.b64encode(b"Sample log entry").decode("utf-8")
input_logs = [
{
"data": log_data,
"logEntryTime": current_time,
"collectionTime": current_time,
}
]
# Test the pipeline
result = chronicle.test_pipeline(
pipeline=pipeline_config,
input_logs=input_logs
)
print(f"Processed {len(result.get('logs', []))} logs")
for processed_log in result.get("logs", []):
print(f"Result: {processed_log}")
- Use custom timestamps:
from datetime import datetime, timedelta, timezone
# Define custom timestamps
log_entry_time = datetime.now(timezone.utc) - timedelta(hours=1)
collection_time = datetime.now(timezone.utc)
result = chronicle.ingest_log(
log_type="OKTA",
log_message=json.dumps(okta_log),
log_entry_time=log_entry_time, # When the log was generated
collection_time=collection_time # When the log was collected
)
Ingest UDM events directly into Chronicle:
import uuid
from datetime import datetime, timezone
# Generate a unique ID
event_id = str(uuid.uuid4())
# Get current time in ISO 8601 format
current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
# Create a UDM event for a network connection
network_event = {
"metadata": {
"id": event_id,
"event_timestamp": current_time,
"event_type": "NETWORK_CONNECTION",
"product_name": "My Security Product",
"vendor_name": "My Company"
},
"principal": {
"hostname": "workstation-1",
"ip": "192.168.1.100",
"port": 12345
},
"target": {
"ip": "203.0.113.10",
"port": 443
},
"network": {
"application_protocol": "HTTPS",
"direction": "OUTBOUND"
}
}
# Ingest a single UDM event
result = chronicle.ingest_udm(udm_events=network_event)
print(f"Ingested event with ID: {event_id}")
# Create a second event
process_event = {
"metadata": {
# No ID - one will be auto-generated
"event_timestamp": current_time,
"event_type": "PROCESS_LAUNCH",
"product_name": "My Security Product",
"vendor_name": "My Company"
},
"principal": {
"hostname": "workstation-1",
"process": {
"command_line": "ping 8.8.8.8",
"pid": 1234
},
"user": {
"userid": "user123"
}
}
}
# Ingest multiple UDM events in a single call
result = chronicle.ingest_udm(udm_events=[network_event, process_event])
print("Multiple events ingested successfully")
Import entities into Chronicle:
# Create a sample entity
entity = {
"metadata": {
"collected_timestamp": "2025-01-01T00:00:00Z",
"vendor_name": "TestVendor",
"product_name": "TestProduct",
"entity_type": "USER",
},
"entity": {
"user": {
"userid": "testuser",
}
},
}
# Import a single entity
result = chronicle.import_entities(entities=entity, log_type="TEST_LOG_TYPE")
print(f"Imported entity: {result}")
# Import multiple entities
entity2 = {
"metadata": {
"collected_timestamp": "2025-01-01T00:00:00Z",
"vendor_name": "TestVendor",
"product_name": "TestProduct",
"entity_type": "ASSET",
},
"entity": {
"asset": {
"hostname": "testhost",
}
},
}
entities = [entity, entity2]
result = chronicle.import_entities(entities=entities, log_type="TEST_LOG_TYPE")
print(f"Imported entities: {result}")
Data Export
Note: The Data Export API features are currently under test and review. We welcome your feedback and encourage you to submit any issues or unexpected behavior to the issue tracker so we can improve this functionality.
You can export Chronicle logs to Google Cloud Storage using the Data Export API:
from datetime import datetime, timedelta, timezone
# Set time range for export
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=1) # Last 24 hours
# Get available log types for export
available_log_types = chronicle.fetch_available_log_types(
start_time=start_time,
end_time=end_time
)
# Print available log types
for log_type in available_log_types["available_log_types"]:
print(f"{log_type.display_name} ({log_type.log_type.split('/')[-1]})")
print(f" Available from {log_type.start_time} to {log_type.end_time}")
# Create a data export for a single log type (legacy method)
export = chronicle.create_data_export(
gcs_bucket="projects/my-project/buckets/my-export-bucket",
start_time=start_time,
end_time=end_time,
log_type="GCP_DNS" # Single log type to export
)
# Create a data export for multiple log types
export_multiple = chronicle.create_data_export(
gcs_bucket="projects/my-project/buckets/my-export-bucket",
start_time=start_time,
end_time=end_time,
log_types=["WINDOWS", "LINUX", "GCP_DNS"] # Multiple log types to export
)
# Get the export ID
export_id = export["name"].split("/")[-1]
print(f"Created export with ID: {export_id}")
print(f"Status: {export['data_export_status']['stage']}")
# List recent exports
recent_exports = chronicle.list_data_export(page_size=10)
print(f"Found {len(recent_exports.get('dataExports', []))} recent exports")
# Print details of recent exports
for item in recent_exports.get("dataExports", []):
item_id = item["name"].split("/")[-1]
if "dataExportStatus" in item:
status = item["dataExportStatus"]["stage"]
else:
status = item["data_export_status"]["stage"]
print(f"Export ID: {item_id}, Status: {status}")
# Check export status
status = chronicle.get_data_export(export_id)
# Update an export that is in IN_QUEUE state
if status.get("dataExportStatus", {}).get("stage") == "IN_QUEUE":
# Update with a new start time
updated_start = start_time + timedelta(hours=2)
update_result = chronicle.update_data_export(
data_export_id=export_id,
start_time=updated_start,
# Optionally update other parameters like end_time, gcs_bucket, or log_types
)
print("Export updated successfully")
# Cancel an export if needed
if status.get("dataExportStatus", {}).get("stage") in ["IN_QUEUE", "PROCESSING"]:
cancelled = chronicle.cancel_data_export(export_id)
print(f"Export has been cancelled. New status: {cancelled['data_export_status']['stage']}")
# Export all log types at once
export_all = chronicle.create_data_export(
gcs_bucket="projects/my-project/buckets/my-export-bucket",
start_time=start_time,
end_time=end_time,
export_all_logs=True
)
print(f"Created export for all logs. Status: {export_all['data_export_status']['stage']}")
The Data Export API supports:
- Exporting one, multiple, or all log types to Google Cloud Storage
- Listing recent exports and filtering results
- Checking export status and progress
- Updating exports that are in the queue
- Cancelling exports in progress
- Fetching available log types for a specific time range
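The status checks shown above can be wrapped in a simple polling loop. A sketch where the fetch function is injected so the snippet runs standalone; in real code you would pass `lambda: chronicle.get_data_export(export_id)`, and terminal stage names other than IN_QUEUE/PROCESSING are illustrative:

```python
import time

# Poll a data export until it leaves IN_QUEUE/PROCESSING, then return the stage.
def wait_for_export(fetch_status, poll_seconds=0, max_polls=10):
    stage = None
    for _ in range(max_polls):
        status = fetch_status()
        stage = status.get("dataExportStatus", {}).get("stage")
        if stage not in ("IN_QUEUE", "PROCESSING"):
            break
        time.sleep(poll_seconds)
    return stage

# Demo with canned responses standing in for get_data_export() calls
responses = iter([
    {"dataExportStatus": {"stage": "IN_QUEUE"}},
    {"dataExportStatus": {"stage": "PROCESSING"}},
    {"dataExportStatus": {"stage": "FINISHED_SUCCESS"}},
])
final_stage = wait_for_export(lambda: next(responses))
print(final_stage)
```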
If you encounter any issues with the Data Export functionality, please submit them to our issue tracker with detailed information about the problem and steps to reproduce.
Basic UDM Search
Search for network connection events:
from datetime import datetime, timedelta, timezone
# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Last 24 hours
# Perform UDM search
results = chronicle.search_udm(
query="""
metadata.event_type = "NETWORK_CONNECTION"
ip != ""
""",
start_time=start_time,
end_time=end_time,
max_events=5
)
# Example response:
{
"events": [
{
"name": "projects/my-project/locations/us/instances/my-instance/events/encoded-event-id",
"udm": {
"metadata": {
"eventTimestamp": "2024-02-09T10:30:00Z",
"eventType": "NETWORK_CONNECTION"
},
"target": {
"ip": ["192.168.1.100"],
"port": 443
},
"principal": {
"hostname": "workstation-1"
}
}
}
],
"total_events": 1,
"more_data_available": false
}
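Search results like the example above are plain dictionaries. A short sketch of extracting hostnames and target IPs (the results dict mirrors the sample response):

```python
# Extract principal hostname and target IPs from search_udm() results.
# `results` mirrors the example response structure shown above.
results = {
    "events": [
        {
            "udm": {
                "metadata": {
                    "eventTimestamp": "2024-02-09T10:30:00Z",
                    "eventType": "NETWORK_CONNECTION",
                },
                "target": {"ip": ["192.168.1.100"], "port": 443},
                "principal": {"hostname": "workstation-1"},
            }
        }
    ],
    "total_events": 1,
}

connections = []
for event in results.get("events", []):
    udm = event.get("udm", {})
    host = udm.get("principal", {}).get("hostname", "unknown")
    for ip in udm.get("target", {}).get("ip", []):
        connections.append((host, ip))

for host, ip in connections:
    print(f"{host} -> {ip}")
```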
UDM Search View
Retrieve UDM search results with additional contextual information, including detection data:
from datetime import datetime, timedelta, timezone
# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Last 24 hours
# Fetch UDM search view results
results = chronicle.fetch_udm_search_view(
query='metadata.event_type = "NETWORK_CONNECTION"',
start_time=start_time,
end_time=end_time,
max_events=5, # Limit to 5 events
max_detections=10, # Get up to 10 detections
snapshot_query='feedback_summary.status = "OPEN"', # Filter for open alerts
case_insensitive=True # Case-insensitive search
)
Note: The fetch_udm_search_view method is synchronous and returns all results at once rather than as a streaming response, because the underlying endpoint (legacyFetchUDMSearchView) responds synchronously.
Fetch UDM Field Values
Search for ingested UDM field values that match a query:
# Search for fields containing "source"
results = chronicle.find_udm_field_values(
query="source",
page_size=10
)
# Example response:
{
"valueMatches": [
{
"fieldPath": "metadata.ingestion_labels.key",
"value": "source",
"ingestionTime": "2025-08-18T08:00:11.670673Z",
"matchEnd": 6
},
{
"fieldPath": "additional.fields.key",
"value": "source",
"ingestionTime": "2025-02-18T19:45:01.811426Z",
"matchEnd": 6
}
],
"fieldMatches": [
{
"fieldPath": "about.labels.value"
},
{
"fieldPath": "additional.fields.value.string_value"
}
],
"fieldMatchRegex": "source"
}
Statistics Queries
Get statistics about network connections grouped by hostname:
stats = chronicle.get_stats(
query="""metadata.event_type = "NETWORK_CONNECTION"
match:
target.hostname
outcome:
$count = count(metadata.id)
order:
$count desc""",
start_time=start_time,
end_time=end_time,
max_events=1000,
max_values=10,
timeout=180
)
# Example response:
{
"columns": ["hostname", "count"],
"rows": [
{"hostname": "server-1", "count": 1500},
{"hostname": "server-2", "count": 1200}
],
"total_rows": 2
}
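The rows in a get_stats() response can be reshaped for quick lookups. A short sketch using the sample response above:

```python
# Build a {hostname: count} mapping from get_stats() rows.
# `stats` mirrors the example response shown above.
stats = {
    "columns": ["hostname", "count"],
    "rows": [
        {"hostname": "server-1", "count": 1500},
        {"hostname": "server-2", "count": 1200},
    ],
    "total_rows": 2,
}

counts = {row["hostname"]: row["count"] for row in stats["rows"]}
busiest = max(counts, key=counts.get)
print(f"Busiest host: {busiest} ({counts[busiest]} connections)")
```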
CSV Export
Export specific fields to CSV format:
csv_data = chronicle.fetch_udm_search_csv(
query='metadata.event_type = "NETWORK_CONNECTION"',
start_time=start_time,
end_time=end_time,
fields=["timestamp", "user", "hostname", "process name"]
)
# Example response:
"""
metadata.eventTimestamp,principal.hostname,target.ip,target.port
2024-02-09T10:30:00Z,workstation-1,192.168.1.100,443
2024-02-09T10:31:00Z,workstation-2,192.168.1.101,80
"""
Query Validation
Validate a UDM query before execution:
query = 'target.ip != "" and principal.hostname = "test-host"'
validation = chronicle.validate_query(query)
# Example response:
{
"isValid": true,
"queryType": "QUERY_TYPE_UDM_QUERY",
"suggestedFields": [
"target.ip",
"principal.hostname"
]
}
Natural Language Search
Search for events using natural language instead of UDM query syntax:
from datetime import datetime, timedelta, timezone
# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Last 24 hours
# Option 1: Translate natural language to UDM query
udm_query = chronicle.translate_nl_to_udm("show me network connections")
print(f"Translated query: {udm_query}")
# Example output: 'metadata.event_type="NETWORK_CONNECTION"'
# Then run the query manually if needed
results = chronicle.search_udm(
query=udm_query,
start_time=start_time,
end_time=end_time
)
# Option 2: Perform complete search with natural language
results = chronicle.nl_search(
text="show me failed login attempts",
start_time=start_time,
end_time=end_time,
max_events=100
)
# Example response (same format as search_udm):
{
"events": [
{
"event": {
"metadata": {
"eventTimestamp": "2024-02-09T10:30:00Z",
"eventType": "USER_LOGIN"
},
"principal": {
"user": {
"userid": "jdoe"
}
},
"securityResult": {
"action": "BLOCK",
"summary": "Failed login attempt"
}
}
}
],
"total_events": 1
}
The natural language search feature supports various query patterns:
- "Show me network connections"
- "Find suspicious processes"
- "Show login failures in the last hour"
- "Display connections to IP address 192.168.1.100"
If the natural language query cannot be translated to a valid UDM query, an APIError will be raised with a message indicating that no valid query could be generated.
Entity Summary
Get detailed information about specific entities like IP addresses, domains, or file hashes. The function automatically detects the entity type based on the provided value and fetches a comprehensive summary including related entities, alerts, timeline, prevalence, and more.
# IP address summary
ip_summary = chronicle.summarize_entity(
value="8.8.8.8",
start_time=start_time,
end_time=end_time
)
# Domain summary
domain_summary = chronicle.summarize_entity(
value="google.com",
start_time=start_time,
end_time=end_time
)
# File hash summary (SHA256)
file_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
file_summary = chronicle.summarize_entity(
value=file_hash,
start_time=start_time,
end_time=end_time
)
# Optionally hint the preferred type if auto-detection might be ambiguous
user_summary = chronicle.summarize_entity(
value="jdoe",
start_time=start_time,
end_time=end_time,
preferred_entity_type="USER"
)
# Example response structure (EntitySummary object):
# Access attributes like: ip_summary.primary_entity, ip_summary.related_entities,
# ip_summary.alert_counts, ip_summary.timeline, ip_summary.prevalence, etc.
# Example fields within the EntitySummary object:
# primary_entity: {
# "name": "entities/...",
# "metadata": {
# "entityType": "ASSET", # Or FILE, DOMAIN_NAME, USER, etc.
# "interval": { "startTime": "...", "endTime": "..." }
# },
# "metric": { "firstSeen": "...", "lastSeen": "..." },
# "entity": { # Contains specific details like 'asset', 'file', 'domain'
# "asset": { "ip": ["8.8.8.8"] }
# }
# }
# related_entities: [ { ... similar to primary_entity ... } ]
# alert_counts: [ { "rule": "Rule Name", "count": 5 } ]
# timeline: { "buckets": [ { "alertCount": 1, "eventCount": 10 } ], "bucketSize": "3600s" }
# prevalence: [ { "prevalenceTime": "...", "count": 100 } ]
# file_metadata_and_properties: { # Only for FILE entities
# "metadata": [ { "key": "...", "value": "..." } ],
# "properties": [ { "title": "...", "properties": [ { "key": "...", "value": "..." } ] } ]
# }
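The per-rule `alert_counts` shown above can be rolled up into a single total for triage dashboards. A sketch, assuming the `[{"rule": ..., "count": ...}]` shape illustrated in the comments — the helper itself is ours:

```python
def total_alert_count(alert_counts):
    """Sum the per-rule counts from an EntitySummary's alert_counts
    (assumes the [{"rule": ..., "count": ...}] shape shown above)."""
    return sum(item.get("count", 0) for item in (alert_counts or []))

alert_counts = [
    {"rule": "Suspicious Login", "count": 5},
    {"rule": "Port Scan", "count": 2},
]
total = total_alert_count(alert_counts)
```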
List IoCs (Indicators of Compromise)
Retrieve IoC matches against ingested events:
iocs = chronicle.list_iocs(
start_time=start_time,
end_time=end_time,
max_matches=1000,
add_mandiant_attributes=True,
prioritized_only=False
)
# Process the results
for ioc in iocs['matches']:
ioc_type = next(iter(ioc['artifactIndicator'].keys()))
ioc_value = next(iter(ioc['artifactIndicator'].values()))
print(f"IoC Type: {ioc_type}, Value: {ioc_value}")
print(f"Sources: {', '.join(ioc['sources'])}")
The IoC response includes:
- The indicator itself (domain, IP, hash, etc.)
- Sources and categories
- Affected assets in your environment
- First and last seen timestamps
- Confidence scores and severity ratings
- Associated threat actors and malware families (with Mandiant attributes)
Alerts and Case Management
Retrieve alerts and their associated cases:
# Get non-closed alerts
alerts = chronicle.get_alerts(
start_time=start_time,
end_time=end_time,
snapshot_query='feedback_summary.status != "CLOSED"',
max_alerts=100
)
# Get alerts from the response
alert_list = alerts.get('alerts', {}).get('alerts', [])
# Extract case IDs from alerts
case_ids = {alert.get('caseName') for alert in alert_list if alert.get('caseName')}
# Get case details using the batch API
if case_ids:
cases = chronicle.get_cases(list(case_ids))
# Process cases
for case in cases.cases:
print(f"Case: {case.display_name}")
print(f"Priority: {case.priority}")
print(f"Status: {case.status}")
print(f"Stage: {case.stage}")
# Access SOAR platform information if available
if case.soar_platform_info:
print(f"SOAR Case ID: {case.soar_platform_info.case_id}")
print(f"SOAR Platform: {case.soar_platform_info.platform_type}")
The alerts response includes:
- Progress status and completion status
- Alert counts (baseline and filtered)
- Alert details (rule information, detection details, etc.)
- Case associations
You can filter alerts using the snapshot query parameter with fields like:
- detection.rule_name
- detection.alert_state
- feedback_summary.verdict
- feedback_summary.priority
- feedback_summary.status
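These filters can also be assembled programmatically. A minimal sketch — `build_snapshot_query` is our helper, not an SDK function:

```python
def build_snapshot_query(filters):
    """AND-join field/value pairs into a snapshot query string."""
    return " AND ".join(f'{field} = "{value}"' for field, value in filters.items())

query = build_snapshot_query({
    "feedback_summary.status": "OPEN",
    "detection.rule_name": "Suspicious Login",
})
# query can then be passed as snapshot_query to chronicle.get_alerts()
```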
Case Management Helpers
The CaseList class provides helper methods for working with cases:
# Get details for specific cases (uses the batch API)
cases = chronicle.get_cases(["case-id-1", "case-id-2"])
# Filter cases by priority
high_priority = cases.filter_by_priority("PRIORITY_HIGH")
# Filter cases by status
open_cases = cases.filter_by_status("STATUS_OPEN")
# Look up a specific case
case = cases.get_case("case-id-1")
Note: The case management API uses the `legacy:legacyBatchGetCases` endpoint to retrieve multiple cases in a single request. You can retrieve up to 1000 cases in a single batch.
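Given the 1000-case batch limit, larger ID sets need to be chunked before calling `get_cases`. A sketch — `chunk_ids` is our helper, not an SDK function:

```python
def chunk_ids(ids, batch_size=1000):
    """Split a list of case IDs into batches no larger than the API limit."""
    return [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]

batches = chunk_ids([f"case-{n}" for n in range(2500)])
# Each batch can then be passed to chronicle.get_cases(batch) in turn.
```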
Investigation Management
Chronicle investigations provide automated analysis and recommendations for alerts and cases. The SDK provides methods to list, retrieve, trigger, and fetch associated investigations.
List investigations
Retrieve all investigations in your Chronicle instance:
# List all investigations
result = chronicle.list_investigations()
investigations = result.get("investigations", [])
for inv in investigations:
print(f"Investigation: {inv['displayName']}")
print(f" Status: {inv.get('status', 'N/A')}")
print(f" Verdict: {inv.get('verdict', 'N/A')}")
# List with pagination
result = chronicle.list_investigations(page_size=50, page_token="token")
Get investigation details
Retrieve a specific investigation by its ID:
# Get investigation by ID
investigation = chronicle.get_investigation(investigation_id="inv_123")
print(f"Name: {investigation['displayName']}")
print(f"Status: {investigation.get('status')}")
print(f"Verdict: {investigation.get('verdict')}")
print(f"Confidence: {investigation.get('confidence')}")
Trigger investigation for an alert
Create a new investigation for a specific alert:
# Trigger investigation for an alert
investigation = chronicle.trigger_investigation(alert_id="alert_123")
print(f"Investigation created: {investigation['name']}")
print(f"Status: {investigation.get('status')}")
print(f"Trigger type: {investigation.get('triggerType')}")
Fetch associated investigations
Retrieve investigations associated with alerts or cases:
from secops.chronicle import DetectionType
# Fetch investigations for specific alerts
result = chronicle.fetch_associated_investigations(
detection_type=DetectionType.ALERT,
alert_ids=["alert_123", "alert_456"],
association_limit_per_detection=5
)
# Process associations
associations_list = result.get("associationsList", {})
for alert_id, data in associations_list.items():
investigations = data.get("investigations", [])
print(f"Alert {alert_id}: {len(investigations)} investigation(s)")
for inv in investigations:
print(f" - {inv['displayName']}: {inv.get('verdict', 'N/A')}")
# Fetch investigations for cases
case_result = chronicle.fetch_associated_investigations(
detection_type=DetectionType.CASE,
case_ids=["case_123"],
association_limit_per_detection=3
)
# You can also use string values for detection_type
result = chronicle.fetch_associated_investigations(
detection_type="ALERT", # or "DETECTION_TYPE_ALERT"
alert_ids=["alert_123"]
)
Generating UDM Key/Value Mapping
Chronicle provides a feature to generate a UDM key-value mapping for a given raw log.
mapping = chronicle.generate_udm_key_value_mappings(
log_format="JSON",
log='{"events":[{"id":"123","user":"test_user","source_ip":"192.168.1.10"}]}',
use_array_bracket_notation=True,
compress_array_fields=False,
)
print(f"Generated UDM key/value mapping: {mapping}")
# Generate UDM key-value mapping
udm_mapping = chronicle.generate_udm_mapping(log_type="WINDOWS_AD")
print(udm_mapping)
Parser Management
Chronicle parsers are used to process and normalize raw log data into Chronicle's Unified Data Model (UDM) format. Parsers transform various log formats (JSON, XML, CEF, etc.) into a standardized structure that enables consistent querying and analysis across different data sources.
The SDK provides comprehensive support for managing Chronicle parsers:
Creating Parsers
Create new parser:
parser_text = """
filter {
mutate {
replace => {
"event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
"event1.idm.read_only_udm.metadata.vendor_name" => "ACME Labs"
}
}
grok {
match => {
"message" => ["^(?P<_firstWord>[^\s]+)\s.*$"]
}
on_error => "_grok_message_failed"
}
if ![_grok_message_failed] {
mutate {
replace => {
"event1.idm.read_only_udm.metadata.description" => "%{_firstWord}"
}
}
}
mutate {
merge => {
"@output" => "event1"
}
}
}
"""
log_type = "WINDOWS_AD"
# Create the parser
parser = chronicle.create_parser(
log_type=log_type,
parser_code=parser_text,
validated_on_empty_logs=True # Whether to validate parser on empty logs
)
parser_id = parser.get("name", "").split("/")[-1]
print(f"Parser ID: {parser_id}")
Managing Parsers
Retrieve, list, copy, activate/deactivate, and delete parsers:
# List all parsers (returns complete list)
parsers = chronicle.list_parsers()
for parser in parsers:
parser_id = parser.get("name", "").split("/")[-1]
state = parser.get("state")
print(f"Parser ID: {parser_id}, State: {state}")
# Manual pagination: get raw API response with nextPageToken
response = chronicle.list_parsers(page_size=50)
parsers = response.get("parsers", [])
next_token = response.get("nextPageToken")
# Use next_token for subsequent calls:
# response = chronicle.list_parsers(page_size=50, page_token=next_token)
log_type = "WINDOWS_AD"
# Get specific parser
parser = chronicle.get_parser(log_type=log_type, id=parser_id)
print(f"Parser content: {parser.get('text')}")
# Activate/Deactivate parser
chronicle.activate_parser(log_type=log_type, id=parser_id)
chronicle.deactivate_parser(log_type=log_type, id=parser_id)
# Copy an existing parser as a starting point
copied_parser = chronicle.copy_parser(log_type=log_type, id="pa_existing_parser")
# Delete parser
chronicle.delete_parser(log_type=log_type, id=parser_id)
# Force delete an active parser
chronicle.delete_parser(log_type=log_type, id=parser_id, force=True)
# Activate a release candidate parser
chronicle.activate_release_candidate_parser(log_type=log_type, id="pa_release_candidate")
Note: Parsers work in conjunction with log ingestion. When you ingest logs using `chronicle.ingest_log()`, Chronicle automatically applies the appropriate parser based on the log type to transform your raw logs into UDM format. If you're working with custom log formats, you may need to create or configure custom parsers first.
Run Parser against sample logs
Run the parser on one or more sample logs:
# Sample parser code that extracts fields from logs
parser_text = """
filter {
mutate {
replace => {
"event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
"event1.idm.read_only_udm.metadata.vendor_name" => "ACME Labs"
}
}
grok {
match => {
"message" => ["^(?P<_firstWord>[^\s]+)\s.*$"]
}
on_error => "_grok_message_failed"
}
if ![_grok_message_failed] {
mutate {
replace => {
"event1.idm.read_only_udm.metadata.description" => "%{_firstWord}"
}
}
}
mutate {
merge => {
"@output" => "event1"
}
}
}
"""
log_type = "WINDOWS_AD"
# Sample log entries to test
sample_logs = [
'{"message": "ERROR: Failed authentication attempt"}',
'{"message": "WARNING: Suspicious activity detected"}',
'{"message": "INFO: User logged in successfully"}'
]
# Run parser evaluation
result = chronicle.run_parser(
log_type=log_type,
parser_code=parser_text,
parser_extension_code=None, # Optional parser extension
logs=sample_logs,
statedump_allowed=False # Enable if using statedump filters
)
# Check the results
if "runParserResults" in result:
for i, parser_result in enumerate(result["runParserResults"]):
print(f"\nLog {i+1} parsing result:")
if "parsedEvents" in parser_result:
print(f" Parsed events: {parser_result['parsedEvents']}")
if "errors" in parser_result:
print(f" Errors: {parser_result['errors']}")
The run_parser function includes comprehensive validation:
- Validates log type and parser code are provided
- Ensures logs are provided as a list of strings
- Enforces size limits (10MB per log, 50MB total, max 1000 logs)
- Provides detailed error messages for different failure scenarios
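The documented size limits can also be checked client-side before making the call, to fail fast on oversized batches. A sketch — the constants reflect the limits listed above, and `check_log_batch` is our helper, not an SDK function:

```python
MAX_LOG_BYTES = 10 * 1024 * 1024    # 10MB per log
MAX_TOTAL_BYTES = 50 * 1024 * 1024  # 50MB total
MAX_LOG_COUNT = 1000                # max 1000 logs per call

def check_log_batch(logs):
    """Raise ValueError if a batch of log strings exceeds the documented limits."""
    if len(logs) > MAX_LOG_COUNT:
        raise ValueError(f"Too many logs: {len(logs)} > {MAX_LOG_COUNT}")
    total = 0
    for i, log in enumerate(logs):
        size = len(log.encode("utf-8"))
        if size > MAX_LOG_BYTES:
            raise ValueError(f"Log {i} is {size} bytes, over the per-log limit")
        total += size
    if total > MAX_TOTAL_BYTES:
        raise ValueError(f"Batch is {total} bytes, over the total limit")
    return True

# Validate a batch before passing it to chronicle.run_parser()
check_log_batch(['{"message": "ERROR: Failed authentication attempt"}'])
```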
Complete Parser Workflow Example
Here's a complete example that demonstrates retrieving a parser, running it against a log, and ingesting the parsed UDM event:
# Step 1: List and retrieve an OKTA parser
parsers = chronicle.list_parsers(log_type="OKTA")
parser_id = parsers[0]["name"].split("/")[-1]
parser_details = chronicle.get_parser(log_type="OKTA", id=parser_id)
# Extract and decode parser code
import base64
import json
parser_code = base64.b64decode(parser_details["cbn"]).decode('utf-8')
# Step 2: Run the parser against a sample log
okta_log = {
"actor": {"alternateId": "[email protected]", "displayName": "Test User"},
"eventType": "user.account.lock",
"outcome": {"result": "FAILURE", "reason": "LOCKED_OUT"},
"published": "2025-06-19T21:51:50.116Z"
# ... other OKTA log fields
}
result = chronicle.run_parser(
log_type="OKTA",
parser_code=parser_code,
parser_extension_code=None,
logs=[json.dumps(okta_log)]
)
# Step 3: Extract and ingest the parsed UDM event
if result["runParserResults"][0]["parsedEvents"]:
# parsedEvents is a dict with 'events' key containing the actual events list
parsed_events_data = result["runParserResults"][0]["parsedEvents"]
if isinstance(parsed_events_data, dict) and "events" in parsed_events_data:
events = parsed_events_data["events"]
if events and len(events) > 0:
# Extract the first event
if "event" in events[0]:
udm_event = events[0]["event"]
else:
udm_event = events[0]
# Ingest the parsed UDM event back into Chronicle
ingest_result = chronicle.ingest_udm(udm_events=udm_event)
print(f"UDM event ingested: {ingest_result}")
This workflow is useful for:
- Testing parsers before deployment
- Understanding how logs are transformed to UDM format
- Re-processing logs with updated parsers
- Debugging parsing issues
Parser Extension
Parser extensions provide a flexible way to extend the capabilities of existing default (or custom) parsers without replacing them. The extensions let you customize the parser pipeline by adding new parsing logic, extracting and transforming fields, and updating or removing UDM field mappings.
The SDK provides comprehensive support for managing Chronicle parser extensions:
List Parser Extensions
List parser extensions for a log type:
log_type = "OKTA"
extensions = chronicle.list_parser_extensions(log_type)
print(f"Found {len(extensions["parserExtensions"])} parser extensions for log type: {log_type}")
Create a new parser extension
Create a new parser extension using either a CBN snippet, a field extractor, or dynamic parsing:
log_type = "OKTA"
field_extractor = {
"extractors": [
{
"preconditionPath": "severity",
"preconditionValue": "Info",
"preconditionOp": "EQUALS",
"fieldPath": "displayMessage",
"destinationPath": "udm.metadata.description",
}
],
"logFormat": "JSON",
"appendRepeatedFields": True,
}
chronicle.create_parser_extension(log_type, field_extractor=field_extractor)
Get parser extension
Get parser extension details:
log_type = "OKTA"
extension_id = "1234567890"
extension = chronicle.get_parser_extension(log_type, extension_id)
print(extension)
Activate Parser Extension
Activate parser extension:
log_type = "OKTA"
extension_id = "1234567890"
chronicle.activate_parser_extension(log_type, extension_id)
Delete Parser Extension
Delete parser extension:
log_type = "OKTA"
extension_id = "1234567890"
chronicle.delete_parser_extension(log_type, extension_id)
Watchlist Management
Creating a Watchlist
Create a new watchlist:
watchlist = chronicle.create_watchlist(
name="my_watchlist",
display_name="my_watchlist",
multiplying_factor=1.5,
description="My new watchlist"
)
Updating a Watchlist
Update a watchlist by ID:
updated_watchlist = chronicle.update_watchlist(
watchlist_id="abc-123-def",
display_name="Updated Watchlist Name",
description="Updated description",
multiplying_factor=2.0,
entity_population_mechanism={"manual": {}},
watchlist_user_preferences={"pinned": True}
)
Deleting a Watchlist
Delete a watchlist by ID:
chronicle.delete_watchlist("acb-123-def", force=True)
Getting a Watchlist
Get a watchlist by ID:
watchlist = chronicle.get_watchlist("acb-123-def")
List all Watchlists
List all watchlists:
# List watchlists (returns dict with pagination metadata)
watchlists = chronicle.list_watchlists()
for watchlist in watchlists.get("watchlists", []):
print(f"Watchlist: {watchlist.get('displayName')}")
# List watchlists as a direct list (automatically fetches all pages)
watchlists = chronicle.list_watchlists(as_list=True)
for watchlist in watchlists:
print(f"Watchlist: {watchlist.get('displayName')}")
Rule Management
The SDK provides comprehensive support for managing Chronicle detection rules:
Creating Rules
Create new detection rules using YARA-L 2.0 syntax:
rule_text = """
rule simple_network_rule {
meta:
description = "Example rule to detect network connections"
author = "SecOps SDK Example"
severity = "Medium"
priority = "Medium"
yara_version = "YL2.0"
rule_version = "1.0"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
$e.principal.hostname != ""
condition:
$e
}
"""
# Create the rule
rule = chronicle.create_rule(rule_text)
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}")
Managing Rules
Retrieve, list, update, enable/disable, and delete rules:
# List all rules
rules = chronicle.list_rules()
for rule in rules.get("rules", []):
rule_id = rule.get("name", "").split("/")[-1]
enabled = rule.get("deployment", {}).get("enabled", False)
print(f"Rule ID: {rule_id}, Enabled: {enabled}")
# List rules with pagination and the `REVISION_METADATA_ONLY` view
rules = chronicle.list_rules(view="REVISION_METADATA_ONLY", page_size=50)
print(f"Fetched {len(rules.get('rules', []))} rules")
# Get specific rule
rule = chronicle.get_rule(rule_id)
print(f"Rule content: {rule.get('text')}")
# Update rule
updated_rule = chronicle.update_rule(rule_id, updated_rule_text)
# Enable/disable rule
deployment = chronicle.enable_rule(rule_id, enabled=True) # Enable
deployment = chronicle.enable_rule(rule_id, enabled=False) # Disable
# Delete rule
chronicle.delete_rule(rule_id)
Rule Deployment
Manage a rule's deployment (enabled/alerting/archive state and run frequency):
# Get current deployment for a rule
deployment = chronicle.get_rule_deployment(rule_id)
# List deployments (paginated)
page = chronicle.list_rule_deployments(page_size=10)
# List deployments with filter
filtered = chronicle.list_rule_deployments(filter_query="enabled=true")
# Update deployment fields (partial updates supported)
chronicle.update_rule_deployment(
rule_id=rule_id,
enabled=True, # continuously execute
alerting=False, # detections do not generate alerts
run_frequency="LIVE" # LIVE | HOURLY | DAILY
)
# Archive a rule (must set enabled to False when archived=True)
chronicle.update_rule_deployment(
rule_id=rule_id,
archived=True
)
Searching Rules
Search for rules using regular expressions:
# Search for rules containing specific patterns
results = chronicle.search_rules("suspicious process")
for rule in results.get("rules", []):
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}, contains: 'suspicious process'")
# Find rules mentioning a specific MITRE technique
mitre_rules = chronicle.search_rules("T1055")
print(f"Found {len(mitre_rules.get('rules', []))} rules mentioning T1055 technique")
Testing Rules
Test rules against historical data to validate their effectiveness before deployment:
from datetime import datetime, timedelta, timezone
# Define time range for testing
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7) # Test against last 7 days
# Rule to test
rule_text = """
rule test_rule {
meta:
description = "Test rule for validation"
author = "Test Author"
severity = "Low"
yara_version = "YL2.0"
rule_version = "1.0"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
condition:
$e
}
"""
# Test the rule
test_results = chronicle.run_rule_test(
rule_text=rule_text,
start_time=start_time,
end_time=end_time,
max_results=100
)
# Process streaming results
detection_count = 0
for result in test_results:
result_type = result.get("type")
if result_type == "progress":
# Progress update
percent_done = result.get("percentDone", 0)
print(f"Progress: {percent_done}%")
elif result_type == "detection":
# Detection result
detection_count += 1
detection = result.get("detection", {})
print(f"Detection {detection_count}:")
# Process detection details
if "rule_id" in detection:
print(f" Rule ID: {detection['rule_id']}")
if "data" in detection:
print(f" Data: {detection['data']}")
elif result_type == "error":
# Error information
print(f"Error: {result.get('message', 'Unknown error')}")
print(f"Finished testing. Found {detection_count} detection(s).")
# Extract just the UDM events for programmatic processing
udm_events = []
for result in chronicle.run_rule_test(rule_text, start_time, end_time, max_results=100):
if result.get("type") == "detection":
detection = result.get("detection", {})
result_events = detection.get("resultEvents", {})
for var_name, var_data in result_events.items():
event_samples = var_data.get("eventSamples", [])
for sample in event_samples:
event = sample.get("event")
if event:
udm_events.append(event)
# Process the UDM events
for event in udm_events:
# Process each UDM event
metadata = event.get("metadata", {})
print(f"Event type: {metadata.get('eventType')}")
Retrohunts
Run rules against historical data to find past matches:
from datetime import datetime, timedelta, timezone
# Set time range for retrohunt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7) # Search past 7 days
# Create retrohunt
retrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)
operation_id = retrohunt.get("name", "").split("/")[-1]
# Check retrohunt status
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
state = retrohunt_status.get("state", "")
# List retrohunts for a rule
retrohunts = chronicle.list_retrohunts(rule_id)
Detections and Errors
Monitor rule detections and execution errors:
from datetime import datetime, timedelta, timezone
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)
# List detections for a rule
detections = chronicle.list_detections(
rule_id=rule_id,
start_time=start_time,
end_time=end_time,
list_basis="CREATED_TIME"
)
for detection in detections.get("detections", []):
detection_id = detection.get("id", "")
event_time = detection.get("eventTime", "")
alerting = detection.get("alertState", "") == "ALERTING"
print(f"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}")
# List execution errors for a rule
errors = chronicle.list_errors(rule_id)
for error in errors.get("ruleExecutionErrors", []):
error_message = error.get("error_message", "")
create_time = error.get("create_time", "")
print(f"Error: {error_message}, Time: {create_time}")
Rule Alerts
Search for alerts generated by rules:
# Set time range for alert search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7) # Search past 7 days
# Search for rule alerts
alerts_response = chronicle.search_rule_alerts(
start_time=start_time,
end_time=end_time,
page_size=10
)
# The API returns a nested structure where alerts are grouped by rule
# Extract and process all alerts from this structure
all_alerts = []
too_many_alerts = alerts_response.get('tooManyAlerts', False)
# Process the nested response structure - alerts are grouped by rule
for rule_alert in alerts_response.get('ruleAlerts', []):
# Extract rule metadata
rule_metadata = rule_alert.get('ruleMetadata', {})
rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')
rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')
# Get alerts for this rule
rule_alerts = rule_alert.get('alerts', [])
# Process each alert
for alert in rule_alerts:
# Extract important fields
alert_id = alert.get("id", "")
detection_time = alert.get("detectionTimestamp", "")
commit_time = alert.get("commitTimestamp", "")
alerting_type = alert.get("alertingType", "")
print(f"Alert ID: {alert_id}")
print(f"Rule ID: {rule_id}")
print(f"Rule Name: {rule_name}")
print(f"Detection Time: {detection_time}")
# Extract events from the alert
if 'resultEvents' in alert:
for var_name, event_data in alert.get('resultEvents', {}).items():
if 'eventSamples' in event_data:
for sample in event_data.get('eventSamples', []):
if 'event' in sample:
event = sample['event']
# Process event data
event_type = event.get('metadata', {}).get('eventType', 'Unknown')
print(f"Event Type: {event_type}")
If tooManyAlerts is True in the response, consider narrowing your search criteria using a smaller time window or more specific filters.
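One way to narrow the search is to split the window into smaller, contiguous sub-windows and query each in turn. A sketch — `split_window` is our helper, not an SDK function:

```python
from datetime import datetime, timedelta, timezone

def split_window(start, end, parts):
    """Split [start, end] into `parts` equal, contiguous sub-windows."""
    step = (end - start) / parts
    return [(start + i * step, start + (i + 1) * step) for i in range(parts)]

end_time = datetime(2024, 2, 9, tzinfo=timezone.utc)
start_time = end_time - timedelta(days=7)
windows = split_window(start_time, end_time, 7)
# Each (sub_start, sub_end) pair can be passed to search_rule_alerts in turn,
# retrying with a finer split whenever tooManyAlerts comes back True.
```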
Curated Rule Sets
Query curated rules:
# List all curated rules (returns dict with pagination metadata)
result = chronicle.list_curated_rules()
for rule in result.get("curatedRules", []):
rule_id = rule.get("name", "").split("/")[-1]
display_name = rule.get