Overview

The update() method allows you to modify a dataset's name, description, and metadata. This is useful for keeping dataset information accurate as your data evolves.
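
For example, a minimal call that renames a dataset while leaving its other fields unchanged (assuming a configured client, as shown in the Examples section below) might look like this:

# Minimal sketch: only the fields you pass are changed
client.datasets.update(
    dataset_id="dataset_123456789",
    name="Renamed Dataset"
)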

Method Signature

Synchronous

client.datasets.update(
    dataset_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    **kwargs
) -> Dataset

Asynchronous

await client.datasets.aupdate(
    dataset_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    **kwargs
) -> Dataset

Parameters

dataset_id (str, required)
    The unique identifier of the dataset to update.

name (str, optional)
    New name for the dataset. If not provided, the current name is preserved.

description (str, optional)
    New description for the dataset. If not provided, the current description is preserved.

metadata (Dict[str, Any], optional)
    New metadata for the dataset. To preserve existing metadata fields, merge them with your new values client-side before calling update() (see Incremental Metadata Updates below).
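
To keep existing metadata fields when updating metadata, the examples in this guide fetch the current dataset and merge client-side before calling update(). A minimal sketch of that pattern (shown in full in Incremental Metadata Updates below):

# Sketch: fetch the current metadata, add new fields, then update
current = client.datasets.get("dataset_123456789")
merged_metadata = current.metadata.copy()
merged_metadata["reviewed"] = True  # "reviewed" is an illustrative key, not a required field
client.datasets.update(dataset_id="dataset_123456789", metadata=merged_metadata)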

Returns

Returns the updated Dataset object with the following structure:
{
    "id": "dataset_123456789",
    "name": "Updated Customer Support Dataset",
    "description": "Updated collection of customer support conversations",
    "log_count": 1250,
    "created_at": "2024-01-15T10:30:00Z",
    "updated_at": "2024-01-25T16:20:00Z",  # Updated timestamp
    "metadata": {
        "category": "support",
        "purpose": "training",
        "version": "2.0",  # Updated version
        "quality_level": "high",
        "last_reviewed": "2024-01-25"
    }
}

Examples

Basic Dataset Update

from keywordsai import KeywordsAI

client = KeywordsAI(api_key="your-api-key")

# Update dataset name and description
updated_dataset = client.datasets.update(
    dataset_id="dataset_123456789",
    name="Enhanced Customer Support Dataset",
    description="Comprehensive collection of customer support conversations with quality annotations"
)

print(f"Updated dataset: {updated_dataset.name}")
print(f"New description: {updated_dataset.description}")
print(f"Updated at: {updated_dataset.updated_at}")

Update Only Name

# Update only the dataset name
updated_dataset = client.datasets.update(
    dataset_id="dataset_123456789",
    name="Customer Support Training Data v2.0"
)

print(f"New name: {updated_dataset.name}")
print(f"Description unchanged: {updated_dataset.description}")

Update Only Description

# Update only the description
updated_dataset = client.datasets.update(
    dataset_id="dataset_123456789",
    description="High-quality customer support conversations collected from Q1 2024, manually reviewed and annotated for training purposes"
)

print(f"Name unchanged: {updated_dataset.name}")
print(f"New description: {updated_dataset.description}")

Update Metadata

# Update metadata fields
updated_dataset = client.datasets.update(
    dataset_id="dataset_123456789",
    metadata={
        "version": "2.1",
        "quality_level": "premium",
        "last_reviewed": "2024-01-25",
        "reviewer": "data_team",
        "review_notes": "Added quality annotations and removed duplicates",
        "tags": ["support", "training", "quality", "reviewed"]
    }
)

print(f"Updated metadata:")
for key, value in updated_dataset.metadata.items():
    print(f"  {key}: {value}")

Comprehensive Update

# Update all fields at once
updated_dataset = client.datasets.update(
    dataset_id="dataset_123456789",
    name="Premium Customer Support Training Dataset",
    description="Curated and annotated customer support conversations for advanced model training",
    metadata={
        "version": "3.0",
        "category": "customer_support",
        "purpose": "training",
        "quality_level": "premium",
        "language": "english",
        "domain": "technology",
        "annotation_status": "complete",
        "review_status": "approved",
        "last_updated": "2024-01-25",
        "updated_by": "data_science_team",
        "changelog": "Added sentiment labels, removed PII, quality review completed",
        "tags": ["support", "training", "premium", "annotated", "reviewed"]
    }
)

print(f"Comprehensive update completed:")
print(f"  Name: {updated_dataset.name}")
print(f"  Description: {updated_dataset.description[:100]}...")
print(f"  Metadata fields: {len(updated_dataset.metadata)}")
print(f"  Version: {updated_dataset.metadata.get('version')}")

Incremental Metadata Updates

# Get current dataset to preserve existing metadata
current_dataset = client.datasets.get("dataset_123456789")

# Merge new metadata with existing
new_metadata = current_dataset.metadata.copy()
new_metadata.update({
    "last_backup": "2024-01-25",
    "backup_location": "s3://backups/datasets/",
    "backup_status": "completed"
})

# Update with merged metadata
updated_dataset = client.datasets.update(
    dataset_id="dataset_123456789",
    metadata=new_metadata
)

print(f"Added backup information to metadata")
print(f"Total metadata fields: {len(updated_dataset.metadata)}")

Conditional Updates

def update_dataset_if_needed(dataset_id, large_threshold=1000):
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    
    updates = {}
    
    # Update quality level based on log count
    if dataset.log_count >= large_threshold and dataset.metadata.get("quality_level") != "high":
        updates["metadata"] = dataset.metadata.copy()
        updates["metadata"]["quality_level"] = "high"
        updates["metadata"]["auto_updated"] = "2024-01-25"
    
    # Update name if it doesn't reflect size
    if dataset.log_count >= large_threshold and "Large" not in dataset.name:
        updates["name"] = f"Large {dataset.name}"
    
    # Update description if it's too short
    if not dataset.description or len(dataset.description) < 50:
        updates["description"] = f"Dataset containing {dataset.log_count} logs for {dataset.metadata.get('purpose', 'general')} purposes"
    
    # Apply updates if any
    if updates:
        updated_dataset = client.datasets.update(dataset_id, **updates)
        print(f"Applied {len(updates)} updates to {dataset.name}")
        return updated_dataset
    else:
        print(f"No updates needed for {dataset.name}")
        return dataset

# Conditionally update dataset
updated = update_dataset_if_needed("dataset_123456789")

Asynchronous Updates

import asyncio

async def update_dataset_async(dataset_id, **updates):
    client = KeywordsAI(api_key="your-api-key")
    
    try:
        updated_dataset = await client.datasets.aupdate(dataset_id, **updates)
        print(f"Async updated: {updated_dataset.name}")
        print(f"  Updated at: {updated_dataset.updated_at}")
        return updated_dataset
    except Exception as e:
        print(f"Async update error: {e}")
        return None

# Run async update
updated = asyncio.run(update_dataset_async(
    "dataset_123456789",
    name="Async Updated Dataset",
    metadata={"async_update": True, "update_method": "async"}
))

Batch Async Updates

import asyncio

async def update_multiple_datasets_async(updates_list):
    client = KeywordsAI(api_key="your-api-key")
    
    async def update_single_dataset(dataset_id, updates):
        try:
            result = await client.datasets.aupdate(dataset_id, **updates)
            return {"success": True, "dataset": result, "id": dataset_id}
        except Exception as e:
            return {"success": False, "error": str(e), "id": dataset_id}
    
    # Create tasks for all updates
    tasks = []
    for item in updates_list:
        dataset_id = item["dataset_id"]
        updates = {k: v for k, v in item.items() if k != "dataset_id"}
        task = update_single_dataset(dataset_id, updates)
        tasks.append(task)
    
    # Wait for all updates to complete
    results = await asyncio.gather(*tasks)
    
    # Separate successful and failed updates
    successful = [r["dataset"] for r in results if r["success"]]
    failed = [(r["id"], r["error"]) for r in results if not r["success"]]
    
    print(f"Batch async updates completed:")
    print(f"  Successful: {len(successful)}")
    print(f"  Failed: {len(failed)}")
    
    for dataset in successful:
        print(f"  ✓ Updated: {dataset.name}")
    
    for dataset_id, error in failed:
        print(f"  ✗ Failed {dataset_id}: {error}")
    
    return successful, failed

# Batch update configuration
updates = [
    {
        "dataset_id": "dataset_123456789",
        "metadata": {"batch_update": True, "batch_id": "batch_001"}
    },
    {
        "dataset_id": "dataset_987654321",
        "name": "Batch Updated Dataset 2",
        "metadata": {"batch_update": True, "batch_id": "batch_001"}
    },
    {
        "dataset_id": "dataset_555666777",
        "description": "Updated via batch operation",
        "metadata": {"batch_update": True, "batch_id": "batch_001"}
    }
]

# Run batch updates
successful, failed = asyncio.run(update_multiple_datasets_async(updates))

Advanced Use Cases

Version Management

def increment_dataset_version(dataset_id, version_notes=None):
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    
    # Parse current version
    current_version = dataset.metadata.get("version", "1.0")
    try:
        major, minor = map(int, current_version.split("."))
        new_version = f"{major}.{minor + 1}"
    except ValueError:
        new_version = "1.1"  # Default if version format is invalid
    
    # Prepare version metadata
    version_metadata = dataset.metadata.copy()
    version_metadata.update({
        "version": new_version,
        "previous_version": current_version,
        "version_updated_at": "2024-01-25T16:20:00Z",
        "version_notes": version_notes or f"Incremented from {current_version}"
    })
    
    # Update dataset
    updated_dataset = client.datasets.update(
        dataset_id=dataset_id,
        metadata=version_metadata
    )
    
    print(f"Version updated: {current_version}{new_version}")
    print(f"Notes: {version_notes or 'No notes provided'}")
    
    return updated_dataset

# Increment version
updated = increment_dataset_version(
    "dataset_123456789",
    version_notes="Added quality annotations and removed duplicates"
)

Quality Level Management

def update_quality_level(dataset_id, new_quality_level, review_notes=None):
    # Validate quality level
    valid_levels = ["low", "medium", "high", "premium"]
    if new_quality_level not in valid_levels:
        raise ValueError(f"Quality level must be one of: {valid_levels}")
    
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    current_quality = dataset.metadata.get("quality_level", "unknown")
    
    # Prepare quality metadata
    quality_metadata = dataset.metadata.copy()
    quality_metadata.update({
        "quality_level": new_quality_level,
        "previous_quality_level": current_quality,
        "quality_updated_at": "2024-01-25T16:20:00Z",
        "quality_reviewer": "data_team",
        "quality_review_notes": review_notes or f"Updated from {current_quality} to {new_quality_level}"
    })
    
    # Update name to reflect quality if premium
    new_name = dataset.name
    if new_quality_level == "premium" and "Premium" not in dataset.name:
        new_name = f"Premium {dataset.name}"
    elif new_quality_level != "premium" and "Premium" in dataset.name:
        new_name = dataset.name.replace("Premium ", "")
    
    # Update dataset
    updated_dataset = client.datasets.update(
        dataset_id=dataset_id,
        name=new_name,
        metadata=quality_metadata
    )
    
    print(f"Quality level updated: {current_quality}{new_quality_level}")
    if new_name != dataset.name:
        print(f"Name updated: {dataset.name}{new_name}")
    
    return updated_dataset

# Update quality level
updated = update_quality_level(
    "dataset_123456789",
    "premium",
    "Manual review completed, all logs verified for quality"
)

Metadata Cleanup

def cleanup_dataset_metadata(dataset_id, remove_keys=None, standardize=True):
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    
    # Start with current metadata
    cleaned_metadata = dataset.metadata.copy()
    
    # Remove specified keys
    if remove_keys:
        for key in remove_keys:
            cleaned_metadata.pop(key, None)
    
    # Remove empty or null values
    cleaned_metadata = {k: v for k, v in cleaned_metadata.items() 
                       if v is not None and v != "" and v != []}
    
    if standardize:
        # Standardize common fields
        if "category" in cleaned_metadata:
            cleaned_metadata["category"] = cleaned_metadata["category"].lower().replace(" ", "_")
        
        if "purpose" in cleaned_metadata:
            cleaned_metadata["purpose"] = cleaned_metadata["purpose"].lower().replace(" ", "_")
        
        # Ensure required fields exist
        if "created_date" not in cleaned_metadata:
            cleaned_metadata["created_date"] = dataset.created_at[:10]  # YYYY-MM-DD
        
        if "last_cleanup" not in cleaned_metadata:
            cleaned_metadata["last_cleanup"] = "2024-01-25"
    
    # Update dataset
    updated_dataset = client.datasets.update(
        dataset_id=dataset_id,
        metadata=cleaned_metadata
    )
    
    print(f"Metadata cleanup completed:")
    print(f"  Original fields: {len(dataset.metadata)}")
    print(f"  Cleaned fields: {len(cleaned_metadata)}")
    print(f"  Removed: {len(dataset.metadata) - len(cleaned_metadata)} fields")
    
    return updated_dataset

# Cleanup metadata
updated = cleanup_dataset_metadata(
    "dataset_123456789",
    remove_keys=["temp_field", "debug_info", "old_version"],
    standardize=True
)

Status Tracking

def update_dataset_status(dataset_id, status, status_notes=None):
    # Valid statuses
    valid_statuses = [
        "draft", "in_progress", "review", "approved", 
        "active", "archived", "deprecated"
    ]
    
    if status not in valid_statuses:
        raise ValueError(f"Status must be one of: {valid_statuses}")
    
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    current_status = dataset.metadata.get("status", "unknown")
    
    # Prepare status metadata
    status_metadata = dataset.metadata.copy()
    status_metadata.update({
        "status": status,
        "previous_status": current_status,
        "status_updated_at": "2024-01-25T16:20:00Z",
        "status_updated_by": "system",
        "status_notes": status_notes or f"Status changed from {current_status} to {status}"
    })
    
    # Update description to reflect status if archived or deprecated
    new_description = dataset.description or ""  # guard against a missing description
    if status in ["archived", "deprecated"]:
        status_prefix = f"[{status.upper()}] "
        if not new_description.startswith(status_prefix):
            new_description = status_prefix + new_description
    else:
        # Remove status prefixes if status is active
        for prefix in ["[ARCHIVED] ", "[DEPRECATED] "]:
            if new_description.startswith(prefix):
                new_description = new_description[len(prefix):]
    
    # Update dataset
    updated_dataset = client.datasets.update(
        dataset_id=dataset_id,
        description=new_description,
        metadata=status_metadata
    )
    
    print(f"Status updated: {current_status}{status}")
    if status_notes:
        print(f"Notes: {status_notes}")
    
    return updated_dataset

# Update status
updated = update_dataset_status(
    "dataset_123456789",
    "approved",
    "Quality review completed, ready for production use"
)

Automated Maintenance Updates

def perform_maintenance_update(dataset_id):
    from datetime import datetime
    
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    
    # Prepare maintenance metadata
    maintenance_metadata = dataset.metadata.copy()
    
    # Update maintenance fields
    maintenance_metadata.update({
        "last_maintenance": datetime.now().isoformat()[:10],
        "maintenance_version": maintenance_metadata.get("maintenance_version", 0) + 1,
        "health_check_passed": True,
        "metadata_validated": True,
        "backup_verified": True
    })
    
    # Calculate and update statistics
    if dataset.log_count > 0:
        # Estimate dataset health score
        health_score = 100
        
        # Deduct points for various issues
        if not dataset.description or len(dataset.description) < 20:
            health_score -= 10
        
        if len(dataset.metadata) < 3:
            health_score -= 15
        
        # Check age
        created = datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))
        age_days = (datetime.now(created.tzinfo) - created).days
        if age_days > 180:  # Older than 6 months
            health_score -= 5
        
        maintenance_metadata["health_score"] = max(0, health_score)
        maintenance_metadata["size_category"] = (
            "small" if dataset.log_count < 100 else
            "medium" if dataset.log_count < 1000 else
            "large" if dataset.log_count < 10000 else
            "very_large"
        )
    
    # Update dataset
    updated_dataset = client.datasets.update(
        dataset_id=dataset_id,
        metadata=maintenance_metadata
    )
    
    print(f"Maintenance update completed for {dataset.name}")
    print(f"  Health score: {maintenance_metadata.get('health_score', 'N/A')}")
    print(f"  Size category: {maintenance_metadata.get('size_category', 'N/A')}")
    print(f"  Maintenance version: {maintenance_metadata['maintenance_version']}")
    
    return updated_dataset

# Perform maintenance update
updated = perform_maintenance_update("dataset_123456789")

Error Handling

Comprehensive Error Handling

from keywordsai.exceptions import (
    KeywordsAIError,
    NotFoundError,
    ValidationError,
    AuthenticationError,
    RateLimitError
)
import time

def update_dataset_with_retry(dataset_id, max_retries=3, **updates):
    for attempt in range(max_retries):
        try:
            updated_dataset = client.datasets.update(dataset_id, **updates)
            return updated_dataset
        except NotFoundError:
            print(f"Dataset {dataset_id} not found")
            return None  # Don't retry for not found
        except AuthenticationError:
            print("Authentication failed. Check your API key.")
            return None  # Don't retry for auth errors
        except ValidationError as e:
            print(f"Validation error: {e}")
            return None  # Don't retry for validation errors
        except RateLimitError:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Rate limit exceeded. Max retries reached.")
                return None
        except KeywordsAIError as e:
            print(f"API error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                return None
        except Exception as e:
            print(f"Unexpected error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                return None
    
    return None

# Use retry logic
updated = update_dataset_with_retry(
    "dataset_123456789",
    name="Retry Updated Dataset",
    metadata={"retry_update": True}
)

Validation Before Update

def validate_update_data(name=None, description=None, metadata=None):
    errors = []
    
    # Validate name
    if name is not None:
        if not isinstance(name, str):
            errors.append("Name must be a string")
        elif len(name.strip()) < 3:
            errors.append("Name must be at least 3 characters long")
        elif len(name) > 100:
            errors.append("Name must be less than 100 characters")
    
    # Validate description
    if description is not None:
        if not isinstance(description, str):
            errors.append("Description must be a string")
        elif len(description) > 1000:
            errors.append("Description must be less than 1000 characters")
    
    # Validate metadata
    if metadata is not None:
        if not isinstance(metadata, dict):
            errors.append("Metadata must be a dictionary")
        else:
            if len(str(metadata)) > 5000:  # Rough size check
                errors.append("Metadata is too large")
            
            # Check for reserved keys (only meaningful when metadata is a dict)
            reserved_keys = ["id", "created_at", "updated_at", "log_count"]
            for key in reserved_keys:
                if key in metadata:
                    errors.append(f"Cannot update reserved metadata key: {key}")
    
    return errors

def update_dataset_safely(dataset_id, **updates):
    # Validate input
    errors = validate_update_data(**updates)
    if errors:
        print("Validation errors:")
        for error in errors:
            print(f"  - {error}")
        return None
    
    # Perform update
    try:
        updated_dataset = client.datasets.update(dataset_id, **updates)
        return updated_dataset
    except Exception as e:
        print(f"Update failed: {e}")
        return None

# Safe update with validation
updated = update_dataset_safely(
    "dataset_123456789",
    name="Validated Update",
    description="This update has been validated",
    metadata={"validated": True}
)

Best Practices

Atomic Updates

def atomic_dataset_update(dataset_id, updates_dict):
    """Perform all updates in a single API call to ensure atomicity."""
    try:
        # Get current dataset for backup
        original_dataset = client.datasets.get(dataset_id)
        
        # Perform update
        updated_dataset = client.datasets.update(dataset_id, **updates_dict)
        
        print(f"Atomic update successful for {original_dataset.name}")
        print(f"  Updated fields: {list(updates_dict.keys())}")
        
        return updated_dataset
        
    except Exception as e:
        print(f"Atomic update failed: {e}")
        print(f"Dataset remains in original state")
        return None

# Atomic update
updated = atomic_dataset_update(
    "dataset_123456789",
    {
        "name": "Atomically Updated Dataset",
        "description": "Updated atomically",
        "metadata": {"atomic_update": True, "update_timestamp": "2024-01-25"}
    }
)

Change Tracking

def update_with_changelog(dataset_id, changes_description, **updates):
    # Get current dataset
    current_dataset = client.datasets.get(dataset_id)
    
    # Prepare changelog metadata
    changelog_metadata = current_dataset.metadata.copy()
    
    # Add to existing metadata if provided
    if "metadata" in updates:
        changelog_metadata.update(updates["metadata"])
    
    # Add changelog information
    changelog_metadata.update({
        "last_change_description": changes_description,
        "last_change_timestamp": "2024-01-25T16:20:00Z",
        "change_count": changelog_metadata.get("change_count", 0) + 1
    })
    
    # Update the metadata in updates
    updates["metadata"] = changelog_metadata
    
    # Perform update
    updated_dataset = client.datasets.update(dataset_id, **updates)
    
    print(f"Update with changelog completed:")
    print(f"  Change: {changes_description}")
    print(f"  Total changes: {changelog_metadata['change_count']}")
    
    return updated_dataset

# Update with changelog
updated = update_with_changelog(
    "dataset_123456789",
    "Added quality annotations and updated categorization",
    name="Quality Annotated Dataset",
    metadata={"quality_annotations": True, "annotation_date": "2024-01-25"}
)

Backup Before Update

def update_with_backup(dataset_id, **updates):
    # Get current dataset as backup
    backup_dataset = client.datasets.get(dataset_id)
    
    # Store backup information
    backup_info = {
        "original_name": backup_dataset.name,
        "original_description": backup_dataset.description,
        "original_metadata": backup_dataset.metadata.copy(),
        "backup_timestamp": "2024-01-25T16:20:00Z"
    }
    
    try:
        # Add backup info to metadata if metadata is being updated
        if "metadata" in updates:
            updates["metadata"]["backup_info"] = backup_info
        else:
            # Create new metadata with backup info
            new_metadata = backup_dataset.metadata.copy()
            new_metadata["backup_info"] = backup_info
            updates["metadata"] = new_metadata
        
        # Perform update
        updated_dataset = client.datasets.update(dataset_id, **updates)
        
        print(f"Update with backup completed")
        print(f"  Backup stored in metadata")
        
        return updated_dataset
        
    except Exception as e:
        print(f"Update failed: {e}")
        print(f"Original dataset information preserved")
        return None

# Update with backup
updated = update_with_backup(
    "dataset_123456789",
    name="Backup Protected Update",
    description="Updated with backup protection"
)

Common Use Cases

Dataset Lifecycle Management

def promote_dataset_to_production(dataset_id):
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    
    # Validate dataset is ready for production
    if dataset.log_count < 100:
        print("Dataset too small for production")
        return None
    
    if dataset.metadata.get("quality_level") not in ["high", "premium"]:
        print("Dataset quality not sufficient for production")
        return None
    
    # Update for production
    production_metadata = dataset.metadata.copy()
    production_metadata.update({
        "environment": "production",
        "promoted_at": "2024-01-25T16:20:00Z",
        "promoted_by": "data_team",
        "production_ready": True,
        "status": "active"
    })
    
    updated_dataset = client.datasets.update(
        dataset_id=dataset_id,
        name=f"[PROD] {dataset.name}",
        metadata=production_metadata
    )
    
    print(f"Dataset promoted to production: {updated_dataset.name}")
    return updated_dataset

# Promote to production
production_dataset = promote_dataset_to_production("dataset_123456789")

Dataset Archival

def archive_dataset(dataset_id, archive_reason=None):
    # Get current dataset
    dataset = client.datasets.get(dataset_id)
    
    # Prepare archive metadata
    archive_metadata = dataset.metadata.copy()
    archive_metadata.update({
        "status": "archived",
        "archived_at": "2024-01-25T16:20:00Z",
        "archived_by": "system",
        "archive_reason": archive_reason or "Routine archival",
        "original_status": dataset.metadata.get("status", "active")
    })
    
    # Update dataset
    updated_dataset = client.datasets.update(
        dataset_id=dataset_id,
        name=f"[ARCHIVED] {dataset.name}",
        description=f"[ARCHIVED] {dataset.description}",
        metadata=archive_metadata
    )
    
    print(f"Dataset archived: {updated_dataset.name}")
    print(f"Reason: {archive_reason or 'Routine archival'}")
    
    return updated_dataset

# Archive dataset
archived = archive_dataset(
    "dataset_123456789",
    "Dataset no longer needed for current projects"
)