STATS 200: Unlock Dataset Metadata Insights via Adobe Experience Platform APIs and Python
This chapter covers the essential steps for installing necessary libraries, generating access tokens, and making authenticated API requests.
Use Case Overview
Our objective is to create a snapshot of the datasets available in the Adobe Experience Platform (AEP) catalog and enrich this snapshot with additional dimensions such as the number of rows, dataset size, and whether the dataset is managed by the customer or the system. While some of this information is available through the AEP UI under the Datasets tab, our goal is to extract this data programmatically, store it in a data lake, and leverage Data Distiller for advanced slice-and-dice analysis.
By doing so, we will be able to track the growth of datasets over time, with a particular focus on those marked for the Profile Store and Identity Store. This use case falls under the category of metadata analytics, involving potentially hundreds or thousands of datasets. Although AEP exposes some of this metadata via system datasets, our requirements demand a customized view of the base dataset. In such scenarios, a Python-based solution to extract, format, and ingest the data into the platform becomes a highly effective approach.
REST APIs in the Platform
A fundamental aspect of Adobe Experience Platform's architecture is its use of modular services that interact through REST APIs to perform tasks. This service-oriented approach forms the backbone of the platform. So, if you're unable to find specific data or information in the UI or product documentation, the REST APIs should be your go-to resource. In essence, every UI feature and function is powered by underlying API calls.
My approach to working with APIs begins by creating a Developer Project, obtaining the necessary credentials and IMS organization details. I then use Python to extract, process, and format the data. Finally, I leverage Python's Postgres SQL connector to write the data back into Adobe Experience Platform. I could use Postman as well, but having spent a significant amount of time working at MathWorks, I have a deep appreciation for both MATLAB and Python. This is why I tend to favor Python for my workflow—it’s just a personal preference!
Python vs Postman: Choosing the Right Tool for REST API Interactions
Both Python and Postman offer powerful ways to interact with REST APIs, but they serve different purposes depending on the context. Postman is ideal for quick testing, debugging, and prototyping of API calls. It provides an intuitive user interface for crafting requests, viewing responses, and managing collections of API calls without needing to write code. This makes it particularly useful for developers during the early stages of API development or testing, as well as for non-developers who need to work with APIs. However, Python excels when you need to integrate API calls into a larger workflow, automate tasks, or manipulate the response data programmatically. With libraries like requests, Python enables greater flexibility and customization, allowing for complex logic, error handling, and scheduled tasks. The trade-off is that Python requires more setup and knowledge of coding, making it more suitable for repeatable, automated processes rather than ad-hoc testing. In summary, Postman shines in quick, manual interactions with APIs, while Python is the go-to for automation and advanced API workflows.
Prerequisites
Before You Create a Project
Reach out to an Admin Console administrator within your organization and request to be added as a developer to an Experience Platform product profile through the Admin Console. Also make sure that you are added as both a developer and a user for a product profile. Without this permission, you will not be able to create a project or move any further with the APIs.
Once you have created a project, you will generate the following values, which you will use to gain access:
{ACCESS_TOKEN}: This is a short-lived token used to authenticate API requests. It is obtained through an OAuth flow via Adobe's IMS (Identity Management System) and is required for authorization to interact with AEP services. The token expires after 24 hours, so each day you need to request a new one using your API credentials to continue making authenticated API requests to Adobe Experience Platform.
{API_KEY}: Also known as the {CLIENT ID}, this is a unique identifier assigned to your application when you create a Developer Project in Adobe Developer Console. It is used to authenticate and identify the application making the API requests.
{CLIENT SECRET}: The Client Secret is used along with the Client ID to authenticate your application when requesting an access token. It acts as a password for your app and should be kept secure. When using OAuth Server-to-Server authentication, you need both the Client ID and Client Secret to securely generate the access token.
{SCOPES}: Scopes define the permissions your application is requesting. They specify which Adobe services or APIs your access token can interact with. Without defining the appropriate scopes, your token may not have the necessary access to certain resources in the platform.
The access token is generated after successful authentication using the Client ID, Client Secret, and Scopes. It is then used in each API request to authorize the interaction.
{ORG_ID}: The Organization ID (IMS Organization ID) uniquely identifies your Adobe Experience Platform organization. This ID is required in API requests to specify which organization’s data you are interacting with.
{SANDBOX_NAME}: Some APIs require the sandbox name to identify the specific sandbox environment you are working in within Adobe Experience Platform. Sandboxes allow you to segment and manage different environments (such as development, staging, and production) to safely test, develop, and deploy features without affecting live data.
{TECHNICAL_ACCOUNT_ID}: This ID belongs to a machine account (not a human user). It is used when your application needs to perform automated tasks, such as fetching data or executing processes in Adobe Experience Platform, without any user interaction.
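Because the Client Secret acts as a password, one option is to keep these values out of the notebook and read them from environment variables instead. The following is a minimal sketch of that idea; the class name, the function name, and the environment variable names (AEP_CLIENT_ID and so on) are hypothetical choices for this example and are not defined by Adobe.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class AepCredentials:
    """Container for the values copied from the Developer Console project."""
    client_id: str      # {API_KEY} / {CLIENT ID}
    client_secret: str  # {CLIENT SECRET}
    scopes: str         # {SCOPES}, exactly as shown in the project
    org_id: str         # {ORG_ID}


# Hypothetical environment variable names chosen for this sketch.
ENV_NAMES = {
    "client_id": "AEP_CLIENT_ID",
    "client_secret": "AEP_CLIENT_SECRET",
    "scopes": "AEP_SCOPES",
    "org_id": "AEP_ORG_ID",
}


def load_credentials(env=None):
    """Build AepCredentials from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_NAMES.values() if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return AepCredentials(**{field: env[name] for field, name in ENV_NAMES.items()})
```

Calling load_credentials() with no arguments reads from the real environment; passing a plain dictionary makes the function easy to try out without touching your shell configuration.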
Stuff I Wish They Told Me
What is an access token really?
It grants the bearer permission to interact with the AEP services for a limited time (typically 24 hours). The token includes encoded information about the user or application, the permissions granted, and its expiration time.
In simple terms, the access token serves as a "proof of identity" that you or your application can present when calling AEP APIs, ensuring that only authorized users can perform actions or retrieve data from the platform.
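Since the token is only valid for a limited time, a long-running script can hold on to it and request a new one shortly before it expires. The sketch below illustrates that pattern under stated assumptions: TokenCache is a hypothetical helper, the 24-hour lifetime is taken from the description above rather than read from the token itself, and fetch_token stands for whatever function you use to request a token.

```python
import time


class TokenCache:
    """Reuse an access token until shortly before it expires."""

    def __init__(self, fetch_token, lifetime_seconds=24 * 60 * 60,
                 safety_margin_seconds=300, clock=time.time):
        self._fetch_token = fetch_token       # callable returning a token string
        self._lifetime = lifetime_seconds     # assumed validity period
        self._margin = safety_margin_seconds  # refresh a little early
        self._clock = clock                   # injectable so it can be tested
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return the cached token, fetching a new one when it is near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            self._token = self._fetch_token()
            self._expires_at = now + self._lifetime
        return self._token


# Usage with a stand-in fetch function (no network call is made here).
cache = TokenCache(fetch_token=lambda: "token-from-ims")
print(cache.get())  # token-from-ims
```

The clock parameter exists only so that the expiry logic can be exercised without waiting a day; in normal use the default is fine.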
Why do I need an API key if I have the access token?
When we are making API calls from Python (or any other client), both the API key and the access token play different roles:
API Key Tracking
When you make API calls, the API key (also known as the Client ID) is sent along with your requests. This helps Adobe track which application is making the requests. The API key is typically passed in the headers or query parameters of each request. It allows Adobe to monitor usage, enforce rate limits, and tie your requests to the specific developer project associated with your API key.
Access Token for Every API Request:
The access token is used in every single API request to authenticate and authorize your actions. It's sent in the headers of your API calls, typically in the form of a "Bearer" token (a kind of token used in HTTP authentication). Without a valid access token, your API calls will be rejected because the platform needs to confirm your identity and the scope of permissions granted to you.
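To make the two roles concrete, here is a small sketch that assembles the request headers in one place. The header names are the ones used in the requests later in this chapter; build_headers itself is a hypothetical helper, not part of any Adobe library.

```python
def build_headers(access_token, api_key, org_id, sandbox_name=None):
    """Assemble the headers sent with each Platform API request."""
    headers = {
        "Authorization": f"Bearer {access_token}",  # proves who is calling
        "x-api-key": api_key,                       # identifies the developer project
        "x-gw-ims-org-id": org_id,                  # identifies the organization
    }
    if sandbox_name is not None:
        headers["x-sandbox-name"] = sandbox_name    # only for sandbox-scoped calls
    return headers


# Placeholder values, for illustration only.
print(build_headers("my-token", "my-client-id", "my-org-id", sandbox_name="prod"))
```

Keeping this in a single function means that a refreshed access token only has to be swapped in at one spot.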
Wait, where are these API calls going in the Adobe system?
When you make API requests to Adobe Experience Platform (AEP) or even other Adobe services, those requests are routed through Adobe I/O, Adobe’s central API gateway.
Adobe I/O is not a traditional web server; it functions as an API gateway and integration platform, providing a centralized access point. It works with Adobe’s Identity Management System (IMS) to validate your API keys and access tokens, ensuring you have the proper permissions to access the service. Once authenticated, Adobe I/O directs your requests to the correct Adobe service, such as AEP or Analytics.
Additionally, Adobe I/O manages traffic by enforcing rate limits to ensure fair usage and protect Adobe’s infrastructure.
What is this OAuth Server-to-Server?
OAuth is an authorization protocol that lets you grant an app (here, the Python client we will use) limited access to your data or services without giving away your personal login details. In the Server-to-Server variant, no user logs in at all: the app presents its own Client ID and Client Secret and receives an access token, which it then uses to make secure API requests. These access tokens are short-lived and easily revoked, which reduces the risk if a token is compromised.
You will be greeted by a screen that looks like this. Click on Create Project.
The new Project screen will look like this:
Add API and choose Adobe Experience Platform and Experience Platform API
Click on Next and you will see the following screen. Choose OAuth Server-to-Server
Now you need to choose the product profiles. Product profile lets you define what products you have access to. The product profile is typically created by the administrator at https://adminconsole.adobe.com/.
Previously, permissions for role-based access control on AEP objects, such as dataset access, were managed within the product profile. However, this has now changed with Permissions being introduced as a separate feature in the left navigation panel of the AEP UI. It's important to confirm with your admin that you've selected a product profile that grants you developer access to Adobe Experience Platform.
Click Save configured API and you will see the following. There will be an API Key that you can copy. You will also get an option to Generate access token.
You can also edit the project if you would like to change the name or description.
Also, if you clicked on OAuth Server-to-Server, you will be able to see new information such as CLIENT ID, CLIENT SECRET and SCOPES. Copy them. You will use this information to generate the access token every 24 hours to connect to the Platform APIs. Alternatively, you can log into this project every day and generate the access token. Check the SCOPES to make sure you have the right SCOPE for the service that you are querying.
The API requests will be sent to an endpoint. To access that endpoint, click on the View cURL command.
If you scroll down, you will get additional details on ORG_ID and TECHNICAL_ACCOUNT_ID which you should copy. Note the Credential name. You will need this later in the Permissions UI.
Go to the AEP UI, or have the admin add a role with the right permissions to your Technical Account ID. Click on Permissions->Roles->Create Role.
Name the Role.
Add the All permission in Data Management and make sure you choose the right Sandboxes. Also add all permissions for Sandbox Administration, Data Ingestion and Query Service
Scroll to API Credentials. The credential name is the same one that we saw in the project. Click on the credential name from your project and go to Roles and add the role you created.
Get the Access Token
Let us write some Python code to do so. Copy and paste the following into JupyterLab and make sure you have all the parameters from the previous section.
!pip install requests

import requests

# Replace with your Adobe credentials
client_id = 'your_client_id'
client_secret = 'your_client_secret'
org_id = 'your_org_id'
tech_account_id = 'your_tech_account_id'
scope = 'scope'
auth_endpoint = 'https://ims-na1.adobelogin.com/ims/token/v2'

# Prepare the data for the access token request
data = {
    'grant_type': 'client_credentials',
    'client_id': client_id,
    'client_secret': client_secret,
    'scope': scope  # Specify the scope relevant to your API usage
}

# Request the access token from Adobe IMS
response = requests.post(auth_endpoint, data=data)

if response.status_code == 200:
    access_token = response.json()['access_token']
    print(f"Access Token: {access_token}")
else:
    print(f"Failed to obtain access token. Status Code: {response.status_code}, Error: {response.text}")
You should see a response that looks like this:
Retrieve Datasets Information Across All Sandboxes
We will use the Sandbox Management APIs to retrieve the list of sandboxes. Then we will loop through these sandboxes and make calls to the Catalog API to get the dataset information.
In a separate cell, copy and paste the following, and make sure the cell from the previous section that defines the variables used in the headers has been executed.
import json
import requests
from datetime import datetime

# Set headers for API requests
headers = {
    "Authorization": f"Bearer {access_token}",
    "x-api-key": client_id,
    "x-gw-ims-org-id": org_id,
    "Content-Type": "application/json"
}

# Dictionary to store all datasets across sandboxes, keyed by sandbox name
all_datasets = {}

# Initial sandbox endpoint
sandbox_endpoint = 'https://platform.adobe.io/data/foundation/sandbox-management/sandboxes'
url = sandbox_endpoint

# Iterate over pages of sandboxes
while url:
    sandbox_response = requests.get(url, headers=headers)

    # Check if the sandboxes were retrieved successfully
    if sandbox_response.status_code == 200:
        sandbox_data = sandbox_response.json()
        sandboxes = sandbox_data.get('sandboxes', [])

        # Iterate over each sandbox on the current page
        for sandbox in sandboxes:
            sandbox_id = sandbox.get('id')
            sandbox_name = sandbox.get('name')
            print(f"Processing sandbox: {sandbox_name}...")

            datasets = []
            dataset_count = 0  # Initialize the dataset count
            start = 0
            limit = 50
            dataset_url = 'https://platform.adobe.io/data/foundation/catalog/datasets'

            # Set headers specific to the current sandbox
            headers.update({
                'x-sandbox-id': sandbox_id,
                'x-sandbox-name': sandbox_name
            })

            # Pagination for datasets in each sandbox
            while True:
                params = {'start': start, 'limit': limit}

                # Get the dataset info for the current sandbox with pagination
                dataset_response = requests.get(dataset_url, headers=headers, params=params)

                if dataset_response.status_code == 200:
                    data = dataset_response.json()
                    if not data:
                        # If the returned JSON is empty, we've reached the end
                        break

                    # Append the entire dataset info from this page
                    for dataset_id, dataset_info in data.items():
                        dataset_info_entry = dataset_info  # Store the entire dataset information
                        dataset_info_entry['dataset_id'] = dataset_id  # Add dataset ID explicitly
                        datasets.append(dataset_info_entry)
                        dataset_count += 1  # Increment the dataset count

                    # Update start for the next page of datasets
                    start += limit
                else:
                    print(f"Failed to retrieve datasets for sandbox {sandbox_name}. Status Code: {dataset_response.status_code}, Response: {dataset_response.text}")
                    break

            # Save the entire dataset info for the current sandbox
            all_datasets[sandbox_name] = datasets

            # Print the dataset count for the current sandbox
            print(f"Sandbox: {sandbox_name}, Dataset Count: {dataset_count}")

            # If the sandbox is "testingqs-for-computed-attributes", list all datasets with name, ID, and managed_by
            if sandbox_name == "testingqs-for-computed-attributes":
                print(f"Listing datasets for sandbox: {sandbox_name}")
                for dataset in datasets:
                    print(f"Dataset Name: {dataset.get('name', 'UNKNOWN')}, Dataset ID: {dataset.get('dataset_id', 'UNKNOWN')}, Managed by: {dataset.get('classification', {}).get('managedBy', 'UNKNOWN')}")

        # Check if there is a next page for sandboxes
        url = sandbox_data.get('_links', {}).get('next', {}).get('href')
        if not url:
            break
    else:
        print(f"Failed to retrieve sandboxes. Status Code: {sandbox_response.status_code}, Response: {sandbox_response.text}")
        url = None

# Save the retrieved datasets (full info) to a file called 'datasets.json'
with open('datasets.json', 'w') as outfile:
    json.dump(all_datasets, outfile, indent=4)

print("Full datasets information saved to 'datasets.json'.")
You will see output in your Python environment that looks like this:
Open up datasets.json in a notepad-like application and you should see something similar to this. Use the JSON file to get a sense of the various fields and their values. We do not need all of these values.
The most important part of the above code is the pagination code for getting the sandboxes and the datasets (in sets of 50). If you miss the pagination code, your answers will be wrong. It also helps to print the datasets in a sandbox to compare your results.
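To see why skipping pagination gives wrong answers, the sketch below runs the same start/limit loop against a stand-in data source instead of the real Catalog API. Everything in it (fetch_all_pages, fake_fetch_page, and the 120 fake dataset IDs) is invented for illustration.

```python
def fetch_all_pages(fetch_page, limit=50):
    """Collect items from a start/limit paginated source until a page is empty."""
    items = []
    start = 0
    while True:
        page = fetch_page(start, limit)
        if not page:
            break  # an empty page means there is nothing left to read
        items.extend(page)
        start += limit
    return items


# Stand-in for the Catalog API: 120 fake dataset IDs served in slices.
FAKE_DATASETS = [f"dataset-{i}" for i in range(120)]


def fake_fetch_page(start, limit):
    return FAKE_DATASETS[start:start + limit]


first_page_only = fake_fetch_page(0, 50)
everything = fetch_all_pages(fake_fetch_page, limit=50)
print(len(first_page_only), len(everything))  # 50 120
```

A single request would have reported 50 datasets where 120 exist, which is exactly the kind of silent undercount the pagination loop prevents.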
Data Processing of JSON into a Flat File
Copy, paste, and execute the following code:
import json
import pandas as pd

# Load the JSON file (replace with your correct file path)
file_path = 'datasets.json'
with open(file_path) as json_file:
    data = json.load(json_file)

extracted_rows = []

# Loop through each sandbox, which is the top-level key
for sandbox_name, sandbox_datasets in data.items():
    # Loop through each dataset within the current sandbox (sandbox_datasets is a list)
    for dataset_info in sandbox_datasets:
        # Read the unifiedProfile and unifiedIdentity tags; pad with None so that
        # a missing or short tag list does not raise an IndexError
        tags = dataset_info.get('tags', {})
        profile_tags = list(tags.get('unifiedProfile', [])) + [None, None]
        identity_tags = list(tags.get('unifiedIdentity', [])) + [None]
        unifiedProfile = profile_tags[0]
        unifiedProfile_enabledAt = profile_tags[1]
        unifiedIdentity = identity_tags[0]

        # If the value contains "enabled", strip it, otherwise use None
        unifiedProfile_clean = unifiedProfile.replace("enabled:", "") if unifiedProfile else None
        unifiedProfile_enabledAt_clean = unifiedProfile_enabledAt.replace("enabledAt:", "") if unifiedProfile_enabledAt and "enabledAt" in unifiedProfile_enabledAt else None
        unifiedIdentity_clean = unifiedIdentity.replace("enabled:", "") if unifiedIdentity else None

        # Get the sandbox ID, assuming sandboxId exists in the dataset info
        sandbox_id = dataset_info.get('sandboxId', None)

        # Append the cleaned row to extracted_rows
        row = {
            'sandbox_name': sandbox_name,  # Added Sandbox Name
            'dataset_id': dataset_info.get('dataset_id', None),  # Added Dataset ID
            'dataset_name': dataset_info.get('name', None),
            'dataset_ownership': dataset_info.get('classification', {}).get('managedBy', None),
            'dataset_type': dataset_info.get('classification', {}).get('dataBehavior', None),
            'imsOrg_id': dataset_info.get('imsOrg', None),
            'sandbox_id': sandbox_id,
            'profile_enabled': unifiedProfile_clean,
            'date_enabled_profile': unifiedProfile_enabledAt_clean,
            'identity_enabled': unifiedIdentity_clean
        }
        extracted_rows.append(row)

# Convert the extracted rows into a DataFrame
extracted_df = pd.DataFrame(extracted_rows)

# Display the DataFrame (print the first few rows)
print(extracted_df.head())

# Save the DataFrame to a CSV file
extracted_df.to_csv('cleaned_extracted_data.csv', index=False)
The result should look similar to this in the editor:
Tip: If you try to ingest this CSV file, you will need to use a source schema that has no spaces in its column names. If you use the manual CSV upload workflow, you will need to reformat the column names to exclude the spaces. Real-life data often comes with dirty column names, which is why we are being extra cautious in how we name our columns.
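If you do end up with a file whose headers contain spaces or punctuation, a small normalization step can bring them into shape before ingestion. This is a minimal sketch using only the standard library; clean_column_name and the sample headers are made up for the example, and the snake_case convention is simply one reasonable choice.

```python
import re


def clean_column_name(name):
    """Lowercase a column name and replace runs of non-alphanumerics with '_'."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip())
    return cleaned.strip("_").lower()


raw_columns = ["Dataset Name", "Total Size (bytes)", " Row Count "]
print([clean_column_name(c) for c in raw_columns])
# ['dataset_name', 'total_size_bytes', 'row_count']
```

With pandas, the same function can be applied to a DataFrame through df.rename(columns=clean_column_name).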
Get Row and Count Statistics on the Datasets
To generate the statistics of the dataset, you can use the Statistics endpoint:
import json
import pandas as pd
import requests
from datetime import datetime

# Get the current timestamp (will be the same for all rows)
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Load the JSON file (replace with your correct file path)
file_path = 'datasets.json'  # Update this to the correct path
with open(file_path) as json_file:
    data = json.load(json_file)

extracted_rows = []

# Loop through each sandbox, which is the top-level key
for sandbox_name, sandbox_datasets in data.items():
    # Loop through each dataset within the current sandbox (assuming sandbox_datasets is a list)
    for dataset_info in sandbox_datasets:
        # Read the unifiedProfile and unifiedIdentity tags; pad with None so that
        # a missing or short tag list does not raise an IndexError
        tags = dataset_info.get('tags', {})
        profile_tags = list(tags.get('unifiedProfile', [])) + [None, None]
        identity_tags = list(tags.get('unifiedIdentity', [])) + [None]
        unifiedProfile = profile_tags[0]
        unifiedProfile_enabledAt = profile_tags[1]
        unifiedIdentity = identity_tags[0]

        # If the value contains "enabled", strip it, otherwise use None
        unifiedProfile_clean = unifiedProfile.replace("enabled:", "") if unifiedProfile else None
        unifiedProfile_enabledAt_clean = unifiedProfile_enabledAt.replace("enabledAt:", "") if unifiedProfile_enabledAt and "enabledAt" in unifiedProfile_enabledAt else None
        unifiedIdentity_clean = unifiedIdentity.replace("enabled:", "") if unifiedIdentity else None

        # Get dataset ID and sandbox ID ('sandboxId' is the key used in the flattening step)
        dataset_id = dataset_info.get('dataset_id', None)
        sandbox_id = dataset_info.get('sandboxId', None)

        # Statistics API call logic (replacing function)
        stats_endpoint = f"https://platform.adobe.io/data/foundation/statistics/statistics?statisticType=table&dataSet={dataset_id}"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "x-api-key": "acp_foundation_statistics",
            "x-gw-ims-org-id": org_id,
            "x-sandbox-id": sandbox_id,
            "x-sandbox-name": sandbox_name
        }

        # Initialize statistics
        rows, size = 0, 0

        # Make the request for statistics
        response = requests.get(stats_endpoint, headers=headers)

        if response.status_code == 200:
            stats_data = response.json()
            # Iterate over statistics to find row_count and total_size
            for stat in stats_data.get("statistics", []):
                if stat["name"] == "row_count" and stat["value"] != "0":
                    rows = stat["value"]
                elif stat["name"] == "total_size" and stat["value"] != "0":
                    size = stat["value"]

        # Append the cleaned row to extracted_rows, using the updated format
        row = {
            'timestamp': current_timestamp,  # Added Timestamp
            'sandbox_name': sandbox_name,  # Added Sandbox Name
            'dataset_id': dataset_id,  # Added Dataset ID
            'dataset_name': dataset_info.get('name', None),
            'dataset_ownership': dataset_info.get('classification', {}).get('managedBy', None),
            'dataset_type': dataset_info.get('classification', {}).get('dataBehavior', None),
            'imsOrg_id': dataset_info.get('imsOrg', None),
            'sandbox_id': sandbox_id,
            'profile_enabled': unifiedProfile_clean,
            'date_enabled_profile': unifiedProfile_enabledAt_clean,
            'identity_enabled': unifiedIdentity_clean,
            'row_count': rows,  # Added row count
            'total_size': size  # Added total size
        }
        extracted_rows.append(row)

# Convert the extracted rows into a DataFrame
extracted_df = pd.DataFrame(extracted_rows)

# Display the DataFrame (print the first few rows)
print(extracted_df.head())

# Save the DataFrame to a CSV file
extracted_df.to_csv('cleaned_extracted_data_with_statistics.csv', index=False)
print("Data saved to 'cleaned_extracted_data_with_statistics.csv'.")
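The statistics lookup in the code above can also be pulled out into a small function, which makes it easier to check against a saved response without calling the API. This is a sketch under assumptions: extract_table_stats is a hypothetical helper, the payload shape mirrors only the fields that the code above reads, and the sample values are invented.

```python
def extract_table_stats(stats_data):
    """Return (row_count, total_size) as integers from a statistics payload."""
    rows, size = 0, 0
    for stat in stats_data.get("statistics", []):
        try:
            value = int(stat.get("value", 0))
        except (TypeError, ValueError):
            continue  # skip values that are not whole numbers
        if value == 0:
            continue  # like the code above, keep the last non-zero value
        if stat.get("name") == "row_count":
            rows = value
        elif stat.get("name") == "total_size":
            size = value
    return rows, size


# Invented sample payload with the same shape the loop above expects.
sample = {
    "statistics": [
        {"name": "row_count", "value": "1200"},
        {"name": "total_size", "value": "3400000"},
    ]
}
print(extract_table_stats(sample))  # (1200, 3400000)
```

Returning integers rather than the raw strings also spares the later analysis steps from converting the columns again.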
We will now upload this CSV into the Data Landing Zone, with the expectation that a schema and a dataflow were created as per the prerequisite tutorial.
You need to extract the SAS URI from the AEP UI. Go to Sources->Catalog->Data Landing Zone. Click on the card gently and a right panel will appear. Scroll down to get the SAS URI. Note that the SAS URI already contains the SAS Token.
Just copy and paste the SAS URI into the following code as the value of the full_sas_url variable. Also observe the csv_file_path variable and how the file name is injected into the SAS URI in the variable sas_url_with_file.
import requests

# Path to the CSV file you want to upload
csv_file_path = 'cleaned_extracted_data_with_statistics.csv'  # Replace with your file path

# The full SAS URL with both URI and Token (you already have this)
full_sas_url = 'your_SAS_URI'

# Split the URL into the base URI (before the '?') and the SAS token (after the '?')
sas_uri, sas_token = full_sas_url.split('?')

# Inject the file name into the SAS URI
file_name = 'cleaned_extracted_data_with_statistics.csv'  # The file name you are uploading
sas_url_with_file = f'{sas_uri}/{file_name}?{sas_token}'  # Recombine URI, file name, and SAS token

# Open the CSV file in binary mode
with open(csv_file_path, 'rb') as file_data:
    # Set the required headers
    headers = {
        'x-ms-blob-type': 'BlockBlob',  # Required for Azure Blob Storage uploads
    }

    # Make a PUT request to upload the file using the constructed SAS URL
    response = requests.put(sas_url_with_file, headers=headers, data=file_data)

# Check if the upload was successful
if response.status_code == 201:
    print(f"File '{csv_file_path}' uploaded successfully.")
else:
    print(f"Failed to upload file. Status Code: {response.status_code}, Response: {response.text}")
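The string split used above works for a typical SAS URI. As an alternative, the same injection can be done with the standard library's URL parser, which leaves the query string untouched and escapes characters such as spaces in the file name. This is only a sketch: add_blob_name is a hypothetical helper and the example URL is made up.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def add_blob_name(full_sas_url, file_name):
    """Insert file_name at the end of the URL path, keeping the SAS token as is."""
    parts = urlsplit(full_sas_url)
    path = parts.path.rstrip("/") + "/" + quote(file_name)
    return urlunsplit((parts.scheme, parts.netloc, path, parts.query, parts.fragment))


# Made-up SAS URI, for illustration only.
example = "https://account.blob.core.windows.net/container?sv=2020&sig=abc"
print(add_blob_name(example, "my file.csv"))
# https://account.blob.core.windows.net/container/my%20file.csv?sv=2020&sig=abc
```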
If you click into the Data Landing Zone i.e. Sources->Catalog->Data Landing Zone->Add data, you will see:
Tip: The great thing about this approach is that if we keep running this Python notebook on schedule, then the files dropped into Data Landing Zone will be picked up by the dataflow runs.
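One thing to consider for scheduled runs: a PUT to the same blob name replaces the earlier file, so you may prefer a distinct file name per run. The sketch below builds a timestamped name; snapshot_file_name and the dataset_snapshot prefix are hypothetical, and whether your dataflow expects a particular naming pattern is something to verify in your own setup.

```python
from datetime import datetime, timezone


def snapshot_file_name(prefix="dataset_snapshot", now=None):
    """Return a CSV file name that embeds a UTC timestamp."""
    now = datetime.now(timezone.utc) if now is None else now
    return f"{prefix}_{now.strftime('%Y%m%dT%H%M%SZ')}.csv"


fixed = datetime(2024, 1, 15, 8, 30, 0, tzinfo=timezone.utc)
print(snapshot_file_name(now=fixed))
# dataset_snapshot_20240115T083000Z.csv
```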
Analysis in Python
Number of Datasets by Sandbox and Ownership
Execute the following piece of code:
import pandas as pd
import matplotlib.pyplot as plt

# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)

# Group the data by sandbox_name and dataset_ownership, counting the number of datasets
grouped_df = df.groupby(['sandbox_name', 'dataset_ownership']).size().unstack(fill_value=0)

# Plotting the stacked bar chart
ax = grouped_df.plot(kind='bar', stacked=True, figsize=(10, 6))

# Set the labels and title
ax.set_title('Number of Datasets by Sandbox and Ownership')
ax.set_xlabel('Sandbox Name')
ax.set_ylabel('Number of Datasets')

# Rotate x-axis labels for better readability
plt.xticks(rotation=45)

# Display the plot
plt.tight_layout()
plt.show()
The result is:
Total Volume Used in GB Across All Sandboxes Split By Ownership
Copy paste and execute the following piece of code
import pandas as pd
import matplotlib.pyplot as plt

# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)

# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')

# Drop any rows with missing sizes
df_clean = df.dropna(subset=['total_size'])

# Convert bytes to gigabytes (1 GB = 1e9 bytes)
df_clean['total_size_gb'] = df_clean['total_size'] / 1e9

# Group by dataset_ownership (assuming it contains 'customer' vs 'system') and sum the total_size_gb for each group
ownership_grouped = df_clean.groupby('dataset_ownership')['total_size_gb'].sum()

# Calculate the percentage of the total size for each group
total_size = ownership_grouped.sum()
ownership_percent = (ownership_grouped / total_size) * 100

# Print the total volume in GB and the percentage
for ownership, size_gb in ownership_grouped.items():
    percentage = ownership_percent[ownership]
    print(f"Ownership: {ownership}, Size: {size_gb:.2f} GB, Percentage: {percentage:.2f}%")

# Plotting the pie chart
plt.figure(figsize=(8, 8))
plt.pie(ownership_grouped, labels=ownership_grouped.index, autopct='%1.1f%%', startangle=140, colors=plt.cm.Paired.colors)

# Set the title
plt.title('Total Volume Used by Customer vs System in GB')

# Display the pie chart
plt.tight_layout()
plt.show()
The results will be:
Top 10 Datasets by Size in GB
Remember the size is in bytes and needs to be converted to GB or TB. Copy paste the following piece of code and execute:
import pandas as pd
import matplotlib.pyplot as plt

# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)

# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')

# Drop any rows with missing sizes
df_clean = df.dropna(subset=['total_size'])

# Convert bytes to gigabytes (1 GB = 1e9 bytes)
df_clean['total_size_gb'] = df_clean['total_size'] / 1e9

# Sort the DataFrame by 'total_size_gb' in descending order
top_datasets_by_size = df_clean.sort_values(by='total_size_gb', ascending=False)

# Select the top 10 datasets by size
top_10_datasets = top_datasets_by_size.head(10)

# Plotting the top 10 datasets by size in GB
plt.figure(figsize=(10, 6))
plt.barh(top_10_datasets['dataset_name'], top_10_datasets['total_size_gb'], color='skyblue')
plt.xlabel('Total Size (GB)')
plt.ylabel('Dataset Name')
plt.title('Top 10 Datasets by Size (in GB)')

# Display the plot
plt.tight_layout()
plt.gca().invert_yaxis()  # To display the largest dataset on top
plt.show()
The result will be:
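Since the raw sizes are in bytes, a small formatting helper can make printed numbers easier to read when the values span several orders of magnitude. This is a sketch using the same decimal convention as the code above (1 GB = 1e9 bytes); format_size is a hypothetical helper and the sample values are invented.

```python
def format_size(num_bytes):
    """Format a byte count using decimal units (1 GB = 1e9 bytes)."""
    units = [("TB", 1e12), ("GB", 1e9), ("MB", 1e6), ("KB", 1e3)]
    for unit, factor in units:
        if num_bytes >= factor:
            return f"{num_bytes / factor:.2f} {unit}"
    return f"{num_bytes:.0f} B"


print(format_size(2_500_000_000))      # 2.50 GB
print(format_size(7_300_000_000_000))  # 7.30 TB
print(format_size(512))                # 512 B
```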
Retrieve a List of Dataset Sizes and Rows by Sandbox
Copy paste and execute the following piece of code
import pandas as pd

# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)

# Filter for the "for-anksharm" sandbox
filtered_df = df[df['sandbox_name'] == 'for-anksharm']

# Convert the 'total_size' and 'row_count' columns to numeric (in case there are any non-numeric entries)
filtered_df['total_size'] = pd.to_numeric(filtered_df['total_size'], errors='coerce')
filtered_df['row_count'] = pd.to_numeric(filtered_df['row_count'], errors='coerce')

# Drop any rows with missing sizes or row counts
filtered_df_clean = filtered_df.dropna(subset=['total_size', 'row_count'])

# Convert total_size from bytes to gigabytes
filtered_df_clean['total_size_gb'] = filtered_df_clean['total_size'] / 1e9

# Group by dataset name and sum the total size (in GB) and row counts
sandbox_grouped = filtered_df_clean.groupby('dataset_name')[['total_size_gb', 'row_count']].sum()

# Display the grouped result
print(sandbox_grouped)

# Optionally, save the result to a CSV file
sandbox_grouped.to_csv('for_anksharm_dataset_sizes_and_rows_gb.csv')
Average Record Richness by Sandbox (bytes per record)
Copy paste and execute the following piece of code:
import pandas as pd

# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)

# Convert the 'total_size' and 'row_count' columns to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')
df['row_count'] = pd.to_numeric(df['row_count'], errors='coerce')

# Drop any rows with missing sizes or row counts
df_clean = df.dropna(subset=['total_size', 'row_count'])

# Group by sandbox_name and sum the total_size and row_count
sandbox_grouped = df_clean.groupby('sandbox_name')[['total_size', 'row_count']].sum()

# Calculate the average record richness (bytes per record)
sandbox_grouped['avg_record_richness'] = sandbox_grouped['total_size'] / sandbox_grouped['row_count']

# Display the grouped result with average record richness
print(sandbox_grouped[['total_size', 'row_count', 'avg_record_richness']])

# Optionally, save the result to a CSV file
sandbox_grouped.to_csv('sandbox_avg_record_richness.csv')
The following code plots a histogram of dataset sizes (in GB) across all sandboxes:
import pandas as pd
import matplotlib.pyplot as plt

# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)

# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df['total_size'] = pd.to_numeric(df['total_size'], errors='coerce')

# Drop any rows with missing sizes
df_clean = df.dropna(subset=['total_size'])

# Convert total_size from bytes to gigabytes (1 GB = 1e9 bytes)
df_clean['total_size_gb'] = df_clean['total_size'] / 1e9

# Plot the histogram of dataset sizes in GB
plt.figure(figsize=(10, 6))
plt.hist(df_clean['total_size_gb'], bins=20, color='skyblue', edgecolor='black')

# Set the labels and title
plt.title('Histogram of Dataset Sizes (in GB) Across All Sandboxes')
plt.xlabel('Dataset Size (GB)')
plt.ylabel('Frequency')

# Display the plot
plt.tight_layout()
plt.show()
The result will look like:
Profile-Enabled Datasets (GB) By Sandbox
Copy paste and execute the following code:
import pandas as pd
import matplotlib.pyplot as plt

# Load the dataset
file_path = 'cleaned_extracted_data_with_statistics.csv'
df = pd.read_csv(file_path)

# Filter for the "prod" sandbox and datasets that are profile enabled
df_filtered = df[(df['sandbox_name'] == 'prod') & (df['profile_enabled'].notnull())]

# Convert the 'total_size' column to numeric (in case there are any non-numeric entries)
df_filtered['total_size'] = pd.to_numeric(df_filtered['total_size'], errors='coerce')

# Drop any rows with missing sizes
df_filtered = df_filtered.dropna(subset=['total_size'])

# Convert the total size from bytes to gigabytes (1 GB = 1e9 bytes)
df_filtered['total_size_gb'] = df_filtered['total_size'] / 1e9

# Sort the DataFrame by total size in descending order
df_filtered = df_filtered.sort_values(by='total_size_gb', ascending=False)

# Define colors based on the dataset type (assuming 'dataset_type' column exists with values like 'record' or 'event')
colors = df_filtered['dataset_type'].map({'record': 'blue', 'event': 'green'}).fillna('gray')

# Plot the bar chart for profile-enabled datasets in the prod sandbox
plt.figure(figsize=(10, 6))
plt.barh(df_filtered['dataset_name'], df_filtered['total_size_gb'], color=colors)

# Set the title and labels
plt.title('Profile-Enabled Datasets in Prod Sandbox (Sorted by Size in GB)', fontsize=14)
plt.xlabel('Size in GB')
plt.ylabel('Dataset Name')

# Invert y-axis to have the largest dataset on top
plt.gca().invert_yaxis()

# Add a legend to indicate which color represents record vs event datasets
from matplotlib.lines import Line2D
legend_elements = [
    Line2D([0], [0], color='blue', lw=4, label='Record'),
    Line2D([0], [0], color='green', lw=4, label='Event')
]
plt.legend(handles=legend_elements, title="Dataset Type")

# Display the chart
plt.tight_layout()
plt.show()
The result will look like:
Note: The dataset sizes reported here reflect the sizes in the Data Lake, where data is stored in the Parquet file format. Parquet provides significant compression, typically ranging from 5-10x compared to JSON data. However, the Profile Store uses a different file format and compression mechanism, so the volume comparison between Data Lake and Profile Store is not 1:1. Instead, these sizes should be viewed in terms of relative proportions, rather than exact volumes.
User Contributions & Improvements
David Teko Kangni
Many thanks to David Teko Kangni, who modularized the code and also fixed the following warning, which I had intentionally left unfixed in the code:
SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame. Try using .loc[row_indexer,col_indexer] = value instead