Update dataset information including name, description, and metadata
The update() method allows you to modify dataset information, including the name, description, and metadata. This is useful for keeping dataset information accurate as your data evolves.
client.datasets.update(
dataset_id: str,
name: str = None,
description: str = None,
metadata: Dict[str, Any] = None,
**kwargs
) -> Dataset
await client.datasets.aupdate(
dataset_id: str,
name: str = None,
description: str = None,
metadata: Dict[str, Any] = None,
**kwargs
) -> Dataset
Both methods return a Dataset object with the following structure:
{
"id": "dataset_123456789",
"name": "Updated Customer Support Dataset",
"description": "Updated collection of customer support conversations",
"log_count": 1250,
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-25T16:20:00Z", # Updated timestamp
"metadata": {
"category": "support",
"purpose": "training",
"version": "2.0", # Updated version
"quality_level": "high",
"last_reviewed": "2024-01-25"
}
}
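The fields above are exposed as attributes on the returned object. A rough sketch of that shape, inferred from the example response (the actual class shipped with the SDK may differ in name and details):
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class DatasetShape:  # illustrative only, not the SDK's actual class
    id: str
    name: str
    description: str
    log_count: int
    created_at: str  # ISO 8601 timestamp
    updated_at: str  # ISO 8601 timestamp
    metadata: Dict[str, Any] = field(default_factory=dict)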
from keywordsai import KeywordsAI
client = KeywordsAI(api_key="your-api-key")
# Update dataset name and description
updated_dataset = client.datasets.update(
dataset_id="dataset_123456789",
name="Enhanced Customer Support Dataset",
description="Comprehensive collection of customer support conversations with quality annotations"
)
print(f"Updated dataset: {updated_dataset.name}")
print(f"New description: {updated_dataset.description}")
print(f"Updated at: {updated_dataset.updated_at}")
# Update only the dataset name
updated_dataset = client.datasets.update(
dataset_id="dataset_123456789",
name="Customer Support Training Data v2.0"
)
print(f"New name: {updated_dataset.name}")
print(f"Description unchanged: {updated_dataset.description}")
# Update only the description
updated_dataset = client.datasets.update(
dataset_id="dataset_123456789",
description="High-quality customer support conversations collected from Q1 2024, manually reviewed and annotated for training purposes"
)
print(f"Name unchanged: {updated_dataset.name}")
print(f"New description: {updated_dataset.description}")
# Update metadata fields
updated_dataset = client.datasets.update(
dataset_id="dataset_123456789",
metadata={
"version": "2.1",
"quality_level": "premium",
"last_reviewed": "2024-01-25",
"reviewer": "data_team",
"review_notes": "Added quality annotations and removed duplicates",
"tags": ["support", "training", "quality", "reviewed"]
}
)
print(f"Updated metadata:")
for key, value in updated_dataset.metadata.items():
print(f" {key}: {value}")
# Update all fields at once
updated_dataset = client.datasets.update(
dataset_id="dataset_123456789",
name="Premium Customer Support Training Dataset",
description="Curated and annotated customer support conversations for advanced model training",
metadata={
"version": "3.0",
"category": "customer_support",
"purpose": "training",
"quality_level": "premium",
"language": "english",
"domain": "technology",
"annotation_status": "complete",
"review_status": "approved",
"last_updated": "2024-01-25",
"updated_by": "data_science_team",
"changelog": "Added sentiment labels, removed PII, quality review completed",
"tags": ["support", "training", "premium", "annotated", "reviewed"]
}
)
print(f"Comprehensive update completed:")
print(f" Name: {updated_dataset.name}")
print(f" Description: {updated_dataset.description[:100]}...")
print(f" Metadata fields: {len(updated_dataset.metadata)}")
print(f" Version: {updated_dataset.metadata.get('version')}")
# Get current dataset to preserve existing metadata
current_dataset = client.datasets.get("dataset_123456789")
# Merge new metadata with existing
new_metadata = current_dataset.metadata.copy()
new_metadata.update({
"last_backup": "2024-01-25",
"backup_location": "s3://backups/datasets/",
"backup_status": "completed"
})
# Update with merged metadata
updated_dataset = client.datasets.update(
dataset_id="dataset_123456789",
metadata=new_metadata
)
print(f"Added backup information to metadata")
print(f"Total metadata fields: {len(updated_dataset.metadata)}")
def update_dataset_if_needed(dataset_id):
# Get current dataset
dataset = client.datasets.get(dataset_id)
updates = {}
# Update quality level based on log count
if dataset.log_count >= 1000 and dataset.metadata.get("quality_level") != "high":
updates["metadata"] = dataset.metadata.copy()
updates["metadata"]["quality_level"] = "high"
updates["metadata"]["auto_updated"] = "2024-01-25"
# Update name if it doesn't reflect size
if dataset.log_count >= 1000 and "Large" not in dataset.name:
updates["name"] = f"Large {dataset.name}"
# Update description if it's too short
if not dataset.description or len(dataset.description) < 50:
updates["description"] = f"Dataset containing {dataset.log_count} logs for {dataset.metadata.get('purpose', 'general')} purposes"
# Apply updates if any
if updates:
updated_dataset = client.datasets.update(dataset_id, **updates)
print(f"Applied {len(updates)} updates to {dataset.name}")
return updated_dataset
else:
print(f"No updates needed for {dataset.name}")
return dataset
# Conditionally update dataset
updated = update_dataset_if_needed("dataset_123456789")
import asyncio
async def update_dataset_async(dataset_id, **updates):
client = KeywordsAI(api_key="your-api-key")
try:
updated_dataset = await client.datasets.aupdate(dataset_id, **updates)
print(f"Async updated: {updated_dataset.name}")
print(f" Updated at: {updated_dataset.updated_at}")
return updated_dataset
except Exception as e:
print(f"Async update error: {e}")
return None
# Run async update
updated = asyncio.run(update_dataset_async(
"dataset_123456789",
name="Async Updated Dataset",
metadata={"async_update": True, "update_method": "async"}
))
import asyncio
async def update_multiple_datasets_async(updates_list):
client = KeywordsAI(api_key="your-api-key")
async def update_single_dataset(dataset_id, updates):
try:
result = await client.datasets.aupdate(dataset_id, **updates)
return {"success": True, "dataset": result, "id": dataset_id}
except Exception as e:
return {"success": False, "error": str(e), "id": dataset_id}
# Create tasks for all updates
tasks = []
for item in updates_list:
dataset_id = item["dataset_id"]
updates = {k: v for k, v in item.items() if k != "dataset_id"}
task = update_single_dataset(dataset_id, updates)
tasks.append(task)
# Wait for all updates to complete
results = await asyncio.gather(*tasks)
# Separate successful and failed updates
successful = [r["dataset"] for r in results if r["success"]]
failed = [(r["id"], r["error"]) for r in results if not r["success"]]
print(f"Batch async updates completed:")
print(f" Successful: {len(successful)}")
print(f" Failed: {len(failed)}")
for dataset in successful:
print(f" ✓ Updated: {dataset.name}")
for dataset_id, error in failed:
print(f" ✗ Failed {dataset_id}: {error}")
return successful, failed
# Batch update configuration
updates = [
{
"dataset_id": "dataset_123456789",
"metadata": {"batch_update": True, "batch_id": "batch_001"}
},
{
"dataset_id": "dataset_987654321",
"name": "Batch Updated Dataset 2",
"metadata": {"batch_update": True, "batch_id": "batch_001"}
},
{
"dataset_id": "dataset_555666777",
"description": "Updated via batch operation",
"metadata": {"batch_update": True, "batch_id": "batch_001"}
}
]
# Run batch updates
successful, failed = asyncio.run(update_multiple_datasets_async(updates))
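For large batches you may also want to cap how many updates run concurrently. A sketch built on asyncio.Semaphore (the limit of 5 is an arbitrary illustration, not a documented rate limit):
import asyncio

async def update_with_concurrency_limit(updates_list, max_concurrent=5):
    client = KeywordsAI(api_key="your-api-key")
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_update(item):
        dataset_id = item["dataset_id"]
        fields = {k: v for k, v in item.items() if k != "dataset_id"}
        async with semaphore:  # at most max_concurrent requests in flight
            return await client.datasets.aupdate(dataset_id, **fields)

    # return_exceptions=True keeps one failure from cancelling the rest
    return await asyncio.gather(
        *(limited_update(item) for item in updates_list),
        return_exceptions=True
    )

# Results are Dataset objects or exceptions, in the same order as the input
results = asyncio.run(update_with_concurrency_limit(updates))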
def increment_dataset_version(dataset_id, version_notes=None):
# Get current dataset
dataset = client.datasets.get(dataset_id)
# Parse current version
current_version = dataset.metadata.get("version", "1.0")
try:
major, minor = map(int, current_version.split("."))
new_version = f"{major}.{minor + 1}"
except ValueError:
new_version = "1.1" # Default if version format is invalid
# Prepare version metadata
version_metadata = dataset.metadata.copy()
version_metadata.update({
"version": new_version,
"previous_version": current_version,
"version_updated_at": "2024-01-25T16:20:00Z",
"version_notes": version_notes or f"Incremented from {current_version}"
})
# Update dataset
updated_dataset = client.datasets.update(
dataset_id=dataset_id,
metadata=version_metadata
)
print(f"Version updated: {current_version} → {new_version}")
print(f"Notes: {version_notes or 'No notes provided'}")
return updated_dataset
# Increment version
updated = increment_dataset_version(
"dataset_123456789",
version_notes="Added quality annotations and removed duplicates"
)
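Note that the two-part parse above falls back to "1.1" for anything it cannot unpack, including three-part versions like "2.1.3", which silently loses the major number. A slightly more forgiving sketch:
def bump_minor_version(version_string, default="1.1"):
    # Bump the second dot-separated component and keep everything else
    parts = version_string.split(".")
    try:
        parts[1] = str(int(parts[1]) + 1)
        return ".".join(parts)
    except (IndexError, ValueError):
        return default  # e.g. "v2" or "draft" fall back to the default

print(bump_minor_version("2.1"))    # 2.2
print(bump_minor_version("2.1.3"))  # 2.2.3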
def update_quality_level(dataset_id, new_quality_level, review_notes=None):
# Validate quality level
valid_levels = ["low", "medium", "high", "premium"]
if new_quality_level not in valid_levels:
raise ValueError(f"Quality level must be one of: {valid_levels}")
# Get current dataset
dataset = client.datasets.get(dataset_id)
current_quality = dataset.metadata.get("quality_level", "unknown")
# Prepare quality metadata
quality_metadata = dataset.metadata.copy()
quality_metadata.update({
"quality_level": new_quality_level,
"previous_quality_level": current_quality,
"quality_updated_at": "2024-01-25T16:20:00Z",
"quality_reviewer": "data_team",
"quality_review_notes": review_notes or f"Updated from {current_quality} to {new_quality_level}"
})
# Update name to reflect quality if premium
new_name = dataset.name
if new_quality_level == "premium" and "Premium" not in dataset.name:
new_name = f"Premium {dataset.name}"
elif new_quality_level != "premium" and "Premium" in dataset.name:
new_name = dataset.name.replace("Premium ", "")
# Update dataset
updated_dataset = client.datasets.update(
dataset_id=dataset_id,
name=new_name,
metadata=quality_metadata
)
print(f"Quality level updated: {current_quality} → {new_quality_level}")
if new_name != dataset.name:
print(f"Name updated: {dataset.name} → {new_name}")
return updated_dataset
# Update quality level
updated = update_quality_level(
"dataset_123456789",
"premium",
"Manual review completed, all logs verified for quality"
)
def cleanup_dataset_metadata(dataset_id, remove_keys=None, standardize=True):
# Get current dataset
dataset = client.datasets.get(dataset_id)
# Start with current metadata
cleaned_metadata = dataset.metadata.copy()
# Remove specified keys
if remove_keys:
for key in remove_keys:
cleaned_metadata.pop(key, None)
# Remove empty or null values
cleaned_metadata = {k: v for k, v in cleaned_metadata.items()
if v is not None and v != "" and v != []}
if standardize:
# Standardize common fields
if "category" in cleaned_metadata:
cleaned_metadata["category"] = cleaned_metadata["category"].lower().replace(" ", "_")
if "purpose" in cleaned_metadata:
cleaned_metadata["purpose"] = cleaned_metadata["purpose"].lower().replace(" ", "_")
# Ensure required fields exist
if "created_date" not in cleaned_metadata:
cleaned_metadata["created_date"] = dataset.created_at[:10] # YYYY-MM-DD
if "last_cleanup" not in cleaned_metadata:
cleaned_metadata["last_cleanup"] = "2024-01-25"
# Update dataset
updated_dataset = client.datasets.update(
dataset_id=dataset_id,
metadata=cleaned_metadata
)
print(f"Metadata cleanup completed:")
print(f" Original fields: {len(dataset.metadata)}")
print(f" Cleaned fields: {len(cleaned_metadata)}")
print(f" Removed: {len(dataset.metadata) - len(cleaned_metadata)} fields")
return updated_dataset
# Cleanup metadata
updated = cleanup_dataset_metadata(
"dataset_123456789",
remove_keys=["temp_field", "debug_info", "old_version"],
standardize=True
)
def update_dataset_status(dataset_id, status, status_notes=None):
# Valid statuses
valid_statuses = [
"draft", "in_progress", "review", "approved",
"active", "archived", "deprecated"
]
if status not in valid_statuses:
raise ValueError(f"Status must be one of: {valid_statuses}")
# Get current dataset
dataset = client.datasets.get(dataset_id)
current_status = dataset.metadata.get("status", "unknown")
# Prepare status metadata
status_metadata = dataset.metadata.copy()
status_metadata.update({
"status": status,
"previous_status": current_status,
"status_updated_at": "2024-01-25T16:20:00Z",
"status_updated_by": "system",
"status_notes": status_notes or f"Status changed from {current_status} to {status}"
})
# Update description to reflect status if archived or deprecated
    new_description = dataset.description or ""
if status in ["archived", "deprecated"]:
status_prefix = f"[{status.upper()}] "
if not new_description.startswith(status_prefix):
new_description = status_prefix + new_description
else:
        # Remove archive/deprecation prefixes for other statuses
for prefix in ["[ARCHIVED] ", "[DEPRECATED] "]:
if new_description.startswith(prefix):
new_description = new_description[len(prefix):]
# Update dataset
updated_dataset = client.datasets.update(
dataset_id=dataset_id,
description=new_description,
metadata=status_metadata
)
print(f"Status updated: {current_status} → {status}")
if status_notes:
print(f"Notes: {status_notes}")
return updated_dataset
# Update status
updated = update_dataset_status(
"dataset_123456789",
"approved",
"Quality review completed, ready for production use"
)
def perform_maintenance_update(dataset_id):
from datetime import datetime
# Get current dataset
dataset = client.datasets.get(dataset_id)
# Prepare maintenance metadata
maintenance_metadata = dataset.metadata.copy()
# Update maintenance fields
maintenance_metadata.update({
"last_maintenance": datetime.now().isoformat()[:10],
"maintenance_version": maintenance_metadata.get("maintenance_version", 0) + 1,
"health_check_passed": True,
"metadata_validated": True,
"backup_verified": True
})
# Calculate and update statistics
if dataset.log_count > 0:
# Estimate dataset health score
health_score = 100
# Deduct points for various issues
if not dataset.description or len(dataset.description) < 20:
health_score -= 10
if len(dataset.metadata) < 3:
health_score -= 15
# Check age
created = datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))
age_days = (datetime.now(created.tzinfo) - created).days
if age_days > 180: # Older than 6 months
health_score -= 5
maintenance_metadata["health_score"] = max(0, health_score)
maintenance_metadata["size_category"] = (
"small" if dataset.log_count < 100 else
"medium" if dataset.log_count < 1000 else
"large" if dataset.log_count < 10000 else
"very_large"
)
# Update dataset
updated_dataset = client.datasets.update(
dataset_id=dataset_id,
metadata=maintenance_metadata
)
print(f"Maintenance update completed for {dataset.name}")
print(f" Health score: {maintenance_metadata.get('health_score', 'N/A')}")
print(f" Size category: {maintenance_metadata.get('size_category', 'N/A')}")
print(f" Maintenance version: {maintenance_metadata['maintenance_version']}")
return updated_dataset
# Perform maintenance update
updated = perform_maintenance_update("dataset_123456789")
from keywordsai.exceptions import (
KeywordsAIError,
NotFoundError,
ValidationError,
AuthenticationError,
RateLimitError
)
import time
def update_dataset_with_retry(dataset_id, max_retries=3, **updates):
for attempt in range(max_retries):
try:
updated_dataset = client.datasets.update(dataset_id, **updates)
return updated_dataset
except NotFoundError:
print(f"Dataset {dataset_id} not found")
return None # Don't retry for not found
except AuthenticationError:
print("Authentication failed. Check your API key.")
return None # Don't retry for auth errors
except ValidationError as e:
print(f"Validation error: {e}")
return None # Don't retry for validation errors
except RateLimitError:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f"Rate limited. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print("Rate limit exceeded. Max retries reached.")
return None
except KeywordsAIError as e:
print(f"API error (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(1)
else:
return None
except Exception as e:
print(f"Unexpected error (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(1)
else:
return None
return None
# Use retry logic
updated = update_dataset_with_retry(
"dataset_123456789",
name="Retry Updated Dataset",
metadata={"retry_update": True}
)
def validate_update_data(name=None, description=None, metadata=None):
errors = []
# Validate name
if name is not None:
if not isinstance(name, str):
errors.append("Name must be a string")
elif len(name.strip()) < 3:
errors.append("Name must be at least 3 characters long")
elif len(name) > 100:
errors.append("Name must be less than 100 characters")
# Validate description
if description is not None:
if not isinstance(description, str):
errors.append("Description must be a string")
elif len(description) > 1000:
errors.append("Description must be less than 1000 characters")
    # Validate metadata
    if metadata is not None:
        if not isinstance(metadata, dict):
            errors.append("Metadata must be a dictionary")
        else:
            if len(str(metadata)) > 5000:  # Rough size check
                errors.append("Metadata is too large")
            # Reserved keys cannot be modified through metadata
            reserved_keys = ["id", "created_at", "updated_at", "log_count"]
            for key in reserved_keys:
                if key in metadata:
                    errors.append(f"Cannot update reserved metadata key: {key}")
return errors
def update_dataset_safely(dataset_id, **updates):
# Validate input
errors = validate_update_data(**updates)
if errors:
print("Validation errors:")
for error in errors:
print(f" - {error}")
return None
# Perform update
try:
updated_dataset = client.datasets.update(dataset_id, **updates)
return updated_dataset
except Exception as e:
print(f"Update failed: {e}")
return None
# Safe update with validation
updated = update_dataset_safely(
"dataset_123456789",
name="Validated Update",
description="This update has been validated",
metadata={"validated": True}
)
def atomic_dataset_update(dataset_id, updates_dict):
"""Perform all updates in a single API call to ensure atomicity."""
try:
# Get current dataset for backup
original_dataset = client.datasets.get(dataset_id)
# Perform update
updated_dataset = client.datasets.update(dataset_id, **updates_dict)
print(f"Atomic update successful for {original_dataset.name}")
print(f" Updated fields: {list(updates_dict.keys())}")
return updated_dataset
except Exception as e:
print(f"Atomic update failed: {e}")
print(f"Dataset remains in original state")
return None
# Atomic update
updated = atomic_dataset_update(
"dataset_123456789",
{
"name": "Atomically Updated Dataset",
"description": "Updated atomically",
"metadata": {"atomic_update": True, "update_timestamp": "2024-01-25"}
}
)
def update_with_changelog(dataset_id, changes_description, **updates):
# Get current dataset
current_dataset = client.datasets.get(dataset_id)
# Prepare changelog metadata
changelog_metadata = current_dataset.metadata.copy()
# Add to existing metadata if provided
if "metadata" in updates:
changelog_metadata.update(updates["metadata"])
# Add changelog information
changelog_metadata.update({
"last_change_description": changes_description,
"last_change_timestamp": "2024-01-25T16:20:00Z",
"change_count": changelog_metadata.get("change_count", 0) + 1
})
# Update the metadata in updates
updates["metadata"] = changelog_metadata
# Perform update
updated_dataset = client.datasets.update(dataset_id, **updates)
print(f"Update with changelog completed:")
print(f" Change: {changes_description}")
print(f" Total changes: {changelog_metadata['change_count']}")
return updated_dataset
# Update with changelog
updated = update_with_changelog(
"dataset_123456789",
"Added quality annotations and updated categorization",
name="Quality Annotated Dataset",
metadata={"quality_annotations": True, "annotation_date": "2024-01-25"}
)
def update_with_backup(dataset_id, **updates):
# Get current dataset as backup
backup_dataset = client.datasets.get(dataset_id)
# Store backup information
backup_info = {
"original_name": backup_dataset.name,
"original_description": backup_dataset.description,
"original_metadata": backup_dataset.metadata.copy(),
"backup_timestamp": "2024-01-25T16:20:00Z"
}
try:
# Add backup info to metadata if metadata is being updated
if "metadata" in updates:
updates["metadata"]["backup_info"] = backup_info
else:
# Create new metadata with backup info
new_metadata = backup_dataset.metadata.copy()
new_metadata["backup_info"] = backup_info
updates["metadata"] = new_metadata
# Perform update
updated_dataset = client.datasets.update(dataset_id, **updates)
print(f"Update with backup completed")
print(f" Backup stored in metadata")
return updated_dataset
except Exception as e:
print(f"Update failed: {e}")
print(f"Original dataset information preserved")
return None
# Update with backup
updated = update_with_backup(
"dataset_123456789",
name="Backup Protected Update",
description="Updated with backup protection"
)
def promote_dataset_to_production(dataset_id):
# Get current dataset
dataset = client.datasets.get(dataset_id)
# Validate dataset is ready for production
if dataset.log_count < 100:
print("Dataset too small for production")
return None
if dataset.metadata.get("quality_level") not in ["high", "premium"]:
print("Dataset quality not sufficient for production")
return None
# Update for production
production_metadata = dataset.metadata.copy()
production_metadata.update({
"environment": "production",
"promoted_at": "2024-01-25T16:20:00Z",
"promoted_by": "data_team",
"production_ready": True,
"status": "active"
})
updated_dataset = client.datasets.update(
dataset_id=dataset_id,
name=f"[PROD] {dataset.name}",
metadata=production_metadata
)
print(f"Dataset promoted to production: {updated_dataset.name}")
return updated_dataset
# Promote to production
production_dataset = promote_dataset_to_production("dataset_123456789")
def archive_dataset(dataset_id, archive_reason=None):
# Get current dataset
dataset = client.datasets.get(dataset_id)
# Prepare archive metadata
archive_metadata = dataset.metadata.copy()
archive_metadata.update({
"status": "archived",
"archived_at": "2024-01-25T16:20:00Z",
"archived_by": "system",
"archive_reason": archive_reason or "Routine archival",
"original_status": dataset.metadata.get("status", "active")
})
# Update dataset
updated_dataset = client.datasets.update(
dataset_id=dataset_id,
name=f"[ARCHIVED] {dataset.name}",
description=f"[ARCHIVED] {dataset.description}",
metadata=archive_metadata
)
print(f"Dataset archived: {updated_dataset.name}")
print(f"Reason: {archive_reason or 'Routine archival'}")
return updated_dataset
# Archive dataset
archived = archive_dataset(
"dataset_123456789",
"Dataset no longer needed for current projects"
)