# icebox/notes/s3_simulate.py
#
# Listing metadata carried over from the original paste:
# 284 lines, 11 KiB, Python

import datetime
from argparse import ArgumentParser
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
# One pricing tier: (tier_min, tier_max, unit_price).
_PriceTier = Tuple[float, float, float]


@dataclass
class S3PricingConfig:
    """Price list for one S3-like object storage service.

    Tiered prices are lists of ``(tier_min, tier_max, price)`` tuples whose
    bounds are expressed in GB; request prices are quoted per 1000 requests.
    """

    # Storage: price per GB-month for each tier.
    storage_tiers: List[_PriceTier]

    # Data transfer into and out of the service: price per GB for each tier.
    transfer_in_tiers: List[_PriceTier]
    transfer_out_tiers: List[_PriceTier]

    # Requests: price per 1000 requests of each kind.
    put_request_price: float
    get_request_price: float
    delete_request_price: float
@dataclass
class S3Usage:
    """Lifecycle of a single stored file: upload, downloads, optional delete."""

    # Size of the file in GB.
    file_size_gb: float
    # Day the file is uploaded.
    upload_date: datetime.date
    # Number of times the file is downloaded.
    download_count: int
    # Day the file is deleted; None means the file is never deleted.
    delete_date: Optional[datetime.date] = None
class S3PricingSimulator:
    """Estimate the cost of a set of stored files under one price list.

    Storage, data-transfer and request costs are computed independently by
    the ``calculate_*`` methods; ``simulate`` runs all three and adds them up.
    """

    def __init__(self, pricing_config: "S3PricingConfig"):
        """Keep the price list that every calculation reads from."""
        self.config = pricing_config

    def calculate_storage_cost(self, usage_list: List["S3Usage"], start_date: datetime.date,
                               end_date: datetime.date) -> Dict:
        """Calculate storage costs for the given time period across all usage items.

        A file counts towards a day's total from its upload date up to, but
        not including, its delete date. Each calendar month is priced on the
        average of its simulated daily totals.

        NOTE: a month only partly covered by the period is averaged over the
        simulated days alone, so it is priced as if that average held for the
        whole month.

        Args:
            usage_list: Files to account for.
            start_date: First day of the period (inclusive).
            end_date: Last day of the period (inclusive).

        Returns:
            Dict with ``daily_storage_gb`` (date -> GB), ``monthly_avg_storage_gb``
            and ``monthly_storage_cost`` (both keyed by ``"YYYY-MM"``), and
            ``total_storage_cost``.
        """
        one_day = datetime.timedelta(days=1)

        # Total GB held on each day of the period.
        daily_storage = {}
        current_date = start_date
        while current_date <= end_date:
            daily_storage[current_date] = sum(
                usage.file_size_gb
                for usage in usage_list
                if usage.upload_date <= current_date
                and (usage.delete_date is None or usage.delete_date > current_date)
            )
            current_date += one_day

        # Group the daily totals by calendar month.
        monthly_storage = {}
        for day, storage_gb in daily_storage.items():
            monthly_storage.setdefault(f"{day.year}-{day.month:02d}", []).append(storage_gb)

        monthly_avg = {
            month: sum(daily_values) / len(daily_values)
            for month, daily_values in monthly_storage.items()
        }
        storage_costs = {
            month: self._calculate_tiered_cost(avg_gb, self.config.storage_tiers)
            for month, avg_gb in monthly_avg.items()
        }

        return {
            "daily_storage_gb": daily_storage,
            "monthly_avg_storage_gb": monthly_avg,
            "monthly_storage_cost": storage_costs,
            "total_storage_cost": sum(storage_costs.values())
        }

    def calculate_transfer_costs(self, usage_list: List["S3Usage"]) -> Dict:
        """Calculate data transfer costs for all usage items.

        Every file is transferred in once and transferred out once per
        download. The tiers are applied to the totals over all items; no
        date range is taken into account.

        Returns:
            Dict with the transferred GB and the cost for each direction,
            plus ``total_transfer_cost``.
        """
        total_transfer_in = sum(usage.file_size_gb for usage in usage_list)
        total_transfer_out = sum(usage.file_size_gb * usage.download_count for usage in usage_list)

        transfer_in_cost = self._calculate_tiered_cost(total_transfer_in, self.config.transfer_in_tiers)
        transfer_out_cost = self._calculate_tiered_cost(total_transfer_out, self.config.transfer_out_tiers)

        return {
            "total_transfer_in_gb": total_transfer_in,
            "total_transfer_out_gb": total_transfer_out,
            "transfer_in_cost": transfer_in_cost,
            "transfer_out_cost": transfer_out_cost,
            "total_transfer_cost": transfer_in_cost + transfer_out_cost
        }

    def calculate_request_costs(self, usage_list: List["S3Usage"]) -> Dict:
        """Calculate request costs for all usage items.

        Each file accounts for one PUT, one GET per download and, when it
        has a delete date, one DELETE. Prices are per 1000 requests.

        Returns:
            Dict with the request count and cost per request kind, plus
            ``total_request_cost``.
        """
        put_requests = len(usage_list)
        get_requests = sum(usage.download_count for usage in usage_list)
        delete_requests = sum(1 for usage in usage_list if usage.delete_date is not None)

        put_cost = (put_requests / 1000) * self.config.put_request_price
        get_cost = (get_requests / 1000) * self.config.get_request_price
        delete_cost = (delete_requests / 1000) * self.config.delete_request_price

        return {
            "put_requests": put_requests,
            "get_requests": get_requests,
            "delete_requests": delete_requests,
            "put_cost": put_cost,
            "get_cost": get_cost,
            "delete_cost": delete_cost,
            "total_request_cost": put_cost + get_cost + delete_cost
        }

    def simulate(self, usage_list: List["S3Usage"], start_date: Optional[datetime.date] = None,
                 end_date: Optional[datetime.date] = None) -> Dict:
        """Run a complete simulation with the given usage patterns.

        Args:
            usage_list: Files to simulate.
            start_date: First day of the storage period; defaults to the
                earliest upload date.
            end_date: Last day of the storage period; defaults to the latest
                delete date, or to 30 days after the last upload when no
                file is ever deleted.

        Returns:
            ``{"error": ...}`` when ``usage_list`` is empty; otherwise a dict
            with ``simulation_period``, the ``storage``, ``transfer`` and
            ``requests`` result dicts, and ``total_cost``.
        """
        if not usage_list:
            return {"error": "No usage items provided."}

        # Determine the simulation time period if not specified.
        if start_date is None:
            start_date = min(usage.upload_date for usage in usage_list)
        if end_date is None:
            latest_delete = max(
                (usage.delete_date for usage in usage_list if usage.delete_date is not None),
                default=None,
            )
            if latest_delete is None:
                # Nothing is ever deleted: simulate one month past the last upload.
                latest_upload = max(usage.upload_date for usage in usage_list)
                end_date = latest_upload + datetime.timedelta(days=30)
            else:
                end_date = latest_delete

        storage_results = self.calculate_storage_cost(usage_list, start_date, end_date)
        transfer_results = self.calculate_transfer_costs(usage_list)
        request_results = self.calculate_request_costs(usage_list)

        total_cost = (
            storage_results["total_storage_cost"] +
            transfer_results["total_transfer_cost"] +
            request_results["total_request_cost"]
        )

        return {
            "simulation_period": {
                "start_date": start_date,
                "end_date": end_date
            },
            "storage": storage_results,
            "transfer": transfer_results,
            "requests": request_results,
            "total_cost": total_cost
        }

    def _calculate_tiered_cost(self, amount: float, tiers: List[Tuple[float, float, float]]) -> float:
        """Calculate cost based on tiered pricing.

        Each tier ``(tier_min, tier_max, price_per_unit)`` bills the part of
        ``amount`` that lies between ``tier_min`` and ``tier_max``.

        Args:
            amount: Total quantity to bill, in the same unit as the tier bounds.
            tiers: Pricing tiers; ``tier_max`` may be ``float('inf')``.

        Returns:
            The summed cost over all tiers, or 0 when ``amount`` is not positive.
        """
        if amount <= 0:
            return 0.0

        total_cost = 0.0
        for tier_min, tier_max, price_per_unit in tiers:
            # Tier bounds are absolute positions, so they must be compared
            # with the full amount (not with what is left after lower tiers);
            # otherwise upper tiers are skipped and part of the usage is
            # never billed.
            covered_up_to = min(amount, tier_max)
            if covered_up_to <= tier_min:
                # The amount does not reach this tier (or the tier is empty).
                continue
            total_cost += (covered_up_to - tier_min) * price_per_unit
        return total_cost
# Pricing presets, keyed by a short service id.
# Tier tuples are (GB range min, GB range max, price); request prices are per 1000.
pricing = {
    # Sample configuration based on approximated AWS S3 Standard pricing.
    'aws': S3PricingConfig(
        # Storage: price per GB-month
        storage_tiers=[
            (0, 50 * 1024, 0.023),              # First 50 TB
            (50 * 1024, 450 * 1024, 0.022),     # Next 400 TB
            (450 * 1024, float('inf'), 0.021),  # Over 450 TB
        ],
        # Data transfer in is free
        transfer_in_tiers=[
            (0, float('inf'), 0.0),
        ],
        # Data transfer out: price per GB
        transfer_out_tiers=[
            (0, 1, 0.0),                        # First 1 GB free
            (1, 10 * 1024, 0.09),               # Up to 10 TB
            (10 * 1024, 50 * 1024, 0.085),      # Next 40 TB
            (50 * 1024, 150 * 1024, 0.07),      # Next 100 TB
            (150 * 1024, float('inf'), 0.05),   # Over 150 TB
        ],
        put_request_price=0.005,
        get_request_price=0.0004,
        delete_request_price=0.0,
    ),
    # Second preset, selected with the 'b2' id.
    'b2': S3PricingConfig(
        # Storage: price per GB-month
        # NOTE(review): 0.0006 per GB-month is roughly $0.61 per TB-month --
        # confirm against the provider's current price list.
        storage_tiers=[
            (0, 10, 0),                         # First 10 GB free
            (10, float('inf'), 0.0006),         # Everything above 10 GB
        ],
        # Data transfer in is free
        transfer_in_tiers=[
            (0, float('inf'), 0.0),
        ],
        # Data transfer out: a single flat rate per GB
        transfer_out_tiers=[
            (0, float('inf'), 0.01),
        ],
        put_request_price=0.000,
        get_request_price=0.0004,
        delete_request_price=0.0,
    ),
}
def main():
    """Command-line entry point: simulate one usage scenario and print the costs.

    The scenario is ``--count`` identical files of ``--file`` GB, all uploaded
    today, each downloaded ``--downloads`` times and deleted ``--length`` days
    after upload, priced with the preset chosen by ``--service``.
    """
    services = ['aws', 'b2']
    service_name = {
        'aws': 'Amazon',
        'b2': 'Back Blaze B2',
    }

    parser = ArgumentParser(description='Simulate s3-like service pricing scenarios')
    parser.add_argument('-f', '--file', default=80, type=int, help='File size (GB)')
    parser.add_argument('-c', '--count', default=100, type=int, help='Number of files to simulate')
    parser.add_argument('-d', '--downloads', default=10, type=int, help='Number of downloads per file to simulate')
    parser.add_argument('-l', '--length', default=30, type=int, help='Length of upload in days')
    parser.add_argument('-s', '--service', default='aws', choices=services, type=str,
                        help='Pricing preset to simulate')
    args = parser.parse_args()

    simulator = S3PricingSimulator(pricing[args.service])

    today = datetime.date.today()
    large_file = S3Usage(
        file_size_gb=args.file,
        upload_date=today,
        download_count=args.downloads,
        # Honour --length; the storage duration used to be fixed at 15 days
        # no matter what was passed on the command line.
        delete_date=today + datetime.timedelta(days=args.length)
    )
    # The same (never mutated) usage object is shared by every list entry.
    usage_list = [large_file] * args.count

    results = simulator.simulate(usage_list)
    if "error" in results:
        # simulate() reports an empty usage list (e.g. --count 0) this way;
        # exit with the message instead of failing on a missing key below.
        raise SystemExit(results["error"])

    storage = results['storage']
    transfer = results['transfer']
    requests = results['requests']

    print("S3 Cost Simulation Results")
    print("=========================")
    print(f"File Size: {args.file}GB")
    print(f" Count: {args.count}")
    print(f"Downloads: {args.downloads}")
    print(f" Service: {service_name[args.service]}")
    print("=========================")
    print(f"Period: {results['simulation_period']['start_date']} to {results['simulation_period']['end_date']}")

    print("\nStorage Costs:")
    for month, cost in storage['monthly_storage_cost'].items():
        avg_gb = storage['monthly_avg_storage_gb'][month]
        print(f" {month}: {avg_gb:.2f} GB (avg) = ${cost:.2f}")
    print(f" Total: ${storage['total_storage_cost']:.2f}")

    print("\nData Transfer Costs:")
    print(f" In: {transfer['total_transfer_in_gb']:.2f} GB = ${transfer['transfer_in_cost']:.2f}")
    print(f" Out: {transfer['total_transfer_out_gb']:.2f} GB = ${transfer['transfer_out_cost']:.2f}")
    print(f" Total: ${transfer['total_transfer_cost']:.2f}")

    print("\nRequest Costs:")
    print(f" PUT: {requests['put_requests']} requests = ${requests['put_cost']:.4f}")
    print(f" GET: {requests['get_requests']} requests = ${requests['get_cost']:.4f}")
    print(f" DELETE: {requests['delete_requests']} requests = ${requests['delete_cost']:.4f}")
    print(f" Total: ${requests['total_request_cost']:.4f}")

    print(f"\nTotal Estimated Cost: ${results['total_cost']:.2f}")


if __name__ == "__main__":
    main()