Today I will discuss how to build an ML-based network anomaly detection application for telecom networks, using Dataflow, BigQuery ML, and Cloud Data Loss Prevention (DLP), to help identify cybersecurity threats.
In this tutorial, I focus on the key points from the Google Cloud documentation for developing an anomaly detection application. It is intended for data engineers and data scientists, and I assume that you have basic knowledge of the following:
- Dataflow pipelines built using the Apache Beam Java SDK
- Standard SQL
- Basic shell scripting
- K-means clustering
Architecture Overview
The following diagram shows the components used to build an ML-based network anomaly detection system. Pub/Sub and Cloud Storage serve as data sources. Dataflow aggregates the data and extracts features, tokenizing sensitive fields with the DLP API. BigQuery ML creates a k-means clustering model from these features, and Dataflow uses the normalized model output to identify the outliers.
Objectives
- Create a Pub/Sub topic and a subscription to generate synthetic NetFlow log data.
- Aggregate and extract features from NetFlow log data using Dataflow.
- Create a BigQuery ML k-means clustering model.
- Tokenize sensitive data with the DLP API.
- Create a Dataflow pipeline for real-time outlier detection, using normalized and trained data.
Before I begin walking through this tutorial, I need to address cost. This tutorial uses the following billable components of Google Cloud:
- BigQuery
- Dataflow
- Cloud Storage
- Pub/Sub
- Cloud Data Loss Prevention
To generate a cost estimate based on your projected usage, use the pricing calculator.
Before you begin
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
- In the Google Cloud console, activate Cloud Shell. At the bottom of the console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
You run all commands in this tutorial from the Cloud Shell.
- In Cloud Shell, enable the BigQuery, Dataflow, Cloud Storage, and DLP APIs.
gcloud services enable dlp.googleapis.com bigquery.googleapis.com \
dataflow.googleapis.com storage-component.googleapis.com \
pubsub.googleapis.com cloudbuild.googleapis.com
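If you want to confirm that these services are active before moving on, a quick optional check (my addition, not part of the source tutorial) is to list the enabled APIs and filter for the relevant ones:
# Optional: confirm the APIs enabled in the previous step
gcloud services list --enabled | grep -E 'bigquery|dataflow|storage|pubsub|dlp|cloudbuild'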
Generating synthetic data using Dataflow and Pub/Sub
In this section, you create a Pub/Sub topic and a subscription to generate synthetic NetFlow log data by triggering an automated Dataflow pipeline.
Create a Pub/Sub topic and a subscription
In Cloud Shell, create a Pub/Sub topic and a subscription:
export PROJECT_ID=$(gcloud config get-value project)
export TOPIC_ID=TOPIC_ID
export SUBSCRIPTION_ID=SUBSCRIPTION_ID
gcloud pubsub topics create $TOPIC_ID
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID
Replace the following:
- TOPIC_ID: the name of the Pub/Sub topic
- SUBSCRIPTION_ID: the name of the Pub/Sub subscription
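If you want to double-check that both resources were created under the names you chose, the following optional describe commands (my addition) should each return the full resource path:
# Optional: confirm the topic and subscription exist
gcloud pubsub topics describe ${TOPIC_ID}
gcloud pubsub subscriptions describe ${SUBSCRIPTION_ID}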
Trigger the synthetic data generation pipeline
- In Cloud Shell, clone the GitHub repository:
git clone https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git
cd df-ml-anomaly-detection
- To enable submitting a job automatically, grant Dataflow permissions to your Cloud Build service account:
export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} \
--format="value(PROJECT_NUMBER)")
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
--role roles/dataflow.admin
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
--role roles/compute.instanceAdmin.v1
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
--role roles/iam.serviceAccountUser
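To confirm that all three roles were granted, an optional check (my addition, not from the source tutorial) is to filter the project IAM policy for the Cloud Build service account; the output should list the three roles you just granted:
# Optional: list the roles granted to the Cloud Build service account
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--filter="bindings.members:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com" \
--format="value(bindings.role)"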
- Start the synthetic data generation pipeline:
gcloud builds submit . --machine-type=n1-highcpu-8 \
--config scripts/cloud-build-data-generator.yaml \
--substitutions _TOPIC_ID=${TOPIC_ID}
Because of the large code package, you must use a high-memory machine type; for this tutorial, use machine-type=n1-highcpu-8.
- Validate that the log data is published in the subscription:
gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit 1 >> raw_log.txt
cat raw_log.txt
The output contains a subset of NetFlow log schema fields populated with random values, similar to the following:
{
\"subscriberId\": \"mharper\",
\"srcIP\": \"12.0.9.4",
\"dstIP\": \"12.0.1.2\",
\"srcPort\": 5000,
\"dstPort\": 3000,
\"txBytes\": 15,
\"rxBytes\": 40,
\"startTime\": 1570276550,
\"endTime\": 1570276559,
\"tcpFlag\": 0,
\"protocolName\": \"tcp\",
\"protocolNumber\": 0
}
Extracting features and finding outlier data
In this section, you create BigQuery tables to store feature and outlier data processed by the anomaly detection pipeline.
Create BigQuery tables for storing feature and outlier data
- In Cloud Shell, create a BigQuery dataset:
export DATASET_NAME=DATASET_NAME
bq --location=US mk -d \
--description "Network Logs Dataset" \
${DATASET_NAME}
Replace DATASET_NAME with a name for your BigQuery dataset.
- Create BigQuery tables:
bq mk -t --schema src/main/resources/aggr_log_table_schema.json \
--time_partitioning_type=DAY \
--clustering_fields="dst_subnet,subscriber_id" \
--description "Network Log Feature Table" \
${PROJECT_ID}:${DATASET_NAME}.cluster_model_data
bq mk -t --schema src/main/resources/outlier_table_schema.json \
--description "Network Log Outlier Table" \
${PROJECT_ID}:${DATASET_NAME}.outlier_data
bq mk -t --schema src/main/resources/normalized_centroid_data_schema.json \
--description "Sample Normalized Data" \
${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data
The following tables are generated:
- cluster_model_data: a clustered partition table that stores feature values for model creation.
- outlier_data: an outlier table that stores anomalies.
- normalized_centroid_data: a table pre-populated with normalized data created from a sample model.
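Before loading any data, you can optionally confirm that the three tables exist by listing the dataset (a quick check I've added, not part of the source tutorial):
# Optional: list the tables in the dataset
bq ls ${PROJECT_ID}:${DATASET_NAME}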
- Load the sample normalized data into the normalized_centroid_data table:
bq load \
--source_format=NEWLINE_DELIMITED_JSON \
${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data \
gs://df-ml-anomaly-detection-mock-data/sample_model/normalized_centroid_data.json src/main/resources/normalized_centroid_data_schema.json
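Once the load job completes, an optional row count (my addition) confirms that the sample centroid data arrived; the table should contain one row per centroid:
# Optional: confirm that the sample normalized centroid data loaded
bq query --nouse_legacy_sql \
"SELECT COUNT(*) AS centroid_rows FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`"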
Create and trigger a Dataflow Flex Template
In this section, you create a Dataflow Flex Template to trigger the anomaly detection pipeline.
- In Cloud Shell, create a Docker image in your project:
gcloud auth configure-docker
gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline
- Upload the Flex Template configuration file to a Cloud Storage bucket:
export DF_TEMPLATE_CONFIG_BUCKET=${PROJECT_ID}-DF_TEMPLATE_CONFIG
gsutil mb -c standard -l REGION gs://${DF_TEMPLATE_CONFIG_BUCKET}
cat << EOF | gsutil cp - gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json
{"image": "gcr.io/${PROJECT_ID}/df-ml-anomaly-detection",
"sdk_info": {"language": "JAVA"}
}
EOF
Replace the following:
- PROJECT_ID: your Cloud project ID
- DF_TEMPLATE_CONFIG: the name of the Cloud Storage bucket for your Dataflow Flex Template configuration file
- REGION: the region where you want to create the Cloud Storage bucket
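If you want to verify that the template specification was written correctly, printing the uploaded file (an optional step I've added) should echo back the JSON with your project ID substituted:
# Optional: inspect the uploaded Flex Template spec
gsutil cat gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json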
- Create a SQL file to pass the normalized model data as a pipeline parameter:
echo "SELECT * FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`" > normalized_cluster_data.sql
gsutil cp normalized_cluster_data.sql gs://${DF_TEMPLATE_CONFIG_BUCKET}/
- Run the anomaly detection pipeline:
gcloud beta dataflow flex-template run "anomaly-detection" \
--project=${PROJECT_ID} \
--region=us-central1 \
--template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
--parameters=autoscalingAlgorithm="NONE",\
numWorkers=5,\
maxNumWorkers=5,\
workerMachineType=n1-highmem-4,\
subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
batchFrequency=2,\
customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
diskSizeGb=5,\
windowInterval=10,\
writeMethod=FILE_LOADS,\
streaming=true
- In the console, go to the Dataflow page.
- Click the anomaly-detection job. A representation of the Dataflow pipeline that's similar to the following appears:

Publish an outlier message for testing
To validate that the pipeline correctly detects an outlier, you can publish an anomalous message.
- In Cloud Shell, publish the following message:
gcloud pubsub topics publish ${TOPIC_ID} --message \
"{\"subscriberId\": \"00000000000000000\", \
\"srcIP\": \"12.0.9.4\", \
\"dstIP\": \"12.0.1.3\", \
\"srcPort\": 5000, \
\"dstPort\": 3000, \
\"txBytes\": 150000, \
\"rxBytes\": 40000, \
\"startTime\": 1570276550, \
\"endTime\": 1570276550, \
\"tcpFlag\": 0, \
\"protocolName\": \"tcp\", \
\"protocolNumber\": 0}"
Notice the unusually high numbers of transmitted bytes (txBytes) and received bytes (rxBytes) compared to the 100-to-500-byte range set up for the synthetic data. This message might indicate a security risk to validate.
- After a minute or so, validate that the anomaly is identified and stored in the BigQuery table:
export OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
WHERE subscriber_id like "0%" limit 1'
bq query --nouse_legacy_sql $OUTLIER_TABLE_QUERY >> outlier_orig.txt
cat outlier_orig.txt
The output is similar to the following:
+---------------+--------------+----------------------------+
| subscriber_id | dst_subnet   | transaction_time           |
+---------------+--------------+----------------------------+
| 00000000000   | 12.0.1.3/22  | 2020-07-09 21:29:36.571000 |
+---------------+--------------+----------------------------+
Creating a k-means clustering model using BigQuery ML
- In the console, go to the BigQuery Query editor page.
- Select training data from the feature table and create a k-means clustering model using BigQuery ML:
#standardSQL
-- create a temp table for training data
CREATE OR REPLACE TABLE DATASET_NAME.train_data as
(SELECT * FROM DATASET_NAME.cluster_model_data
WHERE _PARTITIONDATE BETWEEN START_DATE AND END_DATE
AND NOT IS_NAN(avg_tx_bytes)
AND NOT IS_NAN(avg_rx_bytes)
AND NOT IS_NAN(avg_duration))
limit 100000;
#standardSQL
-- create a model using BigQuery ML
CREATE OR REPLACE MODEL DATASET_NAME.log_cluster options(model_type='kmeans', standardize_features = true) AS
SELECT * EXCEPT (transaction_time,subscriber_id,number_of_unique_ips, number_of_unique_ports, dst_subnet)
FROM DATASET_NAME.train_data;
Replace the following:
- START_DATE and END_DATE: the current date
- DATASET_NAME: the dataset that you created earlier
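As an optional sanity check that I've added (not part of the source tutorial), you can ask BigQuery ML to evaluate the clustering model after training completes. For a k-means model, ML.EVALUATE returns the Davies-Bouldin index and the mean squared distance; run it from Cloud Shell, where the environment variables from earlier steps are still set:
# Optional: evaluate the k-means model
bq query --nouse_legacy_sql \
"SELECT * FROM ML.EVALUATE(MODEL \`${PROJECT_ID}.${DATASET_NAME}.log_cluster\`)"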
- Normalize the data for each cluster:
#standardSQL
-- create a normalized table for each centroid
CREATE OR REPLACE TABLE DATASET_NAME.normalized_centroid_data as(
with centroid_details AS (
SELECT centroid_id,array_agg(struct(feature as name, round(numerical_value,1) as value)
order by centroid_id) AS cluster
from ML.CENTROIDS(model DATASET_NAME.log_cluster)
group by centroid_id
),
cluster as (select centroid_details.centroid_id as centroid_id,
(select value from unnest(cluster) where name = 'number_of_records') AS number_of_records,
(select value from unnest(cluster) where name = 'max_tx_bytes') AS max_tx_bytes,
(select value from unnest(cluster) where name = 'min_tx_bytes') AS min_tx_bytes,
(select value from unnest(cluster) where name = 'avg_tx_bytes') AS avg_tx_bytes,
(select value from unnest(cluster) where name = 'max_rx_bytes') AS max_rx_bytes,
(select value from unnest(cluster) where name = 'min_rx_bytes') AS min_rx_bytes,
(select value from unnest(cluster) where name = 'avg_rx_bytes') AS avg_rx_bytes,
(select value from unnest(cluster) where name = 'max_duration') AS max_duration,
(select value from unnest(cluster) where name = 'min_duration') AS min_duration,
(select value from unnest(cluster) where name = 'avg_duration') AS avg_duration
FROM centroid_details order by centroid_id asc),
predict as
(select * from ML.PREDICT(model DATASET_NAME.log_cluster,
(select * from DATASET_NAME.train_data)))
select c.centroid_id as centroid_id,
(stddev((p.number_of_records-c.number_of_records)+(p.max_tx_bytes-c.max_tx_bytes)+(p.min_tx_bytes-c.min_tx_bytes)+(p.avg_tx_bytes-c.avg_tx_bytes)
+(p.max_rx_bytes-c.max_rx_bytes)+(p.min_rx_bytes-c.min_rx_bytes)+(p.avg_rx_bytes-c.avg_rx_bytes)
+(p.max_duration-c.max_duration)+(p.min_duration-c.min_duration)+(p.avg_duration-c.avg_duration))) as normalized_dest,
any_value(c.number_of_records) as number_of_records, any_value(c.max_tx_bytes) as max_tx_bytes, any_value(c.min_tx_bytes) as min_tx_bytes, any_value(c.avg_tx_bytes) as avg_tx_bytes,
any_value(c.max_rx_bytes) as max_rx_bytes, any_value(c.min_rx_bytes) as min_rx_bytes, any_value(c.avg_rx_bytes) as avg_rx_bytes,
any_value(c.max_duration) as max_duration, any_value(c.min_duration) as min_duration, any_value(c.avg_duration) as avg_duration
from predict as p
inner join cluster as c on c.centroid_id = p.centroid_id
group by c.centroid_id);
This query calculates a normalized distance for each cluster by using the standard deviation function between the input and centroid vectors. In other words, it implements the following formula:
stddev((input_value_x - centroid_value_x) + (input_value_y - centroid_value_y) + (...))
- Validate the normalized_centroid_data table:
#standardSQL
SELECT * from DATASET_NAME.normalized_centroid_data
The result of this statement is a table of calculated normalized distances for each centroid ID.
De-identifying data using Cloud DLP
In this section, you reuse the pipeline by passing an additional parameter to de-identify the international mobile subscriber identity (IMSI) number in the subscriber_id column.
- In Cloud Shell, create a crypto key:
export TEK=$(openssl rand -base64 32); echo ${TEK}
The output is a base64-encoded key similar to the following:
a3ecrQAQJJ8oxVO8TZ/odlfjcujhWXjU/Xg5lEFiw5M=
- To launch the code editor, on the toolbar of the Cloud Shell window, click Open Editor edit.
- Click File > New File and create a file named deid_template.json.
- Copy the following JSON block into the new file:
{
"deidentifyTemplate": {
"displayName": "Config to de-identify IMEI Number",
"description": "IMEI Number masking transformation",
"deidentifyConfig": {
"recordTransformations": {
"fieldTransformations": [
{
"fields": [
{
"name": "subscriber_id"
}
],
"primitiveTransformation": {
"cryptoDeterministicConfig": {
"cryptoKey": {
"unwrapped": {
"key": "CRYPTO_KEY"
}
},
"surrogateInfoType": {
"name": "IMSI_TOKEN"
}
}
}
}
]
}
}
},
"templateId": "dlp-deid-subid"
}
Replace CRYPTO_KEY with the crypto key that you previously created. It's a best practice to use a Cloud KMS wrapped key for production workloads. Save the file.
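The template above uses an unwrapped key to keep the tutorial simple. If you want to follow the Cloud KMS best practice, here is a rough sketch (my own addition; the key ring and key names are placeholders, and the commands run in the Cloud Shell terminal): create a KMS key, encrypt the TEK with it, and then reference the result in the template through a kmsWrapped cryptoKey (fields wrappedKey and cryptoKeyName) instead of the unwrapped one.
# Hypothetical sketch: wrap the TEK with Cloud KMS (resource names are placeholders)
gcloud kms keyrings create dlp-keyring --location=global
gcloud kms keys create dlp-key --location=global --keyring=dlp-keyring --purpose=encryption
# Decode the base64 TEK to raw bytes, encrypt it, and base64-encode the ciphertext for use as "wrappedKey"
echo -n "${TEK}" | base64 -d | gcloud kms encrypt \
--location=global --keyring=dlp-keyring --key=dlp-key \
--plaintext-file=- --ciphertext-file=- | base64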
- In the Cloud Shell toolbar, click Open Terminal.
- In the Cloud Shell terminal, create a Cloud DLP de-identify template:
export DLP_API_ROOT_URL="https://dlp.googleapis.com"
export DEID_TEMPLATE_API="${DLP_API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates"
export DEID_CONFIG="@deid_template.json"
export ACCESS_TOKEN=$(gcloud auth print-access-token)
curl -X POST -H "Content-Type: application/json" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
"${DEID_TEMPLATE_API}" \
-d "${DEID_CONFIG}"
This creates a template with the following name in your Cloud project:
"name": "projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-sub-id"
- Stop the pipeline that you triggered in an earlier step. First, list the active job to get its job ID:
gcloud dataflow jobs list --filter="name=anomaly-detection" --state=active
Then cancel the job, replacing JOB_ID with the job ID from the previous output:
gcloud dataflow jobs cancel JOB_ID --region=us-central1
- Trigger the anomaly detection pipeline again, this time passing the Cloud DLP de-identify template name:
gcloud beta dataflow flex-template run "anomaly-detection-with-dlp" \
--project=${PROJECT_ID} \
--region=us-central1 \
--template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
--parameters=autoscalingAlgorithm="NONE",\
numWorkers=5,\
maxNumWorkers=5,\
workerMachineType=n1-highmem-4,\
subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
batchFrequency=2,\
customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
diskSizeGb=5,\
windowInterval=10,\
writeMethod=FILE_LOADS,\
streaming=true,\
deidTemplateName=projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-subid
- Query the outlier table to validate that the subscriber ID is successfully de-identified:
export DLP_OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
ORDER BY transaction_time DESC'
bq query --nouse_legacy_sql $DLP_OUTLIER_TABLE_QUERY >> outlier_deid.txt
cat outlier_deid.txt
The output is similar to the following:
+----------------------------------------------------------------------------------+--------------+----------------------------+
| subscriber_id                                                                     | dst_subnet   | transaction_time           |
+----------------------------------------------------------------------------------+--------------+----------------------------+
| IMSI_TOKEN(64):AcZD2U2v//QiKkGzbFCm29pv5cqVi3Db09Z6CNt5cQSevBKRQvgdDfacPQIRY1dc   | 12.0.1.3/22  | 2020-07-09 21:29:36.571000 |
+----------------------------------------------------------------------------------+--------------+----------------------------+
If the subscriber ID was de-identified, the subscriber_id column no longer shows the original subscriber ID, which was 00000000000.
This is a summary of the tutorial from the Google Cloud documentation. See the extra credit links below for the source code and an overview video based on this discussion.
Extra Credit:
- Sample code for this tutorial: https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection
- https://cloud.google.com/blog/products/data-analytics/anomaly-detection-using-streaming-analytics-and-ai