Overview

The list() method retrieves datasets with filtering, sorting, and pagination options, making it straightforward to manage and organize your dataset collections.

Method Signature

Synchronous

client.datasets.list(
    limit: int = 50,
    offset: int = 0,
    name: Optional[str] = None,
    created_after: Optional[str] = None,
    created_before: Optional[str] = None,
    metadata_filter: Optional[Dict[str, Any]] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    **kwargs
) -> DatasetList

Asynchronous

await client.datasets.alist(
    limit: int = 50,
    offset: int = 0,
    name: Optional[str] = None,
    created_after: Optional[str] = None,
    created_before: Optional[str] = None,
    metadata_filter: Optional[Dict[str, Any]] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    **kwargs
) -> DatasetList

Parameters

limit
int
default:"50"
Maximum number of datasets to return (1-100).
offset
int
default:"0"
Number of datasets to skip for pagination.
name
str
Filter datasets by name (supports partial matching).
created_after
str
Filter datasets created after this timestamp (ISO 8601 format).
created_before
str
Filter datasets created before this timestamp (ISO 8601 format).
metadata_filter
Dict[str, Any]
Filter datasets by metadata key-value pairs.
sort_by
str
default:"created_at"
Field to sort by: created_at, updated_at, name, or log_count.
sort_order
str
default:"desc"
Sort order: asc (ascending) or desc (descending).
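
The constraints above can be checked client-side before a request is sent, which turns invalid arguments into immediate, descriptive errors. A minimal sketch, assuming the ranges and allowed values documented in this list; the validate_list_params helper is hypothetical and not part of the SDK:

from keywordsai import KeywordsAI

client = KeywordsAI(api_key="your-api-key")

ALLOWED_SORT_FIELDS = {"created_at", "updated_at", "name", "log_count"}
ALLOWED_SORT_ORDERS = {"asc", "desc"}

def validate_list_params(limit=50, offset=0, sort_by="created_at", sort_order="desc"):
    """Raise ValueError if arguments fall outside the documented constraints."""
    if not 1 <= limit <= 100:
        raise ValueError("limit must be between 1 and 100")
    if offset < 0:
        raise ValueError("offset must be non-negative")
    if sort_by not in ALLOWED_SORT_FIELDS:
        raise ValueError(f"sort_by must be one of {sorted(ALLOWED_SORT_FIELDS)}")
    if sort_order not in ALLOWED_SORT_ORDERS:
        raise ValueError("sort_order must be 'asc' or 'desc'")

# Validate locally, then issue the request
validate_list_params(limit=25, sort_by="log_count", sort_order="desc")
datasets = client.datasets.list(limit=25, sort_by="log_count", sort_order="desc")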

Returns

Returns a DatasetList object with the following structure:
{
    "data": [
        {
            "id": "dataset_123456789",
            "name": "Customer Support Dataset",
            "description": "Collection of customer support conversations",
            "log_count": 150,
            "created_at": "2024-01-15T10:30:00Z",
            "updated_at": "2024-01-20T14:45:00Z",
            "metadata": {
                "category": "support",
                "version": "1.0"
            }
        }
    ],
    "total": 25,
    "limit": 50,
    "offset": 0,
    "has_more": false
}
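
In the examples below these fields are read as attributes on the returned DatasetList object (for example datasets.total and datasets.has_more). A minimal sketch of consuming a response, assuming has_more follows the usual convention of indicating that offset + len(data) is still less than total:

from keywordsai import KeywordsAI

client = KeywordsAI(api_key="your-api-key")

page = client.datasets.list(limit=10, offset=0)

# Pagination fields describe where this page sits in the full result set
print(f"Showing {len(page.data)} of {page.total} datasets")
print(f"More pages available: {page.has_more}")

# Each item in data exposes the fields from the structure above
for dataset in page.data:
    print(dataset.id, dataset.name, dataset.log_count, dataset.created_at)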

Examples

Basic Dataset Listing

from keywordsai import KeywordsAI

client = KeywordsAI(api_key="your-api-key")

# Get all datasets
datasets = client.datasets.list()

print(f"Total datasets: {datasets.total}")
print(f"Retrieved: {len(datasets.data)}")

for dataset in datasets.data:
    print(f"- {dataset.name} ({dataset.log_count} logs)")

Pagination

# Get first page
first_page = client.datasets.list(limit=10, offset=0)
print(f"First page: {len(first_page.data)} datasets")

# Get second page
second_page = client.datasets.list(limit=10, offset=10)
print(f"Second page: {len(second_page.data)} datasets")

# Iterate through all pages
def get_all_datasets():
    all_datasets = []
    offset = 0
    limit = 50
    
    while True:
        page = client.datasets.list(limit=limit, offset=offset)
        all_datasets.extend(page.data)
        
        if not page.has_more:
            break
        
        offset += limit
    
    return all_datasets

all_datasets = get_all_datasets()
print(f"Total datasets retrieved: {len(all_datasets)}")

Filtering by Name

# Search for datasets with "support" in the name
support_datasets = client.datasets.list(name="support")
print(f"Found {len(support_datasets.data)} support datasets")

# Search for specific dataset name
specific_dataset = client.datasets.list(name="Customer Support Training")
if specific_dataset.data:
    dataset = specific_dataset.data[0]
    print(f"Found dataset: {dataset.name}")
else:
    print("Dataset not found")

Date Range Filtering

from datetime import datetime, timedelta, timezone

# Get datasets created in the last 30 days (timezone-aware ISO 8601 timestamp)
thirty_days_ago = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
recent_datasets = client.datasets.list(created_after=thirty_days_ago)

print(f"Datasets created in last 30 days: {len(recent_datasets.data)}")

# Get datasets created in January 2024
january_start = "2024-01-01T00:00:00Z"
january_end = "2024-02-01T00:00:00Z"

january_datasets = client.datasets.list(
    created_after=january_start,
    created_before=january_end
)

print(f"Datasets created in January 2024: {len(january_datasets.data)}")

Metadata Filtering

# Filter by single metadata field
training_datasets = client.datasets.list(
    metadata_filter={"purpose": "training"}
)

print(f"Training datasets: {len(training_datasets.data)}")

# Filter by multiple metadata fields
high_quality_support = client.datasets.list(
    metadata_filter={
        "category": "support",
        "quality_level": "high"
    }
)

print(f"High-quality support datasets: {len(high_quality_support.data)}")

# Complex metadata filtering
production_datasets = client.datasets.list(
    metadata_filter={
        "environment": "production",
        "status": "active"
    }
)

for dataset in production_datasets.data:
    print(f"Production dataset: {dataset.name}")
    print(f"  Logs: {dataset.log_count}")
    print(f"  Last updated: {dataset.updated_at}")

Sorting Options

# Sort by creation date (newest first)
newest_datasets = client.datasets.list(
    sort_by="created_at",
    sort_order="desc"
)

# Sort by name alphabetically
alphabetical_datasets = client.datasets.list(
    sort_by="name",
    sort_order="asc"
)

# Sort by log count (largest first)
largest_datasets = client.datasets.list(
    sort_by="log_count",
    sort_order="desc"
)

print("Largest datasets:")
for dataset in largest_datasets.data[:5]:  # Top 5
    print(f"  {dataset.name}: {dataset.log_count} logs")

# Sort by last update (most recently updated first)
recently_updated = client.datasets.list(
    sort_by="updated_at",
    sort_order="desc"
)

print("Recently updated datasets:")
for dataset in recently_updated.data[:3]:  # Top 3
    print(f"  {dataset.name}: {dataset.updated_at}")

Combined Filtering and Sorting

# Get recent training datasets, sorted by log count
recent_training = client.datasets.list(
    created_after="2024-01-01T00:00:00Z",
    metadata_filter={"purpose": "training"},
    sort_by="log_count",
    sort_order="desc",
    limit=20
)

print(f"Recent training datasets (by size):")
for dataset in recent_training.data:
    print(f"  {dataset.name}: {dataset.log_count} logs")

# Get support datasets with high quality, sorted by name
quality_support = client.datasets.list(
    name="support",
    metadata_filter={"quality_level": "high"},
    sort_by="name",
    sort_order="asc"
)

print(f"High-quality support datasets:")
for dataset in quality_support.data:
    print(f"  {dataset.name}")

Asynchronous Listing

import asyncio

async def list_datasets_async():
    client = KeywordsAI(api_key="your-api-key")
    
    # Basic async listing
    datasets = await client.datasets.alist(limit=20)
    
    print(f"Async retrieved {len(datasets.data)} datasets")
    
    for dataset in datasets.data:
        print(f"  {dataset.name}: {dataset.log_count} logs")
    
    return datasets

# Run async listing
datasets = asyncio.run(list_datasets_async())

Async Pagination

async def get_all_datasets_async():
    client = KeywordsAI(api_key="your-api-key")
    all_datasets = []
    offset = 0
    limit = 50
    
    while True:
        page = await client.datasets.alist(limit=limit, offset=offset)
        all_datasets.extend(page.data)
        
        print(f"Retrieved page with {len(page.data)} datasets")
        
        if not page.has_more:
            break
        
        offset += limit
    
    return all_datasets

# Get all datasets asynchronously
all_datasets = asyncio.run(get_all_datasets_async())
print(f"Total datasets: {len(all_datasets)}")

Concurrent Async Operations

async def get_datasets_by_category(categories):
    client = KeywordsAI(api_key="your-api-key")
    
    # Create tasks for each category
    tasks = []
    for category in categories:
        task = client.datasets.alist(
            metadata_filter={"category": category},
            limit=100
        )
        tasks.append(task)
    
    # Wait for all requests to complete
    results = await asyncio.gather(*tasks)
    
    # Organize results by category
    datasets_by_category = {}
    for i, category in enumerate(categories):
        datasets_by_category[category] = results[i].data
    
    return datasets_by_category

# Get datasets for multiple categories concurrently
categories = ["support", "training", "evaluation", "production"]
results = asyncio.run(get_datasets_by_category(categories))

for category, datasets in results.items():
    print(f"{category.title()}: {len(datasets)} datasets")

Advanced Use Cases

Dataset Analytics

def analyze_datasets():
    # Get the first page of datasets (100 is the documented maximum per request;
    # use the pagination pattern shown above to cover larger collections)
    all_datasets = client.datasets.list(limit=100)
    
    # Calculate statistics
    total_logs = sum(d.log_count for d in all_datasets.data)
    avg_logs = total_logs / len(all_datasets.data) if all_datasets.data else 0
    
    # Group by category
    categories = {}
    for dataset in all_datasets.data:
        category = dataset.metadata.get("category", "uncategorized")
        if category not in categories:
            categories[category] = []
        categories[category].append(dataset)
    
    print(f"Dataset Analytics:")
    print(f"  Total datasets: {len(all_datasets.data)}")
    print(f"  Total logs: {total_logs:,}")
    print(f"  Average logs per dataset: {avg_logs:.1f}")
    print(f"  Categories:")
    
    for category, datasets in categories.items():
        category_logs = sum(d.log_count for d in datasets)
        print(f"    {category}: {len(datasets)} datasets, {category_logs:,} logs")

analyze_datasets()

Dataset Search and Discovery

def search_datasets(query, search_fields=None):
    if search_fields is None:
        search_fields = ["name", "description"]
    
    # Search by name
    name_results = client.datasets.list(name=query) if "name" in search_fields else None
    
    # For description search we have to fetch datasets and filter locally
    # (assuming the API doesn't support description search directly)
    description_results = []
    
    if "description" in search_fields:
        all_datasets = client.datasets.list(limit=100)  # documented maximum per request
        for dataset in all_datasets.data:
            if dataset.description and query.lower() in dataset.description.lower():
                description_results.append(dataset)
    
    # Combine and deduplicate results
    found_datasets = []
    seen_ids = set()
    
    if name_results:
        for dataset in name_results.data:
            if dataset.id not in seen_ids:
                found_datasets.append(dataset)
                seen_ids.add(dataset.id)
    
    for dataset in description_results:
        if dataset.id not in seen_ids:
            found_datasets.append(dataset)
            seen_ids.add(dataset.id)
    
    return found_datasets

# Search for datasets
results = search_datasets("customer support")
print(f"Found {len(results)} datasets matching 'customer support'")

for dataset in results:
    print(f"  {dataset.name}: {dataset.description[:100]}...")

Dataset Monitoring

def monitor_dataset_growth():
    from datetime import datetime, timedelta, timezone
    
    # Get datasets created in different time periods
    now = datetime.now(timezone.utc)
    periods = {
        "last_24h": now - timedelta(hours=24),
        "last_week": now - timedelta(weeks=1),
        "last_month": now - timedelta(days=30)
    }
    
    growth_stats = {}
    
    for period_name, start_time in periods.items():
        datasets = client.datasets.list(
            created_after=start_time.isoformat(),
            limit=100  # documented maximum per request
        )
        
        growth_stats[period_name] = {
            "count": len(datasets.data),
            "total_logs": sum(d.log_count for d in datasets.data)
        }
    
    print("Dataset Growth Monitoring:")
    for period, stats in growth_stats.items():
        print(f"  {period}: {stats['count']} datasets, {stats['total_logs']} logs")
    
    return growth_stats

# Monitor growth
growth = monitor_dataset_growth()

Batch Operations

def process_datasets_in_batches(batch_size=10):
    offset = 0
    processed_count = 0
    
    while True:
        # Get batch of datasets
        batch = client.datasets.list(limit=batch_size, offset=offset)
        
        if not batch.data:
            break
        
        # Process each dataset in the batch
        for dataset in batch.data:
            # Example processing: check if dataset needs updating
            if dataset.log_count > 1000 and not dataset.metadata.get("large_dataset"):
                print(f"Large dataset detected: {dataset.name} ({dataset.log_count} logs)")
                # Could update metadata here
            
            processed_count += 1
        
        print(f"Processed batch: {len(batch.data)} datasets")
        
        if not batch.has_more:
            break
        
        offset += batch_size
    
    print(f"Total datasets processed: {processed_count}")

# Process all datasets in batches
process_datasets_in_batches()

Error Handling

Basic Error Handling

from keywordsai.exceptions import ValidationError, RateLimitError

def list_datasets_safely(**kwargs):
    try:
        datasets = client.datasets.list(**kwargs)
        return datasets
    except ValidationError as e:
        print(f"Validation error: {e}")
        return None
    except RateLimitError:
        print("Rate limit exceeded. Please retry later.")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Use safe listing
datasets = list_datasets_safely(limit=50, sort_by="created_at")
if datasets:
    print(f"Retrieved {len(datasets.data)} datasets")
else:
    print("Failed to retrieve datasets")

Retry Logic for Listing

import time

def list_datasets_with_retry(max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            datasets = client.datasets.list(**kwargs)
            return datasets
        except RateLimitError:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Rate limit exceeded. Max retries reached.")
                return None
        except Exception as e:
            print(f"Error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                return None
    
    return None

# Use retry logic
datasets = list_datasets_with_retry(limit=100, sort_by="log_count")

Best Practices

Efficient Pagination

def efficient_dataset_iteration(process_func, batch_size=50):
    """Efficiently iterate through all datasets with a processing function."""
    offset = 0
    total_processed = 0
    
    while True:
        try:
            batch = client.datasets.list(limit=batch_size, offset=offset)
            
            if not batch.data:
                break
            
            # Process batch
            for dataset in batch.data:
                process_func(dataset)
                total_processed += 1
            
            print(f"Processed {len(batch.data)} datasets (total: {total_processed})")
            
            if not batch.has_more:
                break
            
            offset += batch_size
            
        except Exception as e:
            print(f"Error processing batch at offset {offset}: {e}")
            break
    
    return total_processed

# Example usage
def analyze_dataset(dataset):
    if dataset.log_count > 500:
        print(f"Large dataset: {dataset.name} ({dataset.log_count} logs)")

total = efficient_dataset_iteration(analyze_dataset)
print(f"Analyzed {total} datasets")

Caching Results

from datetime import datetime, timedelta

class DatasetCache:
    def __init__(self, cache_duration_minutes=5):
        self.cache_duration = timedelta(minutes=cache_duration_minutes)
        self._cache = {}
    
    def _cache_key(self, **kwargs):
        return str(sorted(kwargs.items()))
    
    def get_datasets(self, **kwargs):
        cache_key = self._cache_key(**kwargs)
        now = datetime.now()
        
        # Check cache
        if cache_key in self._cache:
            cached_data, cached_time = self._cache[cache_key]
            if now - cached_time < self.cache_duration:
                print(f"Cache hit for key: {cache_key[:50]}...")
                return cached_data
        
        # Fetch from API
        print(f"Cache miss, fetching from API...")
        datasets = client.datasets.list(**kwargs)
        
        # Store in cache
        self._cache[cache_key] = (datasets, now)
        
        return datasets
    
    def clear_cache(self):
        self._cache.clear()

# Use caching
cache = DatasetCache(cache_duration_minutes=10)

# First call - fetches from API
datasets1 = cache.get_datasets(limit=50, sort_by="created_at")

# Second call - uses cache
datasets2 = cache.get_datasets(limit=50, sort_by="created_at")

Performance Monitoring

import time
from contextlib import contextmanager

@contextmanager
def monitor_performance(operation_name):
    start_time = time.time()
    try:
        yield
    finally:
        end_time = time.time()
        duration = end_time - start_time
        print(f"{operation_name} took {duration:.2f} seconds")

# Monitor dataset listing performance
with monitor_performance("Dataset listing"):
    datasets = client.datasets.list(limit=100)
    print(f"Retrieved {len(datasets.data)} datasets")

with monitor_performance("Filtered dataset listing"):
    filtered_datasets = client.datasets.list(
        metadata_filter={"category": "support"},
        limit=100
    )
    print(f"Retrieved {len(filtered_datasets.data)} filtered datasets")

Common Use Cases

Dataset Discovery Dashboard

def create_dataset_dashboard():
    from datetime import datetime, timedelta, timezone

    # Get overview statistics (100 is the documented maximum per request)
    all_datasets = client.datasets.list(limit=100)
    
    # Recent activity
    recent = client.datasets.list(
        created_after=(datetime.now(timezone.utc) - timedelta(days=7)).isoformat(),
        sort_by="created_at",
        sort_order="desc"
    )
    
    # Largest datasets
    largest = client.datasets.list(
        sort_by="log_count",
        sort_order="desc",
        limit=10
    )
    
    print("=== Dataset Dashboard ===")
    print(f"Total Datasets: {all_datasets.total}")
    print(f"Recent (7 days): {len(recent.data)}")
    print(f"Total Logs: {sum(d.log_count for d in all_datasets.data):,}")
    
    print("\nLargest Datasets:")
    for i, dataset in enumerate(largest.data, 1):
        print(f"  {i}. {dataset.name}: {dataset.log_count:,} logs")
    
    print("\nRecent Activity:")
    for dataset in recent.data[:5]:
        print(f"  {dataset.name} - {dataset.created_at}")

create_dataset_dashboard()

Dataset Cleanup

def find_empty_datasets():
    from datetime import datetime, timezone

    empty_datasets = client.datasets.list(
        sort_by="log_count",
        sort_order="asc",
        limit=100  # documented maximum per request
    )
    
    empty = [d for d in empty_datasets.data if d.log_count == 0]
    
    print(f"Found {len(empty)} empty datasets:")
    for dataset in empty:
        age_days = (datetime.now(timezone.utc) - datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))).days
        print(f"  {dataset.name} - Created {age_days} days ago")
    
    return empty

# Find datasets that might need cleanup
empty_datasets = find_empty_datasets()

Dataset Compliance Check

def check_dataset_compliance():
    all_datasets = client.datasets.list(limit=100)  # documented maximum per request
    
    compliance_issues = []
    
    for dataset in all_datasets.data:
        issues = []
        
        # Check for required metadata
        required_fields = ["category", "purpose", "owner"]
        for field in required_fields:
            if not dataset.metadata.get(field):
                issues.append(f"Missing {field}")
        
        # Check naming convention
        if not dataset.name.replace(" ", "").replace("-", "").replace("_", "").isalnum():
            issues.append("Invalid characters in name")
        
        # Check description length
        if not dataset.description or len(dataset.description) < 10:
            issues.append("Description too short")
        
        if issues:
            compliance_issues.append({
                "dataset": dataset,
                "issues": issues
            })
    
    print(f"Compliance Check Results:")
    print(f"  Total datasets: {len(all_datasets.data)}")
    print(f"  Datasets with issues: {len(compliance_issues)}")
    
    for item in compliance_issues[:10]:  # Show first 10
        print(f"\n  {item['dataset'].name}:")
        for issue in item['issues']:
            print(f"    - {issue}")
    
    return compliance_issues

# Run compliance check
compliance_issues = check_dataset_compliance()