Overview
The list() method retrieves datasets with filtering, sorting, and pagination, making it straightforward to manage and organize your dataset collections.
Method Signature
Synchronous
client.datasets.list(
    limit: int = 50,
    offset: int = 0,
    name: str = None,
    created_after: str = None,
    created_before: str = None,
    metadata_filter: Dict[str, Any] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    **kwargs
) -> DatasetList
Asynchronous
await client.datasets.alist(
    limit: int = 50,
    offset: int = 0,
    name: str = None,
    created_after: str = None,
    created_before: str = None,
    metadata_filter: Dict[str, Any] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    **kwargs
) -> DatasetList
Parameters
limit (int, default 50): Maximum number of datasets to return (1-100).
offset (int, default 0): Number of datasets to skip for pagination.
name (str, optional): Filter datasets by name (supports partial matching).
created_after (str, optional): Filter datasets created after this timestamp (ISO 8601 format).
created_before (str, optional): Filter datasets created before this timestamp (ISO 8601 format).
metadata_filter (Dict[str, Any], optional): Filter datasets by metadata key-value pairs.
sort_by (str, default "created_at"): Field to sort by: created_at, updated_at, name, or log_count.
sort_order (str, default "desc"): Sort order: asc (ascending) or desc (descending).
Returns
Returns a DatasetList object with the following structure:
{
  "data": [
    {
      "id": "dataset_123456789",
      "name": "Customer Support Dataset",
      "description": "Collection of customer support conversations",
      "log_count": 150,
      "created_at": "2024-01-15T10:30:00Z",
      "updated_at": "2024-01-20T14:45:00Z",
      "metadata": {
        "category": "support",
        "version": "1.0"
      }
    }
  ],
  "total": 25,
  "limit": 50,
  "offset": 0,
  "has_more": false
}
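In the SDK, these fields are exposed as attributes on the returned DatasetList object, as the examples below show. A minimal sketch (field names taken from the structure above):

page = client.datasets.list(limit=10)

# Top-level pagination fields
print(page.total, page.limit, page.offset, page.has_more)

# Each entry in page.data is a dataset record
for dataset in page.data:
    print(dataset.id, dataset.name, dataset.log_count)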
Examples
Basic Dataset Listing
from keywordsai import KeywordsAI

client = KeywordsAI(api_key="your-api-key")

# Get all datasets
datasets = client.datasets.list()

print(f"Total datasets: {datasets.total}")
print(f"Retrieved: {len(datasets.data)}")

for dataset in datasets.data:
    print(f"- {dataset.name} ({dataset.log_count} logs)")
Pagination
# Get first page
first_page = client.datasets.list(limit=10, offset=0)
print(f"First page: {len(first_page.data)} datasets")

# Get second page
second_page = client.datasets.list(limit=10, offset=10)
print(f"Second page: {len(second_page.data)} datasets")

# Iterate through all pages
def get_all_datasets():
    all_datasets = []
    offset = 0
    limit = 50
    while True:
        page = client.datasets.list(limit=limit, offset=offset)
        all_datasets.extend(page.data)
        if not page.has_more:
            break
        offset += limit
    return all_datasets

all_datasets = get_all_datasets()
print(f"Total datasets retrieved: {len(all_datasets)}")
Filtering by Name
# Search for datasets with "support" in the name
support_datasets = client.datasets.list(name="support")
print(f"Found {len(support_datasets.data)} support datasets")

# Search for a specific dataset name
specific_dataset = client.datasets.list(name="Customer Support Training")
if specific_dataset.data:
    dataset = specific_dataset.data[0]
    print(f"Found dataset: {dataset.name}")
else:
    print("Dataset not found")
Date Range Filtering
from datetime import datetime, timedelta

# Get datasets created in the last 30 days
thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat()
recent_datasets = client.datasets.list(created_after=thirty_days_ago)
print(f"Datasets created in last 30 days: {len(recent_datasets.data)}")

# Get datasets created in January 2024
january_start = "2024-01-01T00:00:00Z"
january_end = "2024-02-01T00:00:00Z"
january_datasets = client.datasets.list(
    created_after=january_start,
    created_before=january_end
)
print(f"Datasets created in January 2024: {len(january_datasets.data)}")
Metadata Filtering
# Filter by a single metadata field
training_datasets = client.datasets.list(
    metadata_filter={"purpose": "training"}
)
print(f"Training datasets: {len(training_datasets.data)}")

# Filter by multiple metadata fields
high_quality_support = client.datasets.list(
    metadata_filter={
        "category": "support",
        "quality_level": "high"
    }
)
print(f"High-quality support datasets: {len(high_quality_support.data)}")

# Combine metadata filters
production_datasets = client.datasets.list(
    metadata_filter={
        "environment": "production",
        "status": "active"
    }
)
for dataset in production_datasets.data:
    print(f"Production dataset: {dataset.name}")
    print(f"  Logs: {dataset.log_count}")
    print(f"  Last updated: {dataset.updated_at}")
Sorting Options
# Sort by creation date (newest first)
newest_datasets = client.datasets.list(
    sort_by="created_at",
    sort_order="desc"
)

# Sort by name alphabetically
alphabetical_datasets = client.datasets.list(
    sort_by="name",
    sort_order="asc"
)

# Sort by log count (largest first)
largest_datasets = client.datasets.list(
    sort_by="log_count",
    sort_order="desc"
)
print("Largest datasets:")
for dataset in largest_datasets.data[:5]:  # Top 5
    print(f"  {dataset.name}: {dataset.log_count} logs")

# Sort by last update (most recently updated first)
recently_updated = client.datasets.list(
    sort_by="updated_at",
    sort_order="desc"
)
print("Recently updated datasets:")
for dataset in recently_updated.data[:3]:  # Top 3
    print(f"  {dataset.name}: {dataset.updated_at}")
Combined Filtering and Sorting
# Get recent training datasets, sorted by log count
recent_training = client.datasets.list(
    created_after="2024-01-01T00:00:00Z",
    metadata_filter={"purpose": "training"},
    sort_by="log_count",
    sort_order="desc",
    limit=20
)
print("Recent training datasets (by size):")
for dataset in recent_training.data:
    print(f"  {dataset.name}: {dataset.log_count} logs")

# Get high-quality support datasets, sorted by name
quality_support = client.datasets.list(
    name="support",
    metadata_filter={"quality_level": "high"},
    sort_by="name",
    sort_order="asc"
)
print("High-quality support datasets:")
for dataset in quality_support.data:
    print(f"  {dataset.name}")
Asynchronous Listing
import asyncio

async def list_datasets_async():
    client = KeywordsAI(api_key="your-api-key")

    # Basic async listing
    datasets = await client.datasets.alist(limit=20)
    print(f"Async retrieved {len(datasets.data)} datasets")
    for dataset in datasets.data:
        print(f"  {dataset.name}: {dataset.log_count} logs")
    return datasets

# Run async listing
datasets = asyncio.run(list_datasets_async())
Async Pagination
async def get_all_datasets_async():
    client = KeywordsAI(api_key="your-api-key")
    all_datasets = []
    offset = 0
    limit = 50
    while True:
        page = await client.datasets.alist(limit=limit, offset=offset)
        all_datasets.extend(page.data)
        print(f"Retrieved page with {len(page.data)} datasets")
        if not page.has_more:
            break
        offset += limit
    return all_datasets

# Get all datasets asynchronously
all_datasets = asyncio.run(get_all_datasets_async())
print(f"Total datasets: {len(all_datasets)}")
Concurrent Async Operations
async def get_datasets_by_category(categories):
    client = KeywordsAI(api_key="your-api-key")

    # Create a request for each category
    tasks = []
    for category in categories:
        task = client.datasets.alist(
            metadata_filter={"category": category},
            limit=100
        )
        tasks.append(task)

    # Wait for all requests to complete
    results = await asyncio.gather(*tasks)

    # Organize results by category
    datasets_by_category = {}
    for i, category in enumerate(categories):
        datasets_by_category[category] = results[i].data
    return datasets_by_category

# Get datasets for multiple categories concurrently
categories = ["support", "training", "evaluation", "production"]
results = asyncio.run(get_datasets_by_category(categories))

for category, datasets in results.items():
    print(f"{category.title()}: {len(datasets)} datasets")
Advanced Use Cases
Dataset Analytics
def analyze_datasets():
    # Get all datasets
    all_datasets = client.datasets.list(limit=1000)

    # Calculate statistics
    total_logs = sum(d.log_count for d in all_datasets.data)
    avg_logs = total_logs / len(all_datasets.data) if all_datasets.data else 0

    # Group by category
    categories = {}
    for dataset in all_datasets.data:
        category = dataset.metadata.get("category", "uncategorized")
        if category not in categories:
            categories[category] = []
        categories[category].append(dataset)

    print("Dataset Analytics:")
    print(f"  Total datasets: {len(all_datasets.data)}")
    print(f"  Total logs: {total_logs:,}")
    print(f"  Average logs per dataset: {avg_logs:.1f}")
    print("  Categories:")
    for category, datasets in categories.items():
        category_logs = sum(d.log_count for d in datasets)
        print(f"    {category}: {len(datasets)} datasets, {category_logs:,} logs")

analyze_datasets()
Dataset Search and Discovery
def search_datasets(query, search_fields=None):
    if search_fields is None:
        search_fields = ["name", "description"]

    # Search by name
    name_results = client.datasets.list(name=query) if "name" in search_fields else None

    # For description search, fetch all datasets and filter locally
    # (assuming the API doesn't support description search directly)
    all_datasets = client.datasets.list(limit=1000)
    description_results = []
    if "description" in search_fields:
        for dataset in all_datasets.data:
            if dataset.description and query.lower() in dataset.description.lower():
                description_results.append(dataset)

    # Combine and deduplicate results
    found_datasets = []
    seen_ids = set()
    if name_results:
        for dataset in name_results.data:
            if dataset.id not in seen_ids:
                found_datasets.append(dataset)
                seen_ids.add(dataset.id)
    for dataset in description_results:
        if dataset.id not in seen_ids:
            found_datasets.append(dataset)
            seen_ids.add(dataset.id)
    return found_datasets

# Search for datasets
results = search_datasets("customer support")
print(f"Found {len(results)} datasets matching 'customer support'")
for dataset in results:
    print(f"  {dataset.name}: {dataset.description[:100]}...")
Dataset Monitoring
def monitor_dataset_growth():
    from datetime import datetime, timedelta

    # Get datasets created in different time periods
    now = datetime.now()
    periods = {
        "last_24h": now - timedelta(hours=24),
        "last_week": now - timedelta(weeks=1),
        "last_month": now - timedelta(days=30)
    }

    growth_stats = {}
    for period_name, start_time in periods.items():
        datasets = client.datasets.list(
            created_after=start_time.isoformat(),
            limit=1000
        )
        growth_stats[period_name] = {
            "count": len(datasets.data),
            "total_logs": sum(d.log_count for d in datasets.data)
        }

    print("Dataset Growth Monitoring:")
    for period, stats in growth_stats.items():
        print(f"  {period}: {stats['count']} datasets, {stats['total_logs']} logs")
    return growth_stats

# Monitor growth
growth = monitor_dataset_growth()
Batch Operations
def process_datasets_in_batches(batch_size=10):
    offset = 0
    processed_count = 0

    while True:
        # Get a batch of datasets
        batch = client.datasets.list(limit=batch_size, offset=offset)
        if not batch.data:
            break

        # Process each dataset in the batch
        for dataset in batch.data:
            # Example processing: check if the dataset needs updating
            if dataset.log_count > 1000 and not dataset.metadata.get("large_dataset"):
                print(f"Large dataset detected: {dataset.name} ({dataset.log_count} logs)")
                # Could update metadata here
            processed_count += 1

        print(f"Processed batch: {len(batch.data)} datasets")
        if not batch.has_more:
            break
        offset += batch_size

    print(f"Total datasets processed: {processed_count}")

# Process all datasets in batches
process_datasets_in_batches()
Error Handling
Basic Error Handling
from keywordsai.exceptions import ValidationError, RateLimitError

def list_datasets_safely(**kwargs):
    try:
        datasets = client.datasets.list(**kwargs)
        return datasets
    except ValidationError as e:
        print(f"Validation error: {e}")
        return None
    except RateLimitError:
        print("Rate limit exceeded. Please retry later.")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Use safe listing
datasets = list_datasets_safely(limit=50, sort_by="created_at")
if datasets:
    print(f"Retrieved {len(datasets.data)} datasets")
else:
    print("Failed to retrieve datasets")
Retry Logic for Listing
import time

from keywordsai.exceptions import RateLimitError

def list_datasets_with_retry(max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            datasets = client.datasets.list(**kwargs)
            return datasets
        except RateLimitError:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Rate limit exceeded. Max retries reached.")
                return None
        except Exception as e:
            print(f"Error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                return None
    return None

# Use retry logic
datasets = list_datasets_with_retry(limit=100, sort_by="log_count")
Best Practices
Efficient Pagination
def efficient_dataset_iteration(process_func, batch_size=50):
    """Efficiently iterate through all datasets with a processing function."""
    offset = 0
    total_processed = 0

    while True:
        try:
            batch = client.datasets.list(limit=batch_size, offset=offset)
            if not batch.data:
                break

            # Process batch
            for dataset in batch.data:
                process_func(dataset)
                total_processed += 1

            print(f"Processed {len(batch.data)} datasets (total: {total_processed})")
            if not batch.has_more:
                break
            offset += batch_size
        except Exception as e:
            print(f"Error processing batch at offset {offset}: {e}")
            break

    return total_processed

# Example usage
def analyze_dataset(dataset):
    if dataset.log_count > 500:
        print(f"Large dataset: {dataset.name} ({dataset.log_count} logs)")

total = efficient_dataset_iteration(analyze_dataset)
print(f"Analyzed {total} datasets")
Caching Results
from datetime import datetime, timedelta

class DatasetCache:
    def __init__(self, cache_duration_minutes=5):
        self.cache_duration = timedelta(minutes=cache_duration_minutes)
        self._cache = {}

    def _cache_key(self, **kwargs):
        return str(sorted(kwargs.items()))

    def get_datasets(self, **kwargs):
        cache_key = self._cache_key(**kwargs)
        now = datetime.now()

        # Check cache
        if cache_key in self._cache:
            cached_data, cached_time = self._cache[cache_key]
            if now - cached_time < self.cache_duration:
                print(f"Cache hit for key: {cache_key[:50]}...")
                return cached_data

        # Fetch from API
        print("Cache miss, fetching from API...")
        datasets = client.datasets.list(**kwargs)

        # Store in cache
        self._cache[cache_key] = (datasets, now)
        return datasets

    def clear_cache(self):
        self._cache.clear()

# Use caching
cache = DatasetCache(cache_duration_minutes=10)

# First call - fetches from API
datasets1 = cache.get_datasets(limit=50, sort_by="created_at")

# Second call - uses cache
datasets2 = cache.get_datasets(limit=50, sort_by="created_at")
Performance Monitoring
import time
from contextlib import contextmanager

@contextmanager
def monitor_performance(operation_name):
    start_time = time.time()
    try:
        yield
    finally:
        end_time = time.time()
        duration = end_time - start_time
        print(f"{operation_name} took {duration:.2f} seconds")

# Monitor dataset listing performance
with monitor_performance("Dataset listing"):
    datasets = client.datasets.list(limit=100)
    print(f"Retrieved {len(datasets.data)} datasets")

with monitor_performance("Filtered dataset listing"):
    filtered_datasets = client.datasets.list(
        metadata_filter={"category": "support"},
        limit=100
    )
    print(f"Retrieved {len(filtered_datasets.data)} filtered datasets")
Common Use Cases
Dataset Discovery Dashboard
from datetime import datetime, timedelta

def create_dataset_dashboard():
    # Get overview statistics
    all_datasets = client.datasets.list(limit=1000)

    # Recent activity
    recent = client.datasets.list(
        created_after=(datetime.now() - timedelta(days=7)).isoformat(),
        sort_by="created_at",
        sort_order="desc"
    )

    # Largest datasets
    largest = client.datasets.list(
        sort_by="log_count",
        sort_order="desc",
        limit=10
    )

    print("=== Dataset Dashboard ===")
    print(f"Total Datasets: {all_datasets.total}")
    print(f"Recent (7 days): {len(recent.data)}")
    print(f"Total Logs: {sum(d.log_count for d in all_datasets.data):,}")

    print("\nLargest Datasets:")
    for i, dataset in enumerate(largest.data, 1):
        print(f"  {i}. {dataset.name}: {dataset.log_count:,} logs")

    print("\nRecent Activity:")
    for dataset in recent.data[:5]:
        print(f"  {dataset.name} - {dataset.created_at}")

create_dataset_dashboard()
Dataset Cleanup
from datetime import datetime, timezone

def find_empty_datasets():
    empty_datasets = client.datasets.list(
        sort_by="log_count",
        sort_order="asc",
        limit=1000
    )
    empty = [d for d in empty_datasets.data if d.log_count == 0]

    print(f"Found {len(empty)} empty datasets:")
    for dataset in empty:
        # Parse the UTC timestamp and compare against an aware datetime
        created = datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))
        age_days = (datetime.now(timezone.utc) - created).days
        print(f"  {dataset.name} - Created {age_days} days ago")
    return empty

# Find datasets that might need cleanup
empty_datasets = find_empty_datasets()
Dataset Compliance Check
def check_dataset_compliance():
    all_datasets = client.datasets.list(limit=1000)
    compliance_issues = []

    for dataset in all_datasets.data:
        issues = []

        # Check for required metadata
        required_fields = ["category", "purpose", "owner"]
        for field in required_fields:
            if not dataset.metadata.get(field):
                issues.append(f"Missing {field}")

        # Check naming convention
        if not dataset.name.replace(" ", "").replace("-", "").replace("_", "").isalnum():
            issues.append("Invalid characters in name")

        # Check description length
        if not dataset.description or len(dataset.description) < 10:
            issues.append("Description too short")

        if issues:
            compliance_issues.append({
                "dataset": dataset,
                "issues": issues
            })

    print("Compliance Check Results:")
    print(f"  Total datasets: {len(all_datasets.data)}")
    print(f"  Datasets with issues: {len(compliance_issues)}")
    for item in compliance_issues[:10]:  # Show first 10
        print(f"\n  {item['dataset'].name}:")
        for issue in item['issues']:
            print(f"    - {issue}")
    return compliance_issues

# Run compliance check
compliance_issues = check_dataset_compliance()