STATSML 200: Unlock Dataset Metadata Insights via Adobe Experience Platform APIs and Python
This chapter covers the essential steps for installing necessary libraries, generating access tokens, and making authenticated API requests.
Use Case Overview
Our objective is to create a snapshot of the datasets available in the Adobe Experience Platform (AEP) catalog and enrich this snapshot with additional dimensions such as the number of rows, dataset size, and whether the dataset is managed by the customer or the system. While some of this information is available through the AEP UI under the Datasets tab, our goal is to extract this data programmatically, store it in a data lake, and leverage Data Distiller for advanced slice-and-dice analysis.
By doing so, we will be able to track the growth of datasets over time, with a particular focus on those marked for the Profile Store and Identity Store. This use case falls under the category of metadata analytics, involving potentially hundreds or thousands of datasets. Although AEP exposes some of this metadata via system datasets, our requirements demand a customized view of the base dataset. In such scenarios, a Python-based solution to extract, format, and ingest the data into the platform becomes a highly effective approach.
REST APIs in the Platform
A fundamental aspect of Adobe Experience Platform's architecture is its use of modular services that interact through REST APIs to perform tasks. This service-oriented approach forms the backbone of the platform. So, if you're unable to find specific data or information in the UI or product documentation, the REST APIs should be your go-to resource. In essence, every UI feature and function is powered by underlying API calls.
My approach to working with APIs begins by creating a Developer Project, obtaining the necessary credentials and IMS organization details. I then use Python to extract, process, and format the data. Finally, I leverage Python's Postgres SQL connector to write the data back into Adobe Experience Platform. I could use Postman as well, but having spent a significant amount of time working at MathWorks, I have a deep appreciation for both MATLAB and Python. This is why I tend to favor Python for my workflow—it’s just a personal preference!
Python vs Postman: Choosing the Right Tool for REST API Interactions
Both Python and Postman offer powerful ways to interact with REST APIs, but they serve different purposes depending on the context. Postman is ideal for quick testing, debugging, and prototyping of API calls. It provides an intuitive user interface for crafting requests, viewing responses, and managing collections of API calls without needing to write code. This makes it particularly useful for developers during the early stages of API development or testing, as well as for non-developers who need to work with APIs. However, Python excels when you need to integrate API calls into a larger workflow, automate tasks, or manipulate the response data programmatically. With libraries like requests, Python enables greater flexibility and customization, allowing for complex logic, error handling, and scheduled tasks. The trade-off is that Python requires more setup and knowledge of coding, making it more suitable for repeatable, automated processes rather than ad-hoc testing. In summary, Postman shines in quick, manual interactions with APIs, while Python is the go-to for automation and advanced API workflows.
Prerequisites
Before You Create a Project
Reach out to an Admin Console administrator within your organization and request to be added as a developer to an Experience Platform product profile through the Admin Console. Also make sure that you are added as a user for that product profile. Without these permissions, you will not be able to create a project or move any further with the APIs.
Once you are done creating a project, you will be generating the following that you will use to gain access:
{ACCESS_TOKEN}: This is a short-lived token used to authenticate API requests. It is obtained through an OAuth flow with Adobe's Identity Management System (IMS) and is required for authorization to interact with AEP services. Because the token expires after 24 hours, you need to request a new one each day using your API credentials to continue making authenticated API requests to Adobe Experience Platform.
{API_KEY}: Also known as the {CLIENT ID}, this is a unique identifier assigned to your application when you create a Developer Project in Adobe Developer Console. It is used to authenticate and identify the application making the API requests.
{CLIENT SECRET}: The Client Secret is used along with the Client ID to authenticate your application when requesting an access token. It acts as a password for your app and should be kept secure. When using OAuth Server-to-Server authentication, you need both the Client ID and Client Secret to securely generate the access token.
{SCOPES}: Scopes define the permissions your application is requesting. They specify which Adobe services or APIs your access token can interact with. Without defining the appropriate scopes, your token may not have the necessary access to certain resources in the platform.
The access token is generated after successful authentication using the Client ID, Client Secret, and Scopes. It is then used in each API request to authorize the interaction.
{ORG_ID}: The Organization ID (IMS Organization ID) uniquely identifies your Adobe Experience Platform organization. This ID is required in API requests to specify which organization's data you are interacting with.
{SANDBOX_NAME}: Some APIs require the sandbox name to identify the specific sandbox environment you are working in within Adobe Experience Platform. Sandboxes allow you to segment and manage different environments (such as development, staging, and production) to safely test, develop, and deploy features without affecting live data.
{TECHNICAL_ACCOUNT_ID}: This ID belongs to a machine account (not a human user). It is used when your application needs to perform automated tasks, such as fetching data or executing processes in Adobe Experience Platform, without any user interaction.
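Since the Client Secret acts as a password, one option is to keep all of these values out of the notebook and read them from environment variables instead. The following is a minimal sketch of that idea. The `AEP_*` variable names, the `AEPCredentials` class, and the `prod` default sandbox are my own assumptions for illustration, not names that Adobe defines.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class AEPCredentials:
    """Container for the values copied from the Developer Console project."""
    client_id: str
    client_secret: str
    org_id: str
    scopes: str
    sandbox_name: str = "prod"  # assumed default; change to your sandbox


def load_credentials(env=os.environ):
    """Build AEPCredentials from a mapping of environment variables.

    The AEP_* variable names are hypothetical; use whatever your team prefers.
    """
    required = ["AEP_CLIENT_ID", "AEP_CLIENT_SECRET", "AEP_ORG_ID", "AEP_SCOPES"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return AEPCredentials(
        client_id=env["AEP_CLIENT_ID"],
        client_secret=env["AEP_CLIENT_SECRET"],
        org_id=env["AEP_ORG_ID"],
        scopes=env["AEP_SCOPES"],
        sandbox_name=env.get("AEP_SANDBOX_NAME", "prod"),
    )
```

Calling `load_credentials()` with no arguments reads from the real environment; passing a plain dictionary is handy for testing.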
Stuff I Wish They Told Me
What is an access token really?
It grants the bearer permission to interact with the AEP services for a limited time (typically 24 hours). The token includes encoded information about the user or application, the permissions granted, and its expiration time.
In simple terms, the access token serves as a "proof of identity" that you or your application can present when calling AEP APIs, ensuring that only authorized users can perform actions or retrieve data from the platform.
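If you are curious about the information encoded in the token, you can peek at it locally. The sketch below assumes the access token follows the common JWT layout (three base64url segments separated by dots). It only decodes the middle segment and does not verify the signature, so treat it as a debugging aid rather than a security check. The function name `decode_token_payload` is my own.

```python
import base64
import json


def decode_token_payload(token):
    """Return the payload of a JWT-style token as a dict, without verifying it."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Expected three dot-separated segments (header.payload.signature)")
    payload_b64 = parts[1]
    # base64url segments drop their padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))
```

Run it as `decode_token_payload(access_token)` once you have a token, and avoid pasting real tokens into online decoders.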
Why do I need an API key if I have the access token?
When we are making API calls from Python (or any other client), both the API key and the access token play different roles:
API Key Tracking
When you make API calls, the API key (also known as the Client ID) is sent along with your requests. This helps Adobe track which application is making the requests. The API key is typically passed in the headers or query parameters of each request. It allows Adobe to monitor usage, enforce rate limits, and tie your requests to the specific developer project associated with your API key.
Access Token for Every API Request:
The access token is used in every single API request to authenticate and authorize your actions. It's sent in the headers of your API calls, typically in the form of a "Bearer" token (a kind of token used in HTTP authentication). Without a valid access token, your API calls will be rejected because the platform needs to confirm your identity and the scope of permissions granted to you.
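To make the two roles concrete, here is a small sketch of a helper that assembles the request headers used throughout this chapter. The header names are the ones used in the code later on; the helper `build_headers` itself is just a convenience I am introducing for illustration.

```python
def build_headers(access_token, api_key, org_id, sandbox_name=None):
    """Assemble the headers sent with each Platform API request."""
    headers = {
        "Authorization": f"Bearer {access_token}",  # who you are and what you may do
        "x-api-key": api_key,                       # which application is calling
        "x-gw-ims-org-id": org_id,                  # which organization's data
    }
    if sandbox_name is not None:
        headers["x-sandbox-name"] = sandbox_name    # which sandbox, when the API needs it
    return headers
```

For example, `build_headers(access_token, client_id, org_id, sandbox_name="prod")` returns a dictionary you can pass directly as `headers=` to `requests.get`.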
Wait, where are these API calls going in the Adobe system?
When you make API requests to Adobe Experience Platform (AEP) or even other Adobe services, those requests are routed through Adobe I/O, Adobe’s central API gateway.
Adobe I/O is not a traditional web server; it functions as an API gateway and integration platform, providing a centralized access point. It works with Adobe’s Identity Management System (IMS) to validate your API keys and access tokens, ensuring you have the proper permissions to access the service. Once authenticated, Adobe I/O directs your requests to the correct Adobe service, such as AEP or Analytics.
Additionally, Adobe I/O manages traffic by enforcing rate limits to ensure fair usage and protect Adobe’s infrastructure.
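Because of these rate limits, a long-running script may occasionally be told to slow down. The sketch below shows one common way to cope: retry with exponential backoff when the response status is 429 (the standard HTTP "Too Many Requests" code). I am assuming the gateway signals throttling with 429; the helper name `call_with_backoff` and its parameters are my own.

```python
import time


def call_with_backoff(send, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns a non-429 response or attempts run out.

    send is any zero-argument callable returning an object with a
    status_code attribute, for example:
        lambda: requests.get(url, headers=headers)
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        response = send()
        if response.status_code != 429:
            return response
        if attempt < max_attempts - 1:
            # Wait 1x, 2x, 4x, ... the base delay before trying again.
            sleep(base_delay * 2 ** attempt)
    return response  # still throttled after the final attempt
```

The `sleep` parameter is injectable so that the logic can be tested without actually waiting.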
What is this OAuth Server-to-Server?
OAuth is an authorization protocol that lets you grant an app (the Python client we will use) limited access to your data or services without giving away your personal login details. In the Server-to-Server variant there is no user sign-in step at all: the app presents its Client ID and Client Secret and receives an access token, which the app (our Python client) then uses to make secure API requests. OAuth generates access tokens that are short-lived and easily revoked, reducing the risk if a token is compromised.
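Since the token is short-lived, a script that runs for a long time (or on a schedule) should not assume the token it fetched earlier is still valid. Here is a minimal sketch of a cache that requests a new token only when the current one is close to expiring. The `TokenCache` class and the `fetch` callable are hypothetical; `fetch` is assumed to return the token together with its lifetime in seconds (24 hours is 86,400 seconds).

```python
import time


class TokenCache:
    """Reuse an access token until shortly before it expires."""

    def __init__(self, fetch, margin_seconds=300, clock=time.time):
        self._fetch = fetch            # callable returning (token, lifetime_seconds)
        self._margin = margin_seconds  # refresh this many seconds before expiry
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

In practice `fetch` would wrap the POST request to Adobe IMS; every API call then asks the cache for `cache.get()` instead of holding on to a token directly.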
You will be greeted by a screen that looks like this. Click on Create Project.
The new Project screen will look like this:
Add API and choose Adobe Experience Platform and Experience Platform API.
Click on Next and you will see the following screen. Choose OAuth Server-to-Server.
Now you need to choose the product profiles. A product profile defines which products you have access to. The product profile is typically created by the administrator at https://adminconsole.adobe.com/.
Previously, permissions for role-based access control on AEP objects, such as dataset access, were managed within the product profile. However, this has now changed with Permissions being introduced as a separate feature in the left navigation panel of the AEP UI. It's important to confirm with your admin that you've selected a product profile that grants you developer access to Adobe Experience Platform.
Click Save the configured API and you will see the following. There will be an API Key that you can copy. You will also get an option to Generate access token.
You can also edit the project if you would like to change the name or description.
Also, if you clicked on OAuth Server-to-Server, you will be able to see new information such as CLIENT ID, CLIENT SECRET and SCOPES. Copy them. You will use this information to generate the access token every 24 hours to connect to the Platform APIs. Alternatively, you can log into this project every day and generate the access token. Make sure you check the SCOPES to confirm you have the right scope for the service that you are querying.
The API requests will be sent to an endpoint. To access that endpoint, click on the View cURL command.
If you scroll down, you will get additional details on ORG_ID and TECHNICAL_ACCOUNT_ID which you should copy. Note the Credential name. You will need this later in the Permissions UI.
Go to the AEP UI, or have the admin do so, and add a role with the right permissions to your Technical Account ID. Click on Permissions->Roles->Create Role.
Name the Role.
Add the All permission in Data Management and make sure you choose the right Sandboxes. Also add all permissions for Sandbox Administration, Data Ingestion and Query Service
Scroll to API Credentials. The credential name is the same one that we saw in the project. Click on the credential name from your project and go to Roles and add the role you created.
Get the Access Token
Let us write some Python code to do so. Copy paste the following in JupyterLab and make sure you have all the parameters from the previous section.
!pip install requests
import requests
# Replace with your Adobe credentials
client_id = 'your_client_id'
client_secret = 'your_client_secret'
org_id = 'your_org_id'
tech_account_id = 'your_tech_account_id'
scope = 'scope'
auth_endpoint = 'https://ims-na1.adobelogin.com/ims/token/v2'
# Prepare the data for the access token request
data = {
    'grant_type': 'client_credentials',
    'client_id': client_id,
    'client_secret': client_secret,
    'scope': scope  # Specify the scope relevant to your API usage
}
# Request the access token from Adobe IMS
response = requests.post(auth_endpoint, data=data)
if response.status_code == 200:
    access_token = response.json()['access_token']
    print(f"Access Token: {access_token}")
else:
    print(f"Failed to obtain access token. Status Code: {response.status_code}, Error: {response.text}")
You should see a response that looks like this:
Retrieve Datasets Information Across All Sandboxes
We will use the Sandbox Management APIs to retrieve the list of sandboxes. Then we will loop through these sandboxes and make calls to the Catalog API to get the dataset information.
In a separate cell, copy and paste the following. Make sure the cell from the previous section, which defines the variables used in the headers, has been executed first.
import json
import requests
from datetime import datetime
# Set headers for API requests
headers = {
    "Authorization": f"Bearer {access_token}",
    "x-api-key": client_id,
    "x-gw-ims-org-id": org_id,
    "Content-Type": "application/json"
}
# Dictionary to store all datasets across sandboxes, keyed by sandbox name
all_datasets = {}
# Initial sandbox endpoint
sandbox_endpoint = 'https://platform.adobe.io/data/foundation/sandbox-management/sandboxes'
url = sandbox_endpoint
# Iterate over pages of sandboxes
while url:
    sandbox_response = requests.get(url, headers=headers)
    # Check if the sandboxes were retrieved successfully
    if sandbox_response.status_code == 200:
        sandbox_data = sandbox_response.json()
        sandboxes = sandbox_data.get('sandboxes', [])
        # Iterate over each sandbox on the current page
        for sandbox in sandboxes:
            sandbox_id = sandbox.get('id')
            sandbox_name = sandbox.get('name')
            print(f"Processing sandbox: {sandbox_name}...")
            datasets = []
            dataset_count = 0  # Initialize the dataset count
            start = 0
            limit = 50
            dataset_url = 'https://platform.adobe.io/data/foundation/catalog/datasets'
            # Set headers specific to the current sandbox
            headers.update({
                'x-sandbox-id': sandbox_id,
                'x-sandbox-name': sandbox_name
            })
            # Pagination for datasets in each sandbox
            while True:
                params = {
                    'start': start,
                    'limit': limit
                }
                # Get the dataset info for the current sandbox with pagination
                dataset_response = requests.get(dataset_url, headers=headers, params=params)
                if dataset_response.status_code == 200:
                    data = dataset_response.json()
                    if not data:  # If the returned JSON is empty, we've reached the end
                        break
                    # Append the entire dataset info from this page
                    for dataset_id, dataset_info in data.items():
                        dataset_info_entry = dataset_info  # Store the entire dataset information
                        dataset_info_entry['dataset_id'] = dataset_id  # Add dataset ID explicitly
                        datasets.append(dataset_info_entry)
                        dataset_count += 1  # Increment the dataset count
                    # Update start for the next page of datasets
                    start += limit
                else:
                    print(f"Failed to retrieve datasets for sandbox {sandbox_name}. Status Code: {dataset_response.status_code}, Response: {dataset_response.text}")
                    break
            # Save the entire dataset info for the current sandbox
            all_datasets[sandbox_name] = datasets
            # Print the dataset count for the current sandbox
            print(f"Sandbox: {sandbox_name}, Dataset Count: {dataset_count}")
            # If the sandbox is "testingqs-for-computed-attributes", list all datasets with name, ID, and managed_by
            if sandbox_name == "testingqs-for-computed-attributes":
                print(f"Listing datasets for sandbox: {sandbox_name}")
                for dataset in datasets:
                    print(f"Dataset Name: {dataset.get('name', 'UNKNOWN')}, Dataset ID: {dataset.get('dataset_id', 'UNKNOWN')}, Managed by: {dataset.get('classification', {}).get('managedBy', 'UNKNOWN')}")
        # Check if there is a next page for sandboxes
        url = sandbox_data.get('_links', {}).get('next', {}).get('href')
        if not url:
            break
    else:
        print(f"Failed to retrieve sandboxes. Status Code: {sandbox_response.status_code}, Response: {sandbox_response.text}")
        url = None
# Save the retrieved datasets (full info) to a file called 'datasets.json'
with open('datasets.json', 'w') as outfile:
    json.dump(all_datasets, outfile, indent=4)
print("Full datasets information saved to 'datasets.json'.")
You will see output in your Python environment that looks like this:
Open up datasets.json in a notepad-like application and you should see something similar to this. Use the JSON file to get a sense of the various fields and their values. We do not need all of these values.
The most important part of the above code is the pagination logic for getting the sandboxes and the datasets (in sets of 50). If you leave out the pagination, you will retrieve only the first page of results and your dataset counts will be wrong. It also helps to print the datasets in a sandbox to compare your results.
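The dataset pagination pattern can be isolated into a small generator, which makes it easier to test and reuse. This is a sketch under the same assumptions as the code above: each page is a dictionary keyed by dataset ID, and an empty page means there is nothing left. The `fetch_page` callable is hypothetical and stands in for the `requests.get` call with `start` and `limit` parameters.

```python
def iter_datasets(fetch_page, limit=50):
    """Yield (dataset_id, dataset_info) pairs across all pages.

    fetch_page(start, limit) is a hypothetical callable that returns one
    page of results as a dict keyed by dataset ID, or an empty dict when
    there are no more results.
    """
    start = 0
    while True:
        page = fetch_page(start, limit)
        if not page:  # empty page: we have reached the end
            break
        for dataset_id, dataset_info in page.items():
            yield dataset_id, dataset_info
        start += limit  # move the window to the next page
```

With this in place, collecting everything for a sandbox becomes `datasets = list(iter_datasets(fetch_page))`, and you can check the pagination logic against a fake `fetch_page` before pointing it at the real API.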
Data Processing of JSON into a Flat File
Copy paste and execute the following code:
import json
import pandas as pd
# Load the JSON file (replace with your correct file path)
file_path = 'datasets.json'
with open(file_path) as json_file:
    data = json.load(json_file)
extracted_rows = []
# Loop through each sandbox, which is the top-level key
for sandbox_name, sandbox_datasets in data.items():
    # Loop through each dataset within the current sandbox (sandbox_datasets is a list)
    for dataset_info in sandbox_datasets:
        # Clean up values in unifiedProfile and unifiedIdentity
        unifiedProfile = dataset_info.get('tags', {}).get('unifiedProfile', [None])[0]
        unifiedProfile_enabledAt = dataset_info.get('tags', {}).get('unifiedProfile', [None, None])[1]
        unifiedIdentity = dataset_info.get('tags', {}).get('unifiedIdentity', [None])[0]
        # If the value contains "enabled", strip it, otherwise use None
        unifiedProfile_clean = unifiedProfile.replace("enabled:", "") if unifiedProfile else None
        unifiedProfile_enabledAt_clean = unifiedProfile_enabledAt.replace("enabledAt:", "") if unifiedProfile_enabledAt and "enabledAt" in unifiedProfile_enabledAt else None
        unifiedIdentity_clean = unifiedIdentity.replace("enabled:", "") if unifiedIdentity else None
        # Get sandbox name and ID, assuming sandboxId exists in the dataset info
        sandbox_id = dataset_info.get('sandboxId', None)
        # Append the cleaned row to extracted_rows
        row = {
            'sandbox_name': sandbox_name,  # Added Sandbox Name
            'dataset_id': dataset_info.get('dataset_id', None),  # Added Dataset ID
            'dataset_name': dataset_info.get('name', None),
            'dataset_ownership': dataset_info.get('classification', {}).get('managedBy', None),
            'dataset_type': dataset_info.get('classification', {}).get('dataBehavior', None),
            'imsOrg_id': dataset_info.get('imsOrg', None),
            'sandbox_id': sandbox_id,
            'profile_enabled': unifiedProfile_clean,
            'date_enabled_profile': unifiedProfile_enabledAt_clean,
            'identity_enabled': unifiedIdentity_clean
        }
        extracted_rows.append(row)
# Convert the extracted rows into a DataFrame
extracted_df = pd.DataFrame(extracted_rows)
# Display the DataFrame (print the first few rows)
print(extracted_df.head())
# Save the DataFrame to a CSV file
extracted_df.to_csv('cleaned_extracted_data.csv', index=False)
The result should look something like this in the editor:
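One fragile spot in the code above is that it reads the `unifiedProfile` tag by position (`[0]` and `[1]`), which raises an `IndexError` if the list has only one entry. As an alternative, here is a sketch that parses the `key:value` strings by name instead. It assumes the tag values look like `enabled:true` and `enabledAt:<timestamp>`, as the cleanup logic above implies; the helper names are my own.

```python
def parse_tag_values(values):
    """Turn a list like ['enabled:true', 'enabledAt:...'] into a dict."""
    parsed = {}
    for item in values or []:
        # Split on the first colon only, so timestamps keep their own colons.
        key, sep, value = item.partition(":")
        if sep:
            parsed[key] = value
    return parsed


def profile_flags(dataset_info):
    """Extract the profile and identity flags from one dataset entry."""
    tags = dataset_info.get("tags", {})
    profile = parse_tag_values(tags.get("unifiedProfile"))
    identity = parse_tag_values(tags.get("unifiedIdentity"))
    return {
        "profile_enabled": profile.get("enabled"),
        "date_enabled_profile": profile.get("enabledAt"),
        "identity_enabled": identity.get("enabled"),
    }
```

The returned dictionary uses the same column names as the `row` dictionary, so it could be merged in with `row.update(profile_flags(dataset_info))`.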
Tip: If you try to ingest this CSV file, you will need to use a source schema that has no spaces in its column names. If you use the manual CSV upload workflow, you will need to reformat the column names to exclude the spaces. But this is what real life is: dirty column names. That is the reason why we are being extra cautious in how we do our naming.
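If you ever inherit a file with messy headers, a small normalizer can save you some manual renaming. The sketch below replaces any run of characters other than letters and digits with a single underscore. The function name `clean_column_name` and the exact rule are my own choices, so adjust them to your schema's naming convention.

```python
import re


def clean_column_name(name):
    """Replace spaces and punctuation with underscores, trimming the ends."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip())
    return cleaned.strip("_")
```

With pandas you could apply it to a whole DataFrame using `df.columns = [clean_column_name(c) for c in df.columns]`.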
Get Row and Count Statistics on the Datasets
To generate the statistics of the dataset, you can use the Statistics endpoint:
import json
import pandas as pd
import requests
from datetime import datetime
# Get the current timestamp (will be the same for all rows)
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Load the JSON file (replace with your correct file path)
file_path = 'datasets.json' # Update this to the correct path
with open(file_path) as json_file:
    data = json.load(json_file)
extracted_rows = []
# Loop through each sandbox, which is the top-level key
for sandbox_name, sandbox_datasets in data.items():
    # Loop through each dataset within the current sandbox (assuming sandbox_datasets is a list)
    for dataset_info in sandbox_datasets:
        # Clean up values in unifiedProfile and unifiedIdentity
        unifiedProfile = dataset_info.get('tags', {}).get('unifiedProfile', [None])[0]
        unifiedProfile_enabledAt = dataset_info.get('tags', {}).get('unifiedProfile', [None, None])[1]
        unifiedIdentity = dataset_info.get('tags', {}).get('unifiedIdentity', [None])[0]
        # If the value contains "enabled", strip it, otherwise use None
        unifiedProfile_clean = unifiedProfile.replace("enabled:", "") if unifiedProfile else None
        unifiedProfile_enabledAt_clean = unifiedProfile_enabledAt.replace("enabledAt:", "") if unifiedProfile_enabledAt and "enabledAt" in unifiedProfile_enabledAt else None
        unifiedIdentity_clean = unifiedIdentity.replace("enabled:", "") if unifiedIdentity else None
        # Get dataset ID and sandbox ID (the same 'sandboxId' key used in the previous section)
        dataset_id = dataset_info.get('dataset_id', None)
        sandbox_id = dataset_info.get('sandboxId', None)
        # Statistics API call logic (replacing function)
        stats_endpoint = f"https://platform.adobe.io/data/foundation/statistics/statistics?statisticType=table&dataSet={dataset_id}"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "x-api-key": "acp_foundation_statistics",
            "x-gw-ims-org-id": org_id,
            "x-sandbox-id": sandbox_id,
            "x-sandbox-name": sandbox_name
        }
        # Initialize statistics
        rows, size = 0, 0
        # Make the request for statistics
        response = requests.get(stats_endpoint, headers=headers)
        if response.status_code == 200:
            stats_data = response.json()
            # Iterate over statistics to find row_count and total_size
            for stat in stats_data.get("statistics", []):
                if stat["name"] == "row_count" and stat["value"] != "0":
                    rows = stat["value"]
                elif stat["name"] == "total_size" and stat["value"] != "0":
                    size = stat["value"]
        # Append the cleaned row to extracted_rows, using the updated format
        row = {
            'timestamp': current_timestamp,  # Added Timestamp
            'sandbox_name': sandbox_name,  # Added Sandbox Name
            'dataset_id': dataset_id,  # Added Dataset ID
            'dataset_name': dataset_info.get('name', None),
            'dataset_ownership': dataset_info.get('classification', {}).get('managedBy', None),
            'dataset_type': dataset_info.get('classification', {}).get('dataBehavior', None),
            'imsOrg_id': dataset_info.get('imsOrg', None),
            'sandbox_id': sandbox_id,
            'profile_enabled': unifiedProfile_clean,
            'date_enabled_profile': unifiedProfile_enabledAt_clean,
            'identity_enabled': unifiedIdentity_clean,
            'row_count': rows,  # Added row count
            'total_size': size  # Added total size
        }
        extracted_rows.append(row)
# Convert the extracted rows into a DataFrame
extracted_df = pd.DataFrame(extracted_rows)
# Display the DataFrame (print the first few rows)
print(extracted_df.head())
# Save the DataFrame to a CSV file
extracted_df.to_csv('cleaned_extracted_data_with_statistics.csv', index=False)
print("Data saved to 'cleaned_extracted_data_with_statistics.csv'.")
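The statistics parsing inside the loop can also be pulled out into a pure function, which keeps the main loop shorter and lets you test the parsing without calling the API. This sketch assumes the response body has the shape used above (a `statistics` list whose entries carry `name` and `value`) and that the values are integer strings; the function name `extract_table_stats` is my own.

```python
def extract_table_stats(stats_data):
    """Return (row_count, total_size) from a statistics response body.

    Missing or non-numeric values fall back to 0, matching the defaults
    used in the loop above.
    """
    rows, size = 0, 0
    for stat in stats_data.get("statistics", []):
        name = stat.get("name")
        try:
            value = int(stat.get("value", 0))
        except (TypeError, ValueError):
            continue  # skip values that are not integer-like
        if name == "row_count":
            rows = value
        elif name == "total_size":
            size = value
    return rows, size
```

Inside the loop this would be used as `rows, size = extract_table_stats(response.json())`. Returning integers rather than strings also means the later `pd.to_numeric` conversions have less work to do.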
We will now upload this CSV into the Data Landing Zone, with the expectation that a schema and a data flow were created as per the following prerequisite tutorial.
You need to extract the SAS URI from the AEP UI. Go to Sources->Catalog->Data Landing Zone. Click on the card gently and a right panel will appear. Scroll down to get the SAS URI. Note that the SAS URI already contains the SAS Token.
Just copy and paste the SAS URI into the full_sas_url variable in the following code. Also observe the csv_file_path and file_name variables and how the file name is injected into the SAS URI in the sas_url_with_file variable.
import requests
# Path to the CSV file you want to upload
csv_file_path = 'cleaned_extracted_data_with_statistics.csv' # Replace with your file path
# The full SAS URL with both URI and Token (you already have this)
full_sas_url = 'your_SAS_URI'
# Split the URL into the base URI (before the '?') and the SAS token (starting from '?')
sas_uri, sas_token = full_sas_url.split('?', 1)  # Split only at the first '?'
# Inject the file name into the SAS URI
file_name = 'cleaned_extracted_data_with_statistics.csv' # The file name you are uploading
sas_url_with_file = f'{sas_uri}/{file_name}?{sas_token}' # Recombine URI, file name, and SAS token
# Open the CSV file in binary mode
with open(csv_file_path, 'rb') as file_data:
    # Set the required headers
    headers = {
        'x-ms-blob-type': 'BlockBlob',  # Required for Azure Blob Storage uploads
    }
    # Make a PUT request to upload the file using the constructed SAS URL
    response = requests.put(sas_url_with_file, headers=headers, data=file_data)
# Check if the upload was successful
if response.status_code == 201:
    print(f"File '{csv_file_path}' uploaded successfully.")
else:
    print(f"Failed to upload file. Status Code: {response.status_code}, Response: {response.text}")
If you click into the Data Landing Zone i.e. Sources->Catalog->Data Landing Zone->Add data, you will see:
Tip: The great thing about this approach is that if we keep running this Python notebook on schedule, then the files dropped into Data Landing Zone will be picked up by the dataflow runs.
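One thing to watch out for when running on a schedule: uploading with the same file name each time overwrites the previous blob. A simple way around this is to add a timestamp to the file name before building the upload URL. The sketch below does that with the standard library. The function name `build_upload_url` and the timestamp format are my own choices, and whether your dataflow treats each new file the way you expect is something to verify in your own setup.

```python
from datetime import datetime, timezone
from urllib.parse import urlsplit, urlunsplit


def build_upload_url(full_sas_url, file_name, now=None):
    """Insert a timestamped file name into a SAS URL, keeping the token intact."""
    now = now or datetime.now(timezone.utc)
    stem, dot, ext = file_name.rpartition(".")
    if not dot:  # file name has no extension
        stem, ext = file_name, ""
    stamped = f"{stem}_{now:%Y%m%dT%H%M%SZ}" + (f".{ext}" if ext else "")
    parts = urlsplit(full_sas_url)
    path = parts.path.rstrip("/") + "/" + stamped
    # The query string (the SAS token) is carried over unchanged.
    return urlunsplit((parts.scheme, parts.netloc, path, parts.query, parts.fragment))
```

You would then pass the result to `requests.put` in place of `sas_url_with_file`.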
Analysis in Python
Number of Datasets by Sandbox and Ownership
Execute the following piece of code:
import pandas as pd
import matplotlib.pyplot as plt
# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)
# Group the data by sandbox_name and dataset_ownership, counting the number of datasets
grouped_df = df.groupby(['sandbox_name', 'dataset_ownership']).size().unstack(fill_value=0)
# Plotting the stacked bar chart
ax = grouped_df.plot(kind='bar', stacked=True, figsize=(10, 6))
# Set the labels and title
ax.set_title('Number of Datasets by Sandbox and Ownership')
ax.set_xlabel('Sandbox Name')
ax.set_ylabel('Number of Datasets')
# Rotate x-axis labels for better readability
plt.xticks(rotation=45)
# Display the plot
plt.tight_layout()
plt.show()
The result is:
Total Volume Used in GB Across All Sandboxes Split By Ownership
Copy paste and execute the following piece of code
import pandas as pd
import matplotlib.pyplot as plt
# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)
# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')
# Drop any rows with missing sizes
df_clean = df.dropna(subset=['total_size'])
# Convert bytes to gigabytes (1 GB = 1e9 bytes)
df_clean['total_size_gb'] = df_clean['total_size'] / 1e9
# Group by dataset_ownership (assuming it contains 'customer' vs 'system') and sum the total_size_gb for each group
ownership_grouped = df_clean.groupby('dataset_ownership')['total_size_gb'].sum()
# Calculate the percentage of the total size for each group
total_size = ownership_grouped.sum()
ownership_percent = (ownership_grouped / total_size) * 100
# Print the total volume in GB and the percentage
for ownership, size_gb in ownership_grouped.items():
    percentage = ownership_percent[ownership]
    print(f"Ownership: {ownership}, Size: {size_gb:.2f} GB, Percentage: {percentage:.2f}%")
# Plotting the pie chart
plt.figure(figsize=(8, 8))
plt.pie(ownership_grouped, labels=ownership_grouped.index, autopct='%1.1f%%', startangle=140, colors=plt.cm.Paired.colors)
# Set the title
plt.title('Total Volume Used by Customer vs System in GB')
# Display the pie chart
plt.tight_layout()
plt.show()
The results will be:
Top 10 Datasets by Size in GB
Remember the size is in bytes and needs to be converted to GB or TB. Copy paste the following piece of code and execute:
import pandas as pd
import matplotlib.pyplot as plt
# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)
# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')
# Drop any rows with missing sizes
df_clean = df.dropna(subset=['total_size'])
# Convert bytes to gigabytes (1 GB = 1e9 bytes)
df_clean['total_size_gb'] = df_clean['total_size'] / 1e9
# Sort the DataFrame by 'total_size_gb' in descending order
top_datasets_by_size = df_clean.sort_values(by='total_size_gb', ascending=False)
# Select the top 10 datasets by size
top_10_datasets = top_datasets_by_size.head(10)
# Plotting the top 10 datasets by size in GB
plt.figure(figsize=(10, 6))
plt.barh(top_10_datasets['dataset_name'], top_10_datasets['total_size_gb'], color='skyblue')
plt.xlabel('Total Size (GB)')
plt.ylabel('Dataset Name')
plt.title('Top 10 Datasets by Size (in GB)')
# Display the plot
plt.tight_layout()
plt.gca().invert_yaxis() # To display the largest dataset on top
plt.show()
The result will be:
Retrieve a List of Dataset Sizes and Rows by Sandbox
Copy paste and execute the following piece of code
import pandas as pd
# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)
# Filter for the "for-anksharm" sandbox
filtered_df = df[df['sandbox_name'] == 'for-anksharm']
# Convert the 'total_size' and 'row_count' columns to numeric (in case there are any non-numeric entries)
filtered_df['total_size'] = pd.to_numeric(filtered_df['total_size'], errors='coerce')
filtered_df['row_count'] = pd.to_numeric(filtered_df['row_count'], errors='coerce')
# Drop any rows with missing sizes or row counts
filtered_df_clean = filtered_df.dropna(subset=['total_size', 'row_count'])
# Convert total_size from bytes to gigabytes
filtered_df_clean['total_size_gb'] = filtered_df_clean['total_size'] / 1e9
# Group by dataset name and sum the total size (in GB) and row counts
sandbox_grouped = filtered_df_clean.groupby('dataset_name')[['total_size_gb', 'row_count']].sum()
# Display the grouped result
print(sandbox_grouped)
# Optionally, save the result to a CSV file
sandbox_grouped.to_csv('for_anksharm_dataset_sizes_and_rows_gb.csv')
Average Record Richness by Sandbox (bytes per record)
Copy paste and execute the following piece of code:
import pandas as pd
# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)
# Convert the 'total_size' and 'row_count' columns to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')
df['row_count'] = pd.to_numeric(df['row_count'], errors='coerce')
# Drop any rows with missing sizes or row counts
df_clean = df.dropna(subset=['total_size', 'row_count'])
# Group by sandbox_name and sum the total_size and row_count
sandbox_grouped = df_clean.groupby('sandbox_name')[['total_size', 'row_count']].sum()
# Calculate the average record richness (bytes per record)
sandbox_grouped['avg_record_richness'] = sandbox_grouped['total_size'] / sandbox_grouped['row_count']
# Display the grouped result with average record richness
print(sandbox_grouped[['total_size', 'row_count', 'avg_record_richness']])
# Optionally, save the result to a CSV file
sandbox_grouped.to_csv('sandbox_avg_record_richness.csv')
To see how dataset sizes are distributed across all sandboxes, plot a histogram with the following piece of code:
import pandas as pd
import matplotlib.pyplot as plt
# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)
# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')
# Drop any rows with missing sizes
df_clean = df.dropna(subset=['total_size'])
# Convert total_size from bytes to gigabytes (1 GB = 1e9 bytes)
df_clean['total_size_gb'] = df_clean['total_size'] / 1e9
# Plot the histogram of dataset sizes in GB
plt.figure(figsize=(10, 6))
plt.hist(df_clean['total_size_gb'], bins=20, color='skyblue', edgecolor='black')
# Set the labels and title
plt.title('Histogram of Dataset Sizes (in GB) Across All Sandboxes')
plt.xlabel('Dataset Size (GB)')
plt.ylabel('Frequency')
# Display the plot
plt.tight_layout()
plt.show()
The result will look like:
Profile-Enabled Datasets (GB) By Sandbox
Copy paste and execute the following code:
import pandas as pd
import matplotlib.pyplot as plt
# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)
# Filter for the "prod" sandbox and datasets that are profile enabled
df_filtered = df[(df['sandbox_name'] == 'prod') & (df['profile_enabled'].notnull())]
# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df_filtered['total_size'] = pd.to_numeric(df_filtered['total_size'], errors='coerce')
# Drop any rows with missing sizes
df_filtered = df_filtered.dropna(subset=['total_size'])
# Convert the total size from bytes to gigabytes (1 GB = 1e9 bytes)
df_filtered['total_size_gb'] = df_filtered['total_size'] / 1e9
# Sort the DataFrame by total size in descending order
df_filtered = df_filtered.sort_values(by='total_size_gb', ascending=False)
# Define colors based on the dataset type (assuming 'dataset_type' column exists with values like 'record' or 'event')
colors = df_filtered['dataset_type'].map({'record': 'blue', 'event': 'green'}).fillna('gray')
# Plot the bar chart for profile-enabled datasets in the prod sandbox
plt.figure(figsize=(10, 6))
plt.barh(df_filtered['dataset_name'], df_filtered['total_size_gb'], color=colors)
# Set the title and labels
plt.title('Profile-Enabled Datasets in Prod Sandbox (Sorted by Size in GB)', fontsize=14)
plt.xlabel('Size in GB')
plt.ylabel('Dataset Name')
# Invert y-axis to have the largest dataset on top
plt.gca().invert_yaxis()
# Add a legend to indicate which color represents record vs event datasets
from matplotlib.lines import Line2D
legend_elements = [Line2D([0], [0], color='blue', lw=4, label='Record'),
Line2D([0], [0], color='green', lw=4, label='Event')]
plt.legend(handles=legend_elements, title="Dataset Type")
# Display the chart
plt.tight_layout()
plt.show()
The result will look like:
Note: The dataset sizes reported here reflect the sizes in the Data Lake, where data is stored in the Parquet file format. Parquet provides significant compression, typically ranging from 5-10x compared to JSON data. However, the Profile Store uses a different file format and compression mechanism, so the volume comparison between Data Lake and Profile Store is not 1:1. Instead, these sizes should be viewed in terms of relative proportions, rather than exact volumes.
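To get a feel for what the compression ratio means, here is a tiny worked example. It simply multiplies a Parquet size by the 5x and 10x bounds quoted above to give a rough range for the equivalent JSON volume. The function name is my own, and the output is an order-of-magnitude estimate, not a measurement.

```python
def estimate_json_size_gb(parquet_size_gb, low_ratio=5, high_ratio=10):
    """Return a (low, high) estimate of the uncompressed JSON size in GB."""
    return parquet_size_gb * low_ratio, parquet_size_gb * high_ratio
```

For instance, a dataset reported as 2 GB in the Data Lake would correspond to somewhere between 10 GB and 20 GB of raw JSON under these assumptions.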
User Contributions & Improvements
David Teko Kangni
Many thanks to David Teko Kangni, who modularized the code and also fixed the warnings, which I had intentionally not fixed:
SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead for the code
You will first need to generate a token every 24 hours by using the Client ID, Secret and Scopes.
Subsequent calls will use the access token and other parameters along with the API call to get the results. It does not require the technical account or the client secret anymore.
New Project screen
Choose the APIs
Choose your authentication protocol
Choosing a product profile
Generate the API key and the access token
Change the name or description if you need to.
On the project screen, you will get extra details on client ID, client secret and scopes.
Getting the endpoint where we will send the API requests to.
More details available on scroll.
Click on Create Role
Name the role just to track it in the system
Add all the permissions and the sandboxes
Add a role
Choose the role you created
JupyterLab output
JSON file opened in Visual Studio Code
SAS URI and SAS Token from the AEP UI
CSV file has been uploaded into Data Landing Zone
Number of datasets broken down by sandbox and ownership
Data volume split by ownership
The chart reveals a long tail indicating fragmentation
Histogram is showing typical distribution for a demo environment