
How to build a secure ML-based network anomaly detection solution for telecommunication networks

  • 22 August 2022
  • 7 replies
  • 199 views

Userlevel 7
Badge +35

Today I will discuss how to build an ML-based network anomaly detection application for telecom networks, using Dataflow, BigQuery ML, and Cloud Data Loss Prevention (DLP) to help identify cyber security threats.

In this tutorial, I focus on the key points from the Google Cloud documentation for developing an anomaly detection application. It is intended for data engineers and data scientists, and I assume that you have basic working knowledge of Dataflow, BigQuery ML, and Cloud DLP.

Architecture Overview

 

The following diagram shows the components used to build an ML-based network anomaly detection system. Pub/Sub and Cloud Storage serve as data sources. Dataflow aggregates and extracts features from this data and tokenizes sensitive fields with the DLP API. BigQuery ML creates a k-means clustering model from these features, and Dataflow identifies the outliers.

 

Anomaly detection reference architecture using Dataflow and BigQuery ML.

Objectives

  • Create a Pub/Sub topic and a subscription to generate synthetic NetFlow log data.
  • Aggregate and extract features from NetFlow log data using Dataflow.
  • Create a BigQuery ML k-means clustering model.
  • Tokenize sensitive data with the DLP API.
  • Create a Dataflow pipeline for real-time outlier detection, using normalized and trained data.

Before I begin walking through this tutorial, I need to address cost.

 

The following billable Google Cloud components are used in this tutorial: BigQuery, Dataflow, Cloud Storage, Pub/Sub, Cloud DLP, and Cloud Build.

To generate a cost estimate based on your projected usage, use the pricing calculator.

Before you begin

 

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
  2. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

 

  3. In the Google Cloud console, activate Cloud Shell. At the bottom of the console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

You run all commands in this tutorial from the Cloud Shell.

  4. In Cloud Shell, enable the DLP, BigQuery, Dataflow, Cloud Storage, Pub/Sub, and Cloud Build APIs:
gcloud services enable dlp.googleapis.com bigquery.googleapis.com \
dataflow.googleapis.com storage-component.googleapis.com \
pubsub.googleapis.com cloudbuild.googleapis.com
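If you want to double-check that the APIs are enabled before moving on, here is a quick optional check (the grep pattern is just illustrative):

# List enabled services and filter for the ones used in this tutorial.
gcloud services list --enabled | grep -E "dlp|bigquery|dataflow|pubsub|storage|cloudbuild"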

Generating synthetic data using Dataflow and Pub/Sub

 

In this section, you create a Pub/Sub topic and a subscription to generate synthetic NetFlow log data by triggering an automated Dataflow pipeline.

Create a Pub/Sub topic and a subscription

 

In Cloud Shell, create a Pub/Sub topic and a subscription:

export PROJECT_ID=$(gcloud config get-value project)
export TOPIC_ID=TOPIC_ID
export SUBSCRIPTION_ID=SUBSCRIPTION_ID
gcloud pubsub topics create $TOPIC_ID
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID

Replace the following:

    • TOPIC_ID: the name of the Pub/Sub topic
    • SUBSCRIPTION_ID: the name of the Pub/Sub subscription
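To confirm that the topic and subscription exist, you can optionally describe them:

# Both commands print the resource configuration if creation succeeded.
gcloud pubsub topics describe $TOPIC_ID
gcloud pubsub subscriptions describe $SUBSCRIPTION_ID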

Trigger the synthetic data generation pipeline

  1. In Cloud Shell, clone the GitHub repository:
git clone https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git
cd df-ml-anomaly-detection
  2. To allow Cloud Build to submit the Dataflow job automatically, grant the following roles to your Cloud Build service account:
export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} \
--format="value(PROJECT_NUMBER)")

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
--role roles/dataflow.admin

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
--role roles/compute.instanceAdmin.v1

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
--role roles/iam.serviceAccountUser

  3. Start the synthetic data generation pipeline:
gcloud builds submit . --machine-type=n1-highcpu-8 \
--config scripts/cloud-build-data-generator.yaml \
--substitutions _TOPIC_ID=${TOPIC_ID}

Because of the large code package, you must use a larger machine type; for this tutorial, use machine-type=n1-highcpu-8.

  4. Validate that the log data is published in the subscription:
gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit 1 >> raw_log.txt
cat raw_log.txt

The output contains a subset of NetFlow log schema fields populated with random values, similar to the following:

 

{
\"subscriberId\": \"mharper\",
\"srcIP\": \"12.0.9.4",
\"dstIP\": \"12.0.1.2\",
\"srcPort\": 5000,
\"dstPort\": 3000,
\"txBytes\": 15,
\"rxBytes\": 40,
\"startTime\": 1570276550,
\"endTime\": 1570276559,
\"tcpFlag\": 0,
\"protocolName\": \"tcp\",
\"protocolNumber\": 0
}

Extracting features and finding outlier data

In this section, you create BigQuery tables to store feature and outlier data processed by the anomaly detection pipeline.

Create BigQuery tables for storing feature and outlier data

 

  1. In Cloud Shell, create a BigQuery dataset:
export DATASET_NAME=DATASET_NAME
bq --location=US mk -d \
--description "Network Logs Dataset" \
${DATASET_NAME}
  2. Create BigQuery tables:
bq mk -t --schema src/main/resources/aggr_log_table_schema.json \
--time_partitioning_type=DAY \
--clustering_fields="dst_subnet,subscriber_id" \
--description "Network Log Feature Table" \
${PROJECT_ID}:${DATASET_NAME}.cluster_model_data

bq mk -t --schema src/main/resources/outlier_table_schema.json \
--description "Network Log Outlier Table" \
${PROJECT_ID}:${DATASET_NAME}.outlier_data

bq mk -t --schema src/main/resources/normalized_centroid_data_schema.json \
--description "Sample Normalized Data" \
${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data

The following tables are generated:

  • cluster_model_data: a clustered partition table that stores feature values for model creation.
  • outlier_data: an outlier table that stores anomalies.
  • normalized_centroid_data: a table pre-populated with normalized data created from a sample model.
3. Load the centroid data into the normalized_centroid_data table:
bq load \
--source_format=NEWLINE_DELIMITED_JSON \
${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data \
gs://df-ml-anomaly-detection-mock-data/sample_model/normalized_centroid_data.json src/main/resources/normalized_centroid_data_schema.json
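To verify that the sample centroid data loaded, you can optionally run a quick row count (this sketch reuses the variables set earlier):

# A non-zero count means the sample centroid data is in place.
bq query --nouse_legacy_sql \
"SELECT COUNT(*) AS row_count FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`"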

Create and trigger a Dataflow Flex Template

In this section, you create a Dataflow Flex Template to trigger the anomaly detection pipeline.

  1. In Cloud Shell, create a Docker image in your project:

gcloud auth configure-docker
gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline
  2. Upload the Flex Template configuration file to a Cloud Storage bucket:
export DF_TEMPLATE_CONFIG_BUCKET=${PROJECT_ID}-DF_TEMPLATE_CONFIG
gsutil mb -c standard -l REGION gs://${DF_TEMPLATE_CONFIG_BUCKET}
cat << EOF | gsutil cp - gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json
{"image": "gcr.io/${PROJECT_ID}/df-ml-anomaly-detection",
"sdk_info": {"language": "JAVA"}
}
EOF

Replace the following:

  • PROJECT_ID: your Cloud project ID
  • DF_TEMPLATE_CONFIG: the name of the Cloud Storage bucket for your Dataflow Flex Template configuration file
  • REGION: the region where you want to create the Cloud Storage bucket
  3. Create a SQL file to pass the normalized model data as a pipeline parameter:
echo "SELECT * FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`" > normalized_cluster_data.sql
gsutil cp normalized_cluster_data.sql gs://${DF_TEMPLATE_CONFIG_BUCKET}/
  4. Run the anomaly detection pipeline:

 

gcloud beta dataflow flex-template run "anomaly-detection" \
--project=${PROJECT_ID} \
--region=us-central1 \
--template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
--parameters=autoscalingAlgorithm="NONE",\
numWorkers=5,\
maxNumWorkers=5,\
workerMachineType=n1-highmem-4,\
subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
batchFrequency=2,\
customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
diskSizeGb=5,\
windowInterval=10,\
writeMethod=FILE_LOADS,\
streaming=true
  5. In the console, go to the Dataflow page.
  6. Click the netflow-anomaly-detection-`date +%Y%m%d-%H%M%S-%N` job. A graph of the Dataflow pipeline appears in the console.
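If you prefer the command line to the console, here is a quick optional check that the job is running (assuming the same region as the run command above):

# Lists Dataflow jobs that are currently active in the region.
gcloud dataflow jobs list --region=us-central1 --status=active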

Publish an outlier message for testing

You can publish a message to validate that the outlier message is correctly detected from the pipeline.

  1. In Cloud Shell, publish the following message:

gcloud pubsub topics publish ${TOPIC_ID} --message \
"{\"subscriberId\": \"00000000000000000\", \
\"srcIP\": \"12.0.9.4\", \
\"dstIP\": \"12.0.1.3\", \
\"srcPort\": 5000, \
\"dstPort\": 3000, \
\"txBytes\": 150000, \
\"rxBytes\": 40000, \
\"startTime\": 1570276550, \
\"endTime\": 1570276550, \
\"tcpFlag\": 0, \
\"protocolName\": \"tcp\", \
\"protocolNumber\": 0}"

Notice the unusually high number of transmitted bytes (txBytes) and received bytes (rxBytes) compared to the range (100 to 500 bytes) set up for the synthetic data. This message might indicate a security risk that needs to be validated.
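For contrast, a message whose byte counts fall inside that synthetic range should not normally be flagged as an outlier. Here is an illustrative example (the subscriberId value is just a sample, matching the earlier synthetic output):

gcloud pubsub topics publish ${TOPIC_ID} --message \
"{\"subscriberId\": \"mharper\", \
\"srcIP\": \"12.0.9.4\", \
\"dstIP\": \"12.0.1.3\", \
\"srcPort\": 5000, \
\"dstPort\": 3000, \
\"txBytes\": 150, \
\"rxBytes\": 200, \
\"startTime\": 1570276550, \
\"endTime\": 1570276550, \
\"tcpFlag\": 0, \
\"protocolName\": \"tcp\", \
\"protocolNumber\": 0}"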

 

  2. After a minute or so, validate that the anomaly is identified and stored in the BigQuery table:

 

export OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
WHERE subscriber_id like "0%" limit 1'
bq query --nouse_legacy_sql $OUTLIER_TABLE_QUERY >> outlier_orig.txt
cat outlier_orig.txt

The output is similar to the following:

+---------------+-------------+----------------------------+
| subscriber_id | dst_subnet  |      transaction_time      |
+---------------+-------------+----------------------------+
| 00000000000   | 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
+---------------+-------------+----------------------------+

 

Creating a k-means clustering model using BigQuery ML

 

  1. In the console, go to the BigQuery Query editor page.
  2. Select training data from the feature table and create a k-means clustering model using BigQuery ML:
--> temp table for training data
#standardSQL
CREATE OR REPLACE TABLE DATASET_NAME.train_data as
(SELECT * FROM DATASET_NAME.cluster_model_data
WHERE _PARTITIONDATE BETWEEN START_DATE AND END_DATE
AND NOT IS_NAN(avg_tx_bytes)
AND NOT IS_NAN(avg_rx_bytes)
AND NOT IS_NAN(avg_duration))
limit 100000;

--> create a model using BigQuery ML
#standardSQL
CREATE OR REPLACE MODEL DATASET_NAME.log_cluster options(model_type='kmeans', standardize_features = true) AS
SELECT * EXCEPT (transaction_time,subscriber_id,number_of_unique_ips, number_of_unique_ports, dst_subnet)
FROM DATASET_NAME.train_data;

Replace the following:

  • START_DATE and END_DATE: the current date
  • DATASET_NAME: the dataset that you created earlier
  3. Normalize the data for each cluster:

 

--> create normalize table for each centroid
#standardSQL
CREATE OR REPLACE TABLE DATASET_NAME.normalized_centroid_data as(
with centroid_details AS (
SELECT centroid_id,array_agg(struct(feature as name, round(numerical_value,1) as value)
order by centroid_id) AS cluster
from ML.CENTROIDS(model DATASET_NAME.log_cluster)
group by centroid_id
),
cluster as (select centroid_details.centroid_id as centroid_id,
(select value from unnest(cluster) where name = 'number_of_records') AS number_of_records,
(select value from unnest(cluster) where name = 'max_tx_bytes') AS max_tx_bytes,
(select value from unnest(cluster) where name = 'min_tx_bytes') AS min_tx_bytes,
(select value from unnest(cluster) where name = 'avg_tx_bytes') AS avg_tx_bytes,
(select value from unnest(cluster) where name = 'max_rx_bytes') AS max_rx_bytes,
(select value from unnest(cluster) where name = 'min_rx_bytes') AS min_rx_bytes,
(select value from unnest(cluster) where name = 'avg_rx_bytes') AS avg_rx_bytes,
(select value from unnest(cluster) where name = 'max_duration') AS max_duration,
(select value from unnest(cluster) where name = 'min_duration') AS min_duration,
(select value from unnest(cluster) where name = 'avg_duration') AS avg_duration
FROM centroid_details order by centroid_id asc),
predict as
(select * from ML.PREDICT(model DATASET_NAME.log_cluster,
(select * from DATASET_NAME.train_data)))
select c.centroid_id as centroid_id,
(stddev((p.number_of_records-c.number_of_records)+(p.max_tx_bytes-c.max_tx_bytes)+(p.min_tx_bytes-c.min_tx_bytes)+(p.avg_tx_bytes-c.min_tx_bytes)+(p.max_rx_bytes-c.max_rx_bytes)+(p.min_rx_bytes-c.min_rx_bytes)+ (p.avg_rx_bytes-c.min_rx_bytes)
+(p.max_duration-c.max_duration)+(p.min_duration-c.min_duration)+(p.avg_duration-c.avg_duration)))
as normalized_dest, any_value(c.number_of_records) as number_of_records,any_value(c.max_tx_bytes) as max_tx_bytes, any_value(c.min_tx_bytes) as min_tx_bytes , any_value(c.avg_tx_bytes) as avg_tx_bytes,any_value(c.max_rx_bytes) as max_rx_bytes, any_value(c.min_tx_bytes) as min_rx_bytes ,any_value(c.avg_rx_bytes) as avg_rx_bytes, any_value(c.avg_duration) as avg_duration,any_value(c.max_duration)
as max_duration , any_value(c.min_duration) as min_duration
from predict as p
inner join cluster as c on c.centroid_id = p.centroid_id
group by c.centroid_id);

This query calculates a normalized distance for each cluster by applying the standard deviation function to the sum of the differences between the input and centroid vectors. In other words, it implements the following formula:

stddev((input_value_x - centroid_value_x) + (input_value_y - centroid_value_y) + (...))

For example, if an input row has avg_tx_bytes = 120 and its centroid has avg_tx_bytes = 100, that feature contributes 20 to the sum inside STDDEV; the standard deviation is then taken over that sum across all rows assigned to the same centroid.
  4. Validate the normalized_centroid_data table:
#standardSQL
SELECT * from DATASET_NAME.normalized_centroid_data

The result from this statement is a table of calculated normalized distances for each centroid ID:

 

Normalized data for each k-means cluster.

 

 

De-identifying data using Cloud DLP

In this section, you reuse the pipeline by passing an additional parameter to de-identify the international mobile subscriber identity (IMSI) number in the subscriber_id column.

  1. In Cloud Shell, create a crypto key:
export TEK=$(openssl rand -base64 32); echo ${TEK}

The output is similar to the following:

a3ecrQAQJJ8oxVO8TZ/odlfjcujhWXjU/Xg5lEFiw5M=
  2. To launch the code editor, on the toolbar of the Cloud Shell window, click Open Editor.
  3. Click File > New File and create a file named deid_template.json.
  4. Copy the following JSON block into the new file:
 
{
"deidentifyTemplate": {
"displayName": "Config to de-identify IMEI Number",
"description": "IMEI Number masking transformation",
"deidentifyConfig": {
"recordTransformations": {
"fieldTransformations": [
{
"fields": [
{
"name": "subscriber_id"
}
],
"primitiveTransformation": {
"cryptoDeterministicConfig": {
"cryptoKey": {
"unwrapped": {
"key": "CRYPTO_KEY"
}
},
"surrogateInfoType": {
"name": "IMSI_TOKEN"
}
}
}
}
]
}
}
},
"templateId": "dlp-deid-subid"
}
 Replace CRYPTO_KEY with the crypto key that you previously created. It's a best practice to use a Cloud KMS wrapped key for production workloads. Save the file. 
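For reference, here is a hedged sketch of creating a Cloud KMS key ring and key that could be used to wrap the crypto key in production; the key ring and key names are illustrative, and the wrapped-key DLP configuration itself is not shown here:

# Create a key ring and an encryption key for wrapping the data crypto key.
gcloud kms keyrings create dlp-keyring --location=global
gcloud kms keys create dlp-wrapping-key --keyring=dlp-keyring \
--location=global --purpose=encryption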
  5. In the Cloud Shell toolbar, click Open Terminal.
  6. In the Cloud Shell terminal, create a Cloud DLP de-identify template:
export DLP_API_ROOT_URL="https://dlp.googleapis.com"
export DEID_TEMPLATE_API="${DLP_API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates"
export DEID_CONFIG="@deid_template.json"

export ACCESS_TOKEN=$(gcloud auth print-access-token)
curl -X POST -H "Content-Type: application/json" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
"${DEID_TEMPLATE_API}" \
-d "${DEID_CONFIG}"

This creates a template with the following name in your Cloud project:

"name": "projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-sub-id"

  7. Stop the pipeline that you triggered in an earlier step:
gcloud dataflow jobs list --filter="name=anomaly-detection" --state=active
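The command above only lists the active job. Here is a sketch of actually stopping it, using the JOB_ID value from that output (the region is assumed to match the run command above):

# Cancel the running anomaly-detection job by its JOB_ID.
gcloud dataflow jobs cancel JOB_ID --region=us-central1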
  8. Trigger the anomaly detection pipeline again, this time using the Cloud DLP de-identify template name:
gcloud beta dataflow flex-template run "anomaly-detection-with-dlp" \
--project=${PROJECT_ID} \
--region=us-central1 \
--template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
--parameters=autoscalingAlgorithm="NONE",\
numWorkers=5,\
maxNumWorkers=5,\
workerMachineType=n1-highmem-4,\
subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
batchFrequency=2,\
customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
diskSizeGb=5,\
windowInterval=10,\
writeMethod=FILE_LOADS,\
streaming=true,\
deidTemplateName=projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-subid
  9. Query the outlier table to validate that the subscriber ID is successfully de-identified:
export DLP_OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
ORDER BY transaction_time DESC'

bq query --nouse_legacy_sql $DLP_OUTLIER_TABLE_QUERY >> outlier_deid.txt

cat outlier_deid.txt

The output is similar to the following:

+----------------------------------------------------------------------------------+-------------+----------------------------+
| subscriber_id                                                                      | dst_subnet  |      transaction_time      |
+----------------------------------------------------------------------------------+-------------+----------------------------+
| IMSI_TOKEN(64):AcZD2U2v//QiKkGzbFCm29pv5cqVi3Db09Z6CNt5cQSevBKRQvgdDfacPQIRY1dc    | 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
+----------------------------------------------------------------------------------+-------------+----------------------------+

If the subscriber ID was de-identified, the subscriber_id column no longer shows the original value, which was 00000000000.
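Because this tutorial uses billable components (see the cost note near the top), here is a hedged sketch of cleaning up the resources created above when you are finished; adjust the names and JOB_ID to match your environment:

# Cancel any remaining pipeline jobs (get JOB_ID from `gcloud dataflow jobs list`).
gcloud dataflow jobs cancel JOB_ID --region=us-central1

# Delete the BigQuery dataset, Pub/Sub resources, and the template-config bucket.
bq rm -r -f ${PROJECT_ID}:${DATASET_NAME}
gcloud pubsub subscriptions delete ${SUBSCRIPTION_ID}
gcloud pubsub topics delete ${TOPIC_ID}
gsutil -m rm -r gs://${DF_TEMPLATE_CONFIG_BUCKET}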

 

This is a summary of the tutorial from the Google Cloud documentation. Please see the extra credit links below for the source code and an overview video based on this discussion.

Extra Credit:

Source code: https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection


7 replies

Userlevel 7
Badge +28

wow! Great post, @malamin !

Userlevel 7
Badge +35

Thank you so much @Dimitris Petrakis .

I tried following the tutorial, but I keep getting this error when building the Flex Template.

 

Userlevel 7
Badge +65

Thanks for the post @Ribesh 

@malamin can you help us here? 🤔

Userlevel 7
Badge +35

Thank you, @Ribesh for the post.

Please make sure you have properly assigned the global environment variables, enabled the necessary APIs, and set up the required auth permissions for the project used in this tutorial.

gcloud services enable storage-component.googleapis.com
gcloud services enable dataflow.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud config set project PROJECT_ID

and run the appropriate gcloud auth command (for example, gcloud auth login).

Sometimes the ${PROJECT_ID} variable does not work inside a command if it has not been properly assigned. In that case, run gcloud config set project PROJECT_ID, or replace the variable with your project ID directly when executing the command.
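For example, here is a minimal way to set and verify the variable:

# Set the variable from the active gcloud configuration and confirm its value.
export PROJECT_ID=$(gcloud config get-value project)
echo ${PROJECT_ID}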

 

Based on the error screenshot that you posted here, please follow the debugging instructions shown in the error message to find the actual cause.

Also, please read through the tutorial carefully and check the referenced URLs, specifically the following:

https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection#anomaly-detection-in-netflow-log

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates

I hope this helps you overcome the error. Otherwise, please post the log and debugging information here so that I can assist you further.

Userlevel 7
Badge +35

Thank you so much for reminding me of the post, @ilias .

Userlevel 7
Badge +65

Hi @Ribesh 

Have you checked @malamin's answer?

Reply