Overview
The @publish decorator lets you automatically publish a function's return value to Contextbase without modifying your existing code.
Basic usage
Simple JSON publishing
Copy
from contextbase import publish
@publish("analytics", "user-actions")
def track_user_login(user_id, login_method):
return {
"user_id": user_id,
"action": "login",
"method": login_method,
"timestamp": "2024-01-15T14:30:00Z",
"success": True
}
# Function runs normally and publishes result
result = track_user_login(123, "oauth")
print(result) # Returns the original data
File publishing
Automatically convert function output to files:
Copy
@publish("reports", "daily-summary", as_file=True, file_name="summary.txt")
def generate_daily_report():
return "Daily Report: All systems operational"
# Creates and uploads summary.txt automatically
report = generate_daily_report()
@publish("exports", "data-dumps", as_file=True, file_name="export.json")
def export_user_data(user_id):
data = get_user_data(user_id) # Your function
return json.dumps(data, indent=2)
# Creates JSON file and uploads it
export_user_data(123)
Decorator parameters
Required parameters
Copy
@publish(context_name, **options)
context_name (str): Name of the context to publish to
Optional parameters
Copy
@publish(
context_name="analytics",
scopes={"environment": "prod"}, # Scoping data (static or callable)
raise_on_error=False, # Error handling behavior
as_file=False, # Publish as file instead of JSON
file_name="output.txt" # File name (static or callable)
)
Scoping
Add scopes to filter and organize your published data:
Static scopes
Copy
@publish("monitoring", "metrics", scopes={"environment": "production"})
def collect_system_metrics():
return {
"cpu_usage": 75,
"memory_usage": 60,
"disk_usage": 45
}
Dynamic scopes
Compute scopes dynamically based on the function’s return value:
Copy
@publish("user-data", "preferences", scopes=lambda result: {"user_id": result["user_id"]})
def update_user_preferences(user_id, preferences):
return {
"user_id": user_id,
"preferences": preferences,
"updated_at": datetime.now().isoformat()
}
# Scopes will be: {"user_id": 123}
update_user_preferences(123, {"theme": "dark"})
Copy
@publish(
"error-tracking",
"api-errors",
scopes=lambda result: {
"service": result["service"],
"severity": result["severity"],
"error_type": result["error_type"]
}
)
def handle_api_error(service, error_message, severity="medium"):
error_type = "timeout" if "timeout" in error_message.lower() else "unknown"
return {
"service": service,
"error_message": error_message,
"error_type": error_type,
"severity": severity,
"timestamp": datetime.now().isoformat()
}
# Scopes computed from actual error details!
handle_api_error("payment-api", "Connection timeout", "high")
Error handling
Silent mode (Default)
By default, publishing errors don’t affect your function:
Copy
@publish("logs", "errors") # raise_on_error=False by default
def process_data(data):
# Your business logic
result = expensive_computation(data)
return result
# Even if publishing fails, your function succeeds
Strict mode
Raise exceptions when publishing fails:
Copy
@publish("critical-data", "transactions", raise_on_error=True)
def process_payment(amount, user_id):
# Critical data that must be logged
transaction = {
"amount": amount,
"user_id": user_id,
"status": "completed",
"timestamp": datetime.now().isoformat()
}
return transaction
# Will raise ContextbaseError if publishing fails
Custom error handling
Copy
from contextbase import ContextbaseError
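@publish("analytics", "events", raise_on_error=True)
def track_event(event_type, data):
    return {"event": event_type, "data": data}
# The decorator publishes the return value after the function body has run,
# so a publishing error must be caught where the function is called.
try:
    track_event("signup", {"plan": "pro"})
except ContextbaseError as e:
    # Handle publishing error
    logger.error(f"Failed to publish event: {e.message}")
    # Could fallback to local storage, retry later, etc.
    raise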
File publishing
Automatic file names
When file_name is not provided, the decorator generates one:
Copy
@publish("reports", "generated", as_file=True)
def create_user_report(user_id):
return f"Report for user {user_id}: ..."
# Creates file: "create_user_report_output.txt"
Custom file names
Copy
@publish("exports", "csv-data", as_file=True, file_name="data.csv")
def export_to_csv():
return "name,email,age\nJohn,[email protected],30"
# Creates file: "data.csv"
Dynamic file names
Generate file names dynamically:
Copy
from datetime import datetime
@publish("backups", "database", as_file=True,
file_name=lambda: f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql")
def backup_database():
return generate_sql_dump()
# Creates file: "backup_20240115_143000.sql"
Copy
# Monthly reports (file name includes the current year and month)
@publish("user-reports", "monthly", as_file=True,
file_name=lambda: f"user_report_{datetime.now().strftime('%Y%m')}.pdf")
def generate_monthly_report(user_id):
return create_pdf_content(user_id)
# Timestamp-based naming
@publish("analysis", "results", as_file=True,
file_name=lambda: f"analysis_{int(time.time())}.json")
def analyze_data(dataset):
return json.dumps(perform_analysis(dataset))
Advanced examples
ML experiment tracking
Copy
@publish(
"ml-experiments",
"training-results",
scopes=lambda result: {
"model_type": result["model_config"]["type"],
"accuracy_tier": "high" if result["accuracy"] > 0.9 else "medium"
}
)
def train_model(model_config):
model = create_model(model_config)
metrics = train_and_evaluate(model)
return {
"model_config": model_config,
"accuracy": metrics["accuracy"],
"loss": metrics["loss"],
"training_time": metrics["duration"],
"experiment_id": generate_id()
}
# Automatically categorizes experiments by type and performance!
Multi-tenant application logging
Copy
@publish(
"multi-tenant-logs",
"user-actions",
scopes=lambda result: {
"tenant_id": result["tenant_id"],
"user_role": result["user_role"],
"action_category": result["action"].split("_")[0]
}
)
def track_tenant_action(tenant_id, user_id, user_role, action):
return {
"tenant_id": tenant_id,
"user_id": user_id,
"user_role": user_role,
"action": action,
"timestamp": datetime.now().isoformat()
}
# Perfect scoping for multi-tenant analytics!
Combined dynamic features
Copy
@publish(
"user-reports",
"personalized",
as_file=True,
scopes=lambda result: {
"user_id": extract_user_id(result),
"report_type": "monthly"
},
file_name=lambda: f"user_report_{datetime.now().strftime('%Y%m')}.txt"
)
def generate_personalized_report(user_id):
return f"User Report for {user_id}: ..."
# Both scopes AND filename computed dynamically! 🚀
Combining with other decorators
The @publish decorator works well with other decorators:
Copy
import time
from functools import wraps
def timing_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
print(f"{func.__name__} took {duration:.2f} seconds")
return result
return wrapper
@timing_decorator
@publish("performance", "function-timings")
def expensive_computation(data):
# Simulate expensive work
time.sleep(1)
return {"result": len(data), "processed_at": datetime.now().isoformat()}
# Both decorators work together
expensive_computation([1, 2, 3, 4, 5])
Best practices
✅ Recommended patterns
Copy
# Use a descriptive context name
@publish("user-login-events")
def track_login(user_id):
return {"user_id": user_id, "timestamp": datetime.now().isoformat()}
# Use dynamic scopes for better organization
@publish("monitoring-errors", scopes=lambda result: {
"service": result["service"],
"severity": result["severity"]
})
def log_error(service, error):
return {"service": service, "error": str(error), "severity": "high"}
# Use raise_on_error=True for critical data
@publish("financial-transactions", raise_on_error=True)
def process_transaction(amount):
return {"amount": amount, "processed": True}
❌ Avoid these patterns
Copy
# Don't use vague names
@publish("stuff") # ❌ Not descriptive
# Don't publish sensitive data
@publish("passwords") # ❌ Security risk
def authenticate(password):
return {"password": password} # ❌ Never log passwords
# Don't ignore errors for critical data
@publish("charges", raise_on_error=False) # ❌ Could lose important data
def process_charge(amount):
return {"charge": amount}
## Troubleshooting
### Debugging dynamic scopes
```python
# Test your dynamic scopes logic separately
def compute_scopes(result):
return {"user_id": result.get("user_id", "unknown")}
# Test it
test_result = {"user_id": 123, "action": "login"}
print(compute_scopes(test_result)) # {"user_id": 123}
# Use in decorator
@publish("events", scopes=compute_scopes)
def my_function():
return test_result
```
### Debugging publishing issues
Copy
from contextbase import ContextbaseError
@publish("test", raise_on_error=True)
def debug_function():
return {"test": "data"}
try:
result = debug_function()
print("✅ Publishing successful")
except ContextbaseError as e:
print(f"❌ Publishing failed: {e.message}")
print(f"Status code: {e.status_code}")
print(f"Errors: {e.errors}")
