Retrieve and filter datasets with pagination and sorting options
The list() method retrieves datasets with comprehensive filtering, sorting, and pagination options, making it easy to manage and organize your dataset collections.
client.datasets.list(
limit: int = 50,
offset: int = 0,
name: str = None,
created_after: str = None,
created_before: str = None,
metadata_filter: Dict[str, Any] = None,
sort_by: str = "created_at",
sort_order: str = "desc",
**kwargs
) -> DatasetList
await client.datasets.alist(
limit: int = 50,
offset: int = 0,
name: str = None,
created_after: str = None,
created_before: str = None,
metadata_filter: Dict[str, Any] = None,
sort_by: str = "created_at",
sort_order: str = "desc",
**kwargs
) -> DatasetList
The sort_by parameter accepts created_at, updated_at, name, or log_count. The sort_order parameter accepts asc (ascending) or desc (descending).
Both methods return a DatasetList object with the following structure:
{
"data": [
{
"id": "dataset_123456789",
"name": "Customer Support Dataset",
"description": "Collection of customer support conversations",
"log_count": 150,
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-20T14:45:00Z",
"metadata": {
"category": "support",
"version": "1.0"
}
}
],
"total": 25,
"limit": 50,
"offset": 0,
"has_more": false
}
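Before iterating, you can derive paging information client-side from the total, limit, and offset fields shown above. The snippet below is a minimal sketch, assuming a client constructed as in the examples that follow and using only the response fields documented here:
import math
from keywordsai import KeywordsAI

client = KeywordsAI(api_key="your-api-key")

# Fetch one page and compute paging details from the response metadata.
page = client.datasets.list(limit=50, offset=0)

total_pages = math.ceil(page.total / page.limit) if page.limit else 0
current_page = (page.offset // page.limit) + 1 if page.limit else 1

print(f"Page {current_page} of {total_pages} ({page.total} datasets total)")
print(f"More pages available: {page.has_more}")
When walking multiple pages, prefer the has_more flag (as in the pagination examples below) rather than recomputing page counts, since total can change between requests.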
from keywordsai import KeywordsAI
client = KeywordsAI(api_key="your-api-key")
# Get all datasets
datasets = client.datasets.list()
print(f"Total datasets: {datasets.total}")
print(f"Retrieved: {len(datasets.data)}")
for dataset in datasets.data:
print(f"- {dataset.name} ({dataset.log_count} logs)")
# Get first page
first_page = client.datasets.list(limit=10, offset=0)
print(f"First page: {len(first_page.data)} datasets")
# Get second page
second_page = client.datasets.list(limit=10, offset=10)
print(f"Second page: {len(second_page.data)} datasets")
# Iterate through all pages
def get_all_datasets():
all_datasets = []
offset = 0
limit = 50
while True:
page = client.datasets.list(limit=limit, offset=offset)
all_datasets.extend(page.data)
if not page.has_more:
break
offset += limit
return all_datasets
all_datasets = get_all_datasets()
print(f"Total datasets retrieved: {len(all_datasets)}")
# Search for datasets with "support" in the name
support_datasets = client.datasets.list(name="support")
print(f"Found {len(support_datasets.data)} support datasets")
# Search for specific dataset name
specific_dataset = client.datasets.list(name="Customer Support Training")
if specific_dataset.data:
dataset = specific_dataset.data[0]
print(f"Found dataset: {dataset.name}")
else:
print("Dataset not found")
from datetime import datetime, timedelta
# Get datasets created in the last 30 days
thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat()
recent_datasets = client.datasets.list(created_after=thirty_days_ago)
print(f"Datasets created in last 30 days: {len(recent_datasets.data)}")
# Get datasets created in January 2024
january_start = "2024-01-01T00:00:00Z"
january_end = "2024-02-01T00:00:00Z"
january_datasets = client.datasets.list(
created_after=january_start,
created_before=january_end
)
print(f"Datasets created in January 2024: {len(january_datasets.data)}")
# Filter by single metadata field
training_datasets = client.datasets.list(
metadata_filter={"purpose": "training"}
)
print(f"Training datasets: {len(training_datasets.data)}")
# Filter by multiple metadata fields
high_quality_support = client.datasets.list(
metadata_filter={
"category": "support",
"quality_level": "high"
}
)
print(f"High-quality support datasets: {len(high_quality_support.data)}")
# Complex metadata filtering
production_datasets = client.datasets.list(
metadata_filter={
"environment": "production",
"status": "active"
}
)
for dataset in production_datasets.data:
print(f"Production dataset: {dataset.name}")
print(f" Logs: {dataset.log_count}")
print(f" Last updated: {dataset.updated_at}")
# Sort by creation date (newest first)
newest_datasets = client.datasets.list(
sort_by="created_at",
sort_order="desc"
)
# Sort by name alphabetically
alphabetical_datasets = client.datasets.list(
sort_by="name",
sort_order="asc"
)
# Sort by log count (largest first)
largest_datasets = client.datasets.list(
sort_by="log_count",
sort_order="desc"
)
print("Largest datasets:")
for dataset in largest_datasets.data[:5]: # Top 5
print(f" {dataset.name}: {dataset.log_count} logs")
# Sort by last update (most recently updated first)
recently_updated = client.datasets.list(
sort_by="updated_at",
sort_order="desc"
)
print("Recently updated datasets:")
for dataset in recently_updated.data[:3]: # Top 3
print(f" {dataset.name}: {dataset.updated_at}")
# Get recent training datasets, sorted by log count
recent_training = client.datasets.list(
created_after="2024-01-01T00:00:00Z",
metadata_filter={"purpose": "training"},
sort_by="log_count",
sort_order="desc",
limit=20
)
print(f"Recent training datasets (by size):")
for dataset in recent_training.data:
print(f" {dataset.name}: {dataset.log_count} logs")
# Get support datasets with high quality, sorted by name
quality_support = client.datasets.list(
name="support",
metadata_filter={"quality_level": "high"},
sort_by="name",
sort_order="asc"
)
print(f"High-quality support datasets:")
for dataset in quality_support.data:
print(f" {dataset.name}")
import asyncio
async def list_datasets_async():
client = KeywordsAI(api_key="your-api-key")
# Basic async listing
datasets = await client.datasets.alist(limit=20)
print(f"Async retrieved {len(datasets.data)} datasets")
for dataset in datasets.data:
print(f" {dataset.name}: {dataset.log_count} logs")
return datasets
# Run async listing
datasets = asyncio.run(list_datasets_async())
async def get_all_datasets_async():
client = KeywordsAI(api_key="your-api-key")
all_datasets = []
offset = 0
limit = 50
while True:
page = await client.datasets.alist(limit=limit, offset=offset)
all_datasets.extend(page.data)
print(f"Retrieved page with {len(page.data)} datasets")
if not page.has_more:
break
offset += limit
return all_datasets
# Get all datasets asynchronously
all_datasets = asyncio.run(get_all_datasets_async())
print(f"Total datasets: {len(all_datasets)}")
async def get_datasets_by_category(categories):
client = KeywordsAI(api_key="your-api-key")
# Create tasks for each category
tasks = []
for category in categories:
task = client.datasets.alist(
metadata_filter={"category": category},
limit=100
)
tasks.append(task)
# Wait for all requests to complete
results = await asyncio.gather(*tasks)
# Organize results by category
datasets_by_category = {}
for i, category in enumerate(categories):
datasets_by_category[category] = results[i].data
return datasets_by_category
# Get datasets for multiple categories concurrently
categories = ["support", "training", "evaluation", "production"]
results = asyncio.run(get_datasets_by_category(categories))
for category, datasets in results.items():
print(f"{category.title()}: {len(datasets)} datasets")
def analyze_datasets():
# Get all datasets
all_datasets = client.datasets.list(limit=1000)
# Calculate statistics
total_logs = sum(d.log_count for d in all_datasets.data)
avg_logs = total_logs / len(all_datasets.data) if all_datasets.data else 0
# Group by category
categories = {}
for dataset in all_datasets.data:
category = dataset.metadata.get("category", "uncategorized")
if category not in categories:
categories[category] = []
categories[category].append(dataset)
print(f"Dataset Analytics:")
print(f" Total datasets: {len(all_datasets.data)}")
print(f" Total logs: {total_logs:,}")
print(f" Average logs per dataset: {avg_logs:.1f}")
print(f" Categories:")
for category, datasets in categories.items():
category_logs = sum(d.log_count for d in datasets)
print(f" {category}: {len(datasets)} datasets, {category_logs:,} logs")
analyze_datasets()
def search_datasets(query, search_fields=None):
if search_fields is None:
search_fields = ["name", "description"]
# Search by name
name_results = client.datasets.list(name=query) if "name" in search_fields else None
# For description search, we need to get all datasets and filter
# (assuming API doesn't support description search directly)
all_datasets = client.datasets.list(limit=1000)
description_results = []
if "description" in search_fields:
for dataset in all_datasets.data:
if dataset.description and query.lower() in dataset.description.lower():
description_results.append(dataset)
# Combine and deduplicate results
found_datasets = []
seen_ids = set()
if name_results:
for dataset in name_results.data:
if dataset.id not in seen_ids:
found_datasets.append(dataset)
seen_ids.add(dataset.id)
for dataset in description_results:
if dataset.id not in seen_ids:
found_datasets.append(dataset)
seen_ids.add(dataset.id)
return found_datasets
# Search for datasets
results = search_datasets("customer support")
print(f"Found {len(results)} datasets matching 'customer support'")
for dataset in results:
print(f" {dataset.name}: {dataset.description[:100]}...")
def monitor_dataset_growth():
from datetime import datetime, timedelta
# Get datasets created in different time periods
now = datetime.now()
periods = {
"last_24h": now - timedelta(hours=24),
"last_week": now - timedelta(weeks=1),
"last_month": now - timedelta(days=30)
}
growth_stats = {}
for period_name, start_time in periods.items():
datasets = client.datasets.list(
created_after=start_time.isoformat(),
limit=1000
)
growth_stats[period_name] = {
"count": len(datasets.data),
"total_logs": sum(d.log_count for d in datasets.data)
}
print("Dataset Growth Monitoring:")
for period, stats in growth_stats.items():
print(f" {period}: {stats['count']} datasets, {stats['total_logs']} logs")
return growth_stats
# Monitor growth
growth = monitor_dataset_growth()
def process_datasets_in_batches(batch_size=10):
offset = 0
processed_count = 0
while True:
# Get batch of datasets
batch = client.datasets.list(limit=batch_size, offset=offset)
if not batch.data:
break
# Process each dataset in the batch
for dataset in batch.data:
# Example processing: check if dataset needs updating
if dataset.log_count > 1000 and not dataset.metadata.get("large_dataset"):
print(f"Large dataset detected: {dataset.name} ({dataset.log_count} logs)")
# Could update metadata here
processed_count += 1
print(f"Processed batch: {len(batch.data)} datasets")
if not batch.has_more:
break
offset += batch_size
print(f"Total datasets processed: {processed_count}")
# Process all datasets in batches
process_datasets_in_batches()
from keywordsai.exceptions import ValidationError, RateLimitError
def list_datasets_safely(**kwargs):
try:
datasets = client.datasets.list(**kwargs)
return datasets
except ValidationError as e:
print(f"Validation error: {e}")
return None
except RateLimitError:
print("Rate limit exceeded. Please retry later.")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
# Use safe listing
datasets = list_datasets_safely(limit=50, sort_by="created_at")
if datasets:
print(f"Retrieved {len(datasets.data)} datasets")
else:
print("Failed to retrieve datasets")
import time
def list_datasets_with_retry(max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
datasets = client.datasets.list(**kwargs)
return datasets
except RateLimitError:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"Rate limited. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print("Rate limit exceeded. Max retries reached.")
return None
except Exception as e:
print(f"Error (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(1)
else:
return None
return None
# Use retry logic
datasets = list_datasets_with_retry(limit=100, sort_by="log_count")
def efficient_dataset_iteration(process_func, batch_size=50):
"""Efficiently iterate through all datasets with a processing function."""
offset = 0
total_processed = 0
while True:
try:
batch = client.datasets.list(limit=batch_size, offset=offset)
if not batch.data:
break
# Process batch
for dataset in batch.data:
process_func(dataset)
total_processed += 1
print(f"Processed {len(batch.data)} datasets (total: {total_processed})")
if not batch.has_more:
break
offset += batch_size
except Exception as e:
print(f"Error processing batch at offset {offset}: {e}")
break
return total_processed
# Example usage
def analyze_dataset(dataset):
if dataset.log_count > 500:
print(f"Large dataset: {dataset.name} ({dataset.log_count} logs)")
total = efficient_dataset_iteration(analyze_dataset)
print(f"Analyzed {total} datasets")
from datetime import datetime, timedelta
class DatasetCache:
def __init__(self, cache_duration_minutes=5):
self.cache_duration = timedelta(minutes=cache_duration_minutes)
self._cache = {}
def _cache_key(self, **kwargs):
return str(sorted(kwargs.items()))
def get_datasets(self, **kwargs):
cache_key = self._cache_key(**kwargs)
now = datetime.now()
# Check cache
if cache_key in self._cache:
cached_data, cached_time = self._cache[cache_key]
if now - cached_time < self.cache_duration:
print(f"Cache hit for key: {cache_key[:50]}...")
return cached_data
# Fetch from API
print(f"Cache miss, fetching from API...")
datasets = client.datasets.list(**kwargs)
# Store in cache
self._cache[cache_key] = (datasets, now)
return datasets
def clear_cache(self):
self._cache.clear()
# Use caching
cache = DatasetCache(cache_duration_minutes=10)
# First call - fetches from API
datasets1 = cache.get_datasets(limit=50, sort_by="created_at")
# Second call - uses cache
datasets2 = cache.get_datasets(limit=50, sort_by="created_at")
import time
from contextlib import contextmanager
@contextmanager
def monitor_performance(operation_name):
start_time = time.time()
try:
yield
finally:
end_time = time.time()
duration = end_time - start_time
print(f"{operation_name} took {duration:.2f} seconds")
# Monitor dataset listing performance
with monitor_performance("Dataset listing"):
datasets = client.datasets.list(limit=100)
print(f"Retrieved {len(datasets.data)} datasets")
with monitor_performance("Filtered dataset listing"):
filtered_datasets = client.datasets.list(
metadata_filter={"category": "support"},
limit=100
)
print(f"Retrieved {len(filtered_datasets.data)} filtered datasets")
from datetime import datetime, timedelta

def create_dataset_dashboard():
# Get overview statistics
all_datasets = client.datasets.list(limit=1000)
# Recent activity
recent = client.datasets.list(
created_after=(datetime.now() - timedelta(days=7)).isoformat(),
sort_by="created_at",
sort_order="desc"
)
# Largest datasets
largest = client.datasets.list(
sort_by="log_count",
sort_order="desc",
limit=10
)
print("=== Dataset Dashboard ===")
print(f"Total Datasets: {all_datasets.total}")
print(f"Recent (7 days): {len(recent.data)}")
print(f"Total Logs: {sum(d.log_count for d in all_datasets.data):,}")
print("\nLargest Datasets:")
for i, dataset in enumerate(largest.data, 1):
print(f" {i}. {dataset.name}: {dataset.log_count:,} logs")
print("\nRecent Activity:")
for dataset in recent.data[:5]:
print(f" {dataset.name} - {dataset.created_at}")
create_dataset_dashboard()
from datetime import datetime, timezone

def find_empty_datasets():
empty_datasets = client.datasets.list(
sort_by="log_count",
sort_order="asc",
limit=1000
)
empty = [d for d in empty_datasets.data if d.log_count == 0]
print(f"Found {len(empty)} empty datasets:")
for dataset in empty:
        age_days = (datetime.now(timezone.utc) - datetime.fromisoformat(dataset.created_at.replace('Z', '+00:00'))).days
print(f" {dataset.name} - Created {age_days} days ago")
return empty
# Find datasets that might need cleanup
empty_datasets = find_empty_datasets()
def check_dataset_compliance():
all_datasets = client.datasets.list(limit=1000)
compliance_issues = []
for dataset in all_datasets.data:
issues = []
# Check for required metadata
required_fields = ["category", "purpose", "owner"]
for field in required_fields:
if not dataset.metadata.get(field):
issues.append(f"Missing {field}")
# Check naming convention
if not dataset.name.replace(" ", "").replace("-", "").replace("_", "").isalnum():
issues.append("Invalid characters in name")
# Check description length
if not dataset.description or len(dataset.description) < 10:
issues.append("Description too short")
if issues:
compliance_issues.append({
"dataset": dataset,
"issues": issues
})
print(f"Compliance Check Results:")
print(f" Total datasets: {len(all_datasets.data)}")
print(f" Datasets with issues: {len(compliance_issues)}")
for item in compliance_issues[:10]: # Show first 10
print(f"\n {item['dataset'].name}:")
for issue in item['issues']:
print(f" - {issue}")
return compliance_issues
# Run compliance check
compliance_issues = check_dataset_compliance()