Retrieve a specific dataset by ID with complete details

The get() method retrieves a specific dataset by its ID and returns the complete dataset record, including its metadata, log count, and creation/update timestamps.
Synchronous:

client.datasets.get(
    dataset_id: str,
    **kwargs
) -> Dataset

Asynchronous:

await client.datasets.aget(
    dataset_id: str,
    **kwargs
) -> Dataset
Returns a Dataset object with the following structure:
{
  "id": "dataset_123456789",
  "name": "Customer Support Dataset",
  "description": "Collection of customer support conversations for training",
  "log_count": 1250,
  "created_at": "2024-01-15T10:30:00Z",
  "updated_at": "2024-01-20T14:45:00Z",
  "metadata": {
    "category": "support",
    "purpose": "training",
    "version": "1.0",
    "quality_level": "high",
    "language": "english",
    "owner": "data_team",
    "tags": ["support", "training", "quality"]
  }
}
from keywordsai import KeywordsAI
client = KeywordsAI(api_key="your-api-key")
# Get a specific dataset
dataset_id = "dataset_123456789"
dataset = client.datasets.get(dataset_id)
print(f"Dataset: {dataset.name}")
print(f"Description: {dataset.description}")
print(f"Log Count: {dataset.log_count}")
print(f"Created: {dataset.created_at}")
print(f"Last Updated: {dataset.updated_at}")
# Get dataset and access all properties
dataset = client.datasets.get("dataset_123456789")
# Basic information
print(f"ID: {dataset.id}")
print(f"Name: {dataset.name}")
print(f"Description: {dataset.description}")
# Statistics
print(f"\nStatistics:")
print(f" Total logs: {dataset.log_count:,}")
# Timestamps
from datetime import datetime
created = datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))
updated = datetime.fromisoformat(dataset.updated_at.replace('Z', '+00:00'))
print(f"\nTimestamps:")
print(f" Created: {created.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Updated: {updated.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Age: {(datetime.now(created.tzinfo) - created).days} days")
# Metadata
print(f"\nMetadata:")
for key, value in dataset.metadata.items():
    print(f" {key}: {value}")
from keywordsai.exceptions import NotFoundError

def get_dataset_safely(dataset_id):
    try:
        dataset = client.datasets.get(dataset_id)
        return dataset
    except NotFoundError:
        print(f"Dataset {dataset_id} not found")
        return None
    except Exception as e:
        print(f"Error retrieving dataset: {e}")
        return None

# Safe retrieval
dataset = get_dataset_safely("dataset_123456789")
if dataset:
    print(f"Found dataset: {dataset.name}")
else:
    print("Dataset not available")
import asyncio

async def get_dataset_async(dataset_id):
    client = KeywordsAI(api_key="your-api-key")
    try:
        dataset = await client.datasets.aget(dataset_id)
        print(f"Async retrieved: {dataset.name}")
        print(f" Logs: {dataset.log_count}")
        print(f" Category: {dataset.metadata.get('category', 'N/A')}")
        return dataset
    except Exception as e:
        print(f"Async error: {e}")
        return None

# Run async retrieval
dataset = asyncio.run(get_dataset_async("dataset_123456789"))
def get_multiple_datasets(dataset_ids):
    datasets = []
    failed_ids = []
    for dataset_id in dataset_ids:
        try:
            dataset = client.datasets.get(dataset_id)
            datasets.append(dataset)
            print(f"✓ Retrieved: {dataset.name}")
        except NotFoundError:
            failed_ids.append(dataset_id)
            print(f"✗ Not found: {dataset_id}")
        except Exception as e:
            failed_ids.append(dataset_id)
            print(f"✗ Error with {dataset_id}: {e}")
    print(f"\nSummary:")
    print(f" Successfully retrieved: {len(datasets)}")
    print(f" Failed: {len(failed_ids)}")
    return datasets, failed_ids

# Retrieve multiple datasets
dataset_ids = [
    "dataset_123456789",
    "dataset_987654321",
    "dataset_555666777"
]
datasets, failed = get_multiple_datasets(dataset_ids)
import asyncio

async def get_multiple_datasets_async(dataset_ids):
    client = KeywordsAI(api_key="your-api-key")

    async def get_single_dataset(dataset_id):
        try:
            dataset = await client.datasets.aget(dataset_id)
            return {"success": True, "dataset": dataset, "id": dataset_id}
        except Exception as e:
            return {"success": False, "error": str(e), "id": dataset_id}

    # Create tasks for all datasets
    tasks = [get_single_dataset(dataset_id) for dataset_id in dataset_ids]
    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    # Separate successful and failed retrievals
    successful = [r["dataset"] for r in results if r["success"]]
    failed = [(r["id"], r["error"]) for r in results if not r["success"]]
    print(f"Async batch retrieval completed:")
    print(f" Successful: {len(successful)}")
    print(f" Failed: {len(failed)}")
    for dataset in successful:
        print(f" ✓ {dataset.name}: {dataset.log_count} logs")
    for dataset_id, error in failed:
        print(f" ✗ {dataset_id}: {error}")
    return successful, failed

# Run async batch retrieval
dataset_ids = [
    "dataset_123456789",
    "dataset_987654321",
    "dataset_555666777"
]
successful, failed = asyncio.run(get_multiple_datasets_async(dataset_ids))
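asyncio.gather launches every request at once; when fetching many IDs you may want to cap concurrency. Here is a minimal sketch of the same batch pattern with an asyncio.Semaphore; the limit of 5 is an arbitrary illustration, not an SDK requirement.

import asyncio

async def get_datasets_bounded(dataset_ids, max_concurrency=5):
    client = KeywordsAI(api_key="your-api-key")
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch(dataset_id):
        # At most max_concurrency requests are in flight at any time
        async with semaphore:
            try:
                return await client.datasets.aget(dataset_id)
            except Exception as e:
                print(f"✗ {dataset_id}: {e}")
                return None

    results = await asyncio.gather(*(fetch(d) for d in dataset_ids))
    return [r for r in results if r is not None]

datasets = asyncio.run(get_datasets_bounded(dataset_ids, max_concurrency=5))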
def summarize_dataset(dataset_id):
    try:
        dataset = client.datasets.get(dataset_id)

        # Calculate age
        from datetime import datetime
        created = datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))
        age_days = (datetime.now(created.tzinfo) - created).days

        # Determine size category
        if dataset.log_count < 100:
            size_category = "Small"
        elif dataset.log_count < 1000:
            size_category = "Medium"
        elif dataset.log_count < 10000:
            size_category = "Large"
        else:
            size_category = "Very Large"

        # Extract key metadata
        category = dataset.metadata.get("category", "Unknown")
        purpose = dataset.metadata.get("purpose", "Unknown")
        quality = dataset.metadata.get("quality_level", "Unknown")

        summary = {
            "id": dataset.id,
            "name": dataset.name,
            "category": category,
            "purpose": purpose,
            "size_category": size_category,
            "log_count": dataset.log_count,
            "age_days": age_days,
            "quality_level": quality,
            "last_updated_days_ago": (datetime.now(created.tzinfo) -
                datetime.fromisoformat(dataset.updated_at.replace('Z', '+00:00'))).days
        }

        print(f"Dataset Summary: {dataset.name}")
        print(f" Category: {category} | Purpose: {purpose}")
        print(f" Size: {size_category} ({dataset.log_count:,} logs)")
        print(f" Age: {age_days} days | Quality: {quality}")
        print(f" Last updated: {summary['last_updated_days_ago']} days ago")
        return summary
    except Exception as e:
        print(f"Error summarizing dataset {dataset_id}: {e}")
        return None

# Summarize a dataset
summary = summarize_dataset("dataset_123456789")
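The summary dict above is easy to collect across several datasets and write out for reporting. A short sketch using the standard csv module and the summarize_dataset helper defined above; the output filename is only an example.

import csv

def export_summaries(dataset_ids, path="dataset_summaries.csv"):
    # Collect summaries, skipping any dataset that could not be retrieved
    summaries = [s for s in (summarize_dataset(d) for d in dataset_ids) if s]
    if not summaries:
        print("No summaries to export")
        return
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(summaries[0].keys()))
        writer.writeheader()
        writer.writerows(summaries)
    print(f"Wrote {len(summaries)} summaries to {path}")

export_summaries(["dataset_123456789", "dataset_987654321"])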
def validate_dataset(dataset_id):
    try:
        dataset = client.datasets.get(dataset_id)
        validation_results = {
            "dataset_id": dataset.id,
            "name": dataset.name,
            "issues": [],
            "warnings": [],
            "score": 100
        }

        # Check required fields
        if not dataset.name or len(dataset.name.strip()) < 3:
            validation_results["issues"].append("Name is too short")
            validation_results["score"] -= 20
        if not dataset.description or len(dataset.description.strip()) < 10:
            validation_results["issues"].append("Description is too short")
            validation_results["score"] -= 15

        # Check metadata completeness
        required_metadata = ["category", "purpose", "owner"]
        missing_metadata = []
        for field in required_metadata:
            if not dataset.metadata.get(field):
                missing_metadata.append(field)
        if missing_metadata:
            validation_results["issues"].append(f"Missing metadata: {', '.join(missing_metadata)}")
            validation_results["score"] -= len(missing_metadata) * 10

        # Check log count
        if dataset.log_count == 0:
            validation_results["warnings"].append("Dataset is empty")
            validation_results["score"] -= 5
        elif dataset.log_count < 10:
            validation_results["warnings"].append("Dataset has very few logs")
            validation_results["score"] -= 3

        # Check age vs activity
        from datetime import datetime, timedelta
        created = datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))
        updated = datetime.fromisoformat(dataset.updated_at.replace('Z', '+00:00'))
        if created == updated and dataset.log_count > 0:
            validation_results["warnings"].append("Dataset never updated after creation")
        age_days = (datetime.now(created.tzinfo) - created).days
        if age_days > 90 and updated < datetime.now(created.tzinfo) - timedelta(days=30):
            validation_results["warnings"].append("Dataset hasn't been updated in over 30 days")

        # Determine overall status
        if validation_results["score"] >= 90:
            validation_results["status"] = "Excellent"
        elif validation_results["score"] >= 75:
            validation_results["status"] = "Good"
        elif validation_results["score"] >= 60:
            validation_results["status"] = "Fair"
        else:
            validation_results["status"] = "Poor"

        print(f"Validation Results for {dataset.name}:")
        print(f" Status: {validation_results['status']} (Score: {validation_results['score']}/100)")
        if validation_results["issues"]:
            print(f" Issues:")
            for issue in validation_results["issues"]:
                print(f" - {issue}")
        if validation_results["warnings"]:
            print(f" Warnings:")
            for warning in validation_results["warnings"]:
                print(f" - {warning}")
        return validation_results
    except Exception as e:
        print(f"Error validating dataset {dataset_id}: {e}")
        return None

# Validate a dataset
validation = validate_dataset("dataset_123456789")
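The same validation routine can be run across a list of IDs to flag datasets that need attention. A short sketch built on validate_dataset above; the threshold of 75 mirrors the "Good" cutoff and is only an example.

def find_datasets_needing_attention(dataset_ids, min_score=75):
    flagged = []
    for dataset_id in dataset_ids:
        result = validate_dataset(dataset_id)
        if result and result["score"] < min_score:
            flagged.append((dataset_id, result["score"], result["issues"]))
    for dataset_id, score, issues in flagged:
        print(f"{dataset_id}: score {score}, issues: {issues}")
    return flagged

flagged = find_datasets_needing_attention(["dataset_123456789", "dataset_987654321"])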
def compare_datasets(dataset_id1, dataset_id2):
    try:
        dataset1 = client.datasets.get(dataset_id1)
        dataset2 = client.datasets.get(dataset_id2)
        comparison = {
            "dataset1": {
                "id": dataset1.id,
                "name": dataset1.name,
                "log_count": dataset1.log_count,
                "created_at": dataset1.created_at,
                "metadata": dataset1.metadata
            },
            "dataset2": {
                "id": dataset2.id,
                "name": dataset2.name,
                "log_count": dataset2.log_count,
                "created_at": dataset2.created_at,
                "metadata": dataset2.metadata
            },
            "differences": []
        }

        # Compare log counts
        log_diff = abs(dataset1.log_count - dataset2.log_count)
        if log_diff > 0:
            larger = "dataset1" if dataset1.log_count > dataset2.log_count else "dataset2"
            comparison["differences"].append(
                f"Log count difference: {log_diff:,} logs ({larger} is larger)"
            )

        # Compare creation dates
        from datetime import datetime
        created1 = datetime.fromisoformat(dataset1.created_at.replace('Z', '+00:00'))
        created2 = datetime.fromisoformat(dataset2.created_at.replace('Z', '+00:00'))
        date_diff = abs((created1 - created2).days)
        if date_diff > 0:
            older = "dataset1" if created1 < created2 else "dataset2"
            comparison["differences"].append(
                f"Creation date difference: {date_diff} days ({older} is older)"
            )

        # Compare metadata
        metadata1_keys = set(dataset1.metadata.keys())
        metadata2_keys = set(dataset2.metadata.keys())
        unique_to_1 = metadata1_keys - metadata2_keys
        unique_to_2 = metadata2_keys - metadata1_keys
        common_keys = metadata1_keys & metadata2_keys
        if unique_to_1:
            comparison["differences"].append(
                f"Metadata unique to dataset1: {', '.join(unique_to_1)}"
            )
        if unique_to_2:
            comparison["differences"].append(
                f"Metadata unique to dataset2: {', '.join(unique_to_2)}"
            )
        # Compare common metadata values
        for key in common_keys:
            if dataset1.metadata[key] != dataset2.metadata[key]:
                comparison["differences"].append(
                    f"Different {key}: '{dataset1.metadata[key]}' vs '{dataset2.metadata[key]}'"
                )

        print(f"Dataset Comparison:")
        print(f" Dataset 1: {dataset1.name} ({dataset1.log_count:,} logs)")
        print(f" Dataset 2: {dataset2.name} ({dataset2.log_count:,} logs)")
        if comparison["differences"]:
            print(f" Differences:")
            for diff in comparison["differences"]:
                print(f" - {diff}")
        else:
            print(f" No significant differences found")
        return comparison
    except Exception as e:
        print(f"Error comparing datasets: {e}")
        return None

# Compare two datasets
comparison = compare_datasets("dataset_123456789", "dataset_987654321")
def analyze_dataset(dataset_id):
    try:
        dataset = client.datasets.get(dataset_id)
        from datetime import datetime
        created = datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))
        updated = datetime.fromisoformat(dataset.updated_at.replace('Z', '+00:00'))
        now = datetime.now(created.tzinfo)

        analytics = {
            "basic_info": {
                "id": dataset.id,
                "name": dataset.name,
                "description_length": len(dataset.description) if dataset.description else 0,
                "log_count": dataset.log_count
            },
            "temporal_analysis": {
                "age_days": (now - created).days,
                "last_update_days_ago": (now - updated).days,
                "creation_date": created.strftime("%Y-%m-%d"),
                "last_update_date": updated.strftime("%Y-%m-%d"),
                "is_recently_active": (now - updated).days <= 7
            },
            "metadata_analysis": {
                "metadata_count": len(dataset.metadata),
                "has_category": "category" in dataset.metadata,
                "has_purpose": "purpose" in dataset.metadata,
                "has_owner": "owner" in dataset.metadata,
                "metadata_keys": list(dataset.metadata.keys())
            },
            "size_analysis": {
                "size_category": (
                    "empty" if dataset.log_count == 0 else
                    "small" if dataset.log_count < 100 else
                    "medium" if dataset.log_count < 1000 else
                    "large" if dataset.log_count < 10000 else
                    "very_large"
                ),
                "logs_per_day_since_creation": (
                    dataset.log_count / max((now - created).days, 1)
                )
            }
        }

        # Quality score calculation
        quality_score = 0

        # Description quality (0-25 points)
        if analytics["basic_info"]["description_length"] >= 50:
            quality_score += 25
        elif analytics["basic_info"]["description_length"] >= 20:
            quality_score += 15
        elif analytics["basic_info"]["description_length"] >= 10:
            quality_score += 10

        # Metadata completeness (0-25 points)
        metadata_score = 0
        if analytics["metadata_analysis"]["has_category"]:
            metadata_score += 8
        if analytics["metadata_analysis"]["has_purpose"]:
            metadata_score += 8
        if analytics["metadata_analysis"]["has_owner"]:
            metadata_score += 9
        quality_score += metadata_score

        # Content volume (0-25 points)
        if dataset.log_count >= 1000:
            quality_score += 25
        elif dataset.log_count >= 100:
            quality_score += 20
        elif dataset.log_count >= 10:
            quality_score += 15
        elif dataset.log_count > 0:
            quality_score += 10

        # Recency (0-25 points)
        if analytics["temporal_analysis"]["last_update_days_ago"] <= 7:
            quality_score += 25
        elif analytics["temporal_analysis"]["last_update_days_ago"] <= 30:
            quality_score += 20
        elif analytics["temporal_analysis"]["last_update_days_ago"] <= 90:
            quality_score += 15
        elif analytics["temporal_analysis"]["last_update_days_ago"] <= 180:
            quality_score += 10

        analytics["quality_score"] = quality_score

        print(f"Dataset Analytics: {dataset.name}")
        print(f" Quality Score: {quality_score}/100")
        print(f" Size: {analytics['size_analysis']['size_category']} ({dataset.log_count:,} logs)")
        print(f" Age: {analytics['temporal_analysis']['age_days']} days")
        print(f" Last Updated: {analytics['temporal_analysis']['last_update_days_ago']} days ago")
        print(f" Metadata Fields: {analytics['metadata_analysis']['metadata_count']}")
        print(f" Logs per Day: {analytics['size_analysis']['logs_per_day_since_creation']:.2f}")
        return analytics
    except Exception as e:
        print(f"Error analyzing dataset {dataset_id}: {e}")
        return None

# Analyze a dataset
analysis = analyze_dataset("dataset_123456789")
from keywordsai.exceptions import (
    KeywordsAIError,
    NotFoundError,
    ValidationError,
    AuthenticationError,
    RateLimitError
)
import time

def get_dataset_with_retry(dataset_id, max_retries=3):
    for attempt in range(max_retries):
        try:
            dataset = client.datasets.get(dataset_id)
            return dataset
        except NotFoundError:
            print(f"Dataset {dataset_id} not found")
            return None  # Don't retry for not found
        except AuthenticationError:
            print("Authentication failed. Check your API key.")
            return None  # Don't retry for auth errors
        except ValidationError as e:
            print(f"Validation error: {e}")
            return None  # Don't retry for validation errors
        except RateLimitError:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Rate limit exceeded. Max retries reached.")
                return None
        except KeywordsAIError as e:
            print(f"API error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                return None
        except Exception as e:
            print(f"Unexpected error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                return None
    return None

# Use retry logic
dataset = get_dataset_with_retry("dataset_123456789")
if dataset:
    print(f"Successfully retrieved: {dataset.name}")
else:
    print("Failed to retrieve dataset")
def validate_dataset_id(dataset_id):
    """Validate dataset ID format before making API call."""
    if not dataset_id:
        return False, "Dataset ID is required"
    if not isinstance(dataset_id, str):
        return False, "Dataset ID must be a string"
    if not dataset_id.startswith("dataset_"):
        return False, "Dataset ID must start with 'dataset_'"
    if len(dataset_id) < 10:
        return False, "Dataset ID is too short"
    return True, "Valid"

def get_dataset_safely(dataset_id):
    # Validate ID format first
    is_valid, message = validate_dataset_id(dataset_id)
    if not is_valid:
        print(f"Invalid dataset ID: {message}")
        return None
    try:
        dataset = client.datasets.get(dataset_id)
        return dataset
    except Exception as e:
        print(f"Error retrieving dataset: {e}")
        return None

# Safe retrieval with validation
dataset = get_dataset_safely("dataset_123456789")
class DatasetCache:
    def __init__(self, max_size=100, ttl_seconds=300):
        self.cache = {}
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds

    def get(self, dataset_id):
        import time
        # Check if in cache and not expired
        if dataset_id in self.cache:
            dataset, timestamp = self.cache[dataset_id]
            if time.time() - timestamp < self.ttl_seconds:
                print(f"Cache hit for {dataset_id}")
                return dataset
            else:
                # Remove expired entry
                del self.cache[dataset_id]
        # Fetch from API
        try:
            dataset = client.datasets.get(dataset_id)
            # Add to cache
            if len(self.cache) >= self.max_size:
                # Remove oldest entry
                oldest_key = min(self.cache.keys(),
                                 key=lambda k: self.cache[k][1])
                del self.cache[oldest_key]
            self.cache[dataset_id] = (dataset, time.time())
            print(f"Cached {dataset_id}")
            return dataset
        except Exception as e:
            print(f"Error fetching {dataset_id}: {e}")
            return None

    def clear(self):
        self.cache.clear()

# Use caching
cache = DatasetCache(max_size=50, ttl_seconds=600)  # 10 minutes TTL

# First call - fetches from API
dataset1 = cache.get("dataset_123456789")

# Second call - uses cache
dataset2 = cache.get("dataset_123456789")
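If you don't need expiry, the standard library's functools.lru_cache gives you memoization in one line. A minimal sketch; note that cached Dataset objects will go stale until the cache is cleared or the process restarts.

from functools import lru_cache

@lru_cache(maxsize=128)
def get_dataset_cached(dataset_id):
    # Results are memoized per dataset_id for the lifetime of the process
    return client.datasets.get(dataset_id)

dataset = get_dataset_cached("dataset_123456789")  # API call
dataset = get_dataset_cached("dataset_123456789")  # served from cache
get_dataset_cached.cache_clear()  # drop all cached entries when needed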
import time
from functools import wraps

def monitor_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            end_time = time.time()
            duration = end_time - start_time
            print(f"{func.__name__} completed in {duration:.3f} seconds")
            return result
        except Exception as e:
            end_time = time.time()
            duration = end_time - start_time
            print(f"{func.__name__} failed after {duration:.3f} seconds: {e}")
            raise
    return wrapper

@monitor_performance
def get_dataset_monitored(dataset_id):
    return client.datasets.get(dataset_id)

# Monitor performance
dataset = get_dataset_monitored("dataset_123456789")
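The decorator above only handles synchronous callables; if you also want to time aget calls, a variant that detects coroutine functions is straightforward. A sketch using only the standard library.

import asyncio
import time
from functools import wraps

def monitor_performance_any(func):
    # Wrap coroutine functions with an async wrapper, everything else with a sync one
    if asyncio.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                print(f"{func.__name__} took {time.perf_counter() - start:.3f}s")
        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            print(f"{func.__name__} took {time.perf_counter() - start:.3f}s")
    return sync_wrapper

@monitor_performance_any
async def get_dataset_monitored_async(dataset_id):
    return await client.datasets.aget(dataset_id)

dataset = asyncio.run(get_dataset_monitored_async("dataset_123456789"))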
def health_check_dataset(dataset_id):
    try:
        dataset = client.datasets.get(dataset_id)
        health_status = {
            "dataset_id": dataset.id,
            "name": dataset.name,
            "status": "healthy",
            "issues": [],
            "recommendations": []
        }

        # Check if dataset is empty
        if dataset.log_count == 0:
            health_status["issues"].append("Dataset is empty")
            health_status["recommendations"].append("Add logs to the dataset")
            health_status["status"] = "warning"

        # Check if dataset is stale
        from datetime import datetime, timedelta
        updated = datetime.fromisoformat(dataset.updated_at.replace('Z', '+00:00'))
        if datetime.now(updated.tzinfo) - updated > timedelta(days=30):
            health_status["issues"].append("Dataset hasn't been updated in 30+ days")
            health_status["recommendations"].append("Consider updating or archiving")
            if health_status["status"] == "healthy":
                health_status["status"] = "warning"

        # Check metadata completeness
        required_fields = ["category", "purpose"]
        missing_fields = [f for f in required_fields if f not in dataset.metadata]
        if missing_fields:
            health_status["issues"].append(f"Missing metadata: {', '.join(missing_fields)}")
            health_status["recommendations"].append("Add missing metadata fields")

        print(f"Health Check: {dataset.name}")
        print(f" Status: {health_status['status'].upper()}")
        if health_status["issues"]:
            print(f" Issues: {len(health_status['issues'])}")
            for issue in health_status["issues"]:
                print(f" - {issue}")
        if health_status["recommendations"]:
            print(f" Recommendations:")
            for rec in health_status["recommendations"]:
                print(f" - {rec}")
        return health_status
    except Exception as e:
        return {
            "dataset_id": dataset_id,
            "status": "error",
            "error": str(e)
        }

# Run health check
health = health_check_dataset("dataset_123456789")
def get_dataset_backup_info(dataset_id):
    try:
        dataset = client.datasets.get(dataset_id)
        backup_info = {
            "dataset_id": dataset.id,
            "name": dataset.name,
            "backup_priority": "medium",
            "estimated_size_mb": dataset.log_count * 0.5,  # Rough estimate
            "backup_frequency": "weekly",
            "retention_period": "1_year"
        }

        # Determine backup priority based on size and activity
        if dataset.log_count > 10000:
            backup_info["backup_priority"] = "high"
            backup_info["backup_frequency"] = "daily"
        elif dataset.log_count > 1000:
            backup_info["backup_priority"] = "medium"
            backup_info["backup_frequency"] = "weekly"
        else:
            backup_info["backup_priority"] = "low"
            backup_info["backup_frequency"] = "monthly"

        # Check if dataset is critical (based on metadata)
        if dataset.metadata.get("critical") == "true":
            backup_info["backup_priority"] = "critical"
            backup_info["backup_frequency"] = "daily"
            backup_info["retention_period"] = "5_years"

        print(f"Backup Info: {dataset.name}")
        print(f" Priority: {backup_info['backup_priority']}")
        print(f" Frequency: {backup_info['backup_frequency']}")
        print(f" Estimated Size: {backup_info['estimated_size_mb']:.1f} MB")
        print(f" Retention: {backup_info['retention_period']}")
        return backup_info
    except Exception as e:
        print(f"Error getting backup info for {dataset_id}: {e}")
        return None

# Get backup information
backup_info = get_dataset_backup_info("dataset_123456789")
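To plan backups across a whole set of datasets, the per-dataset info above can be collected and grouped by priority. A short sketch reusing get_dataset_backup_info; the ID list is illustrative.

def build_backup_plan(dataset_ids):
    plan = {}
    for dataset_id in dataset_ids:
        info = get_dataset_backup_info(dataset_id)
        if info:
            # Group dataset names under their computed backup priority
            plan.setdefault(info["backup_priority"], []).append(info["name"])
    for priority in ("critical", "high", "medium", "low"):
        if priority in plan:
            print(f"{priority}: {', '.join(plan[priority])}")
    return plan

plan = build_backup_plan(["dataset_123456789", "dataset_987654321"])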