This notebook demonstrates how to download and work with GRIB2 forecast files from the Spire Weather API.

import os
import requests
from dotenv import load_dotenv

# Load SPIRE_API_KEY from a local .env file (if present) into the environment
load_dotenv()

API_KEY = os.environ.get('SPIRE_API_KEY')
BASE_URL = 'https://api.wx.spire.com'
HEADERS = {'spire-api-key': API_KEY}
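
If the key is missing, every request below will fail authentication; a quick guard makes that failure obvious up front. A minimal sketch:

# Stop early if no key was found in the environment or a .env file
if not API_KEY:
    raise RuntimeError('SPIRE_API_KEY is not set; add it to your environment or a .env file')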

List Available Files

# Get list of available forecast files
params = {
    'bundles': 'basic',
    'regions': 'global'
}

response = requests.get(
    f'{BASE_URL}/forecast/file',
    headers=HEADERS,
    params=params
)
response.raise_for_status()
file_list = response.json()

print(f"Total files available: {file_list['meta']['count']}")
print()
print("First 10 files:")
for f in file_list['files'][:10]:
    print(f"  {f}")

Parse File Names

Each file name packs the forecast metadata into dot-separated fields: the forecast system, the issuance date (YYYYMMDD) and hour (tHHz), the grid resolution (0p125 means 0.125°), the bundle, the region, the forecast lead time (fHHH, in hours), and the file format. An illustrative name of this form is sof-d.20240115.t00z.0p125.basic.global.f006.grib2: the 6-hour forecast from the 00Z issuance on 2024-01-15. The parser below splits a name into these components:

import re
from dataclasses import dataclass
from datetime import datetime

@dataclass
class FileInfo:
    system: str
    date: datetime
    hour: int
    resolution: float
    bundle: str
    region: str
    lead_hours: int
    format: str
    filename: str

def parse_filename(filename):
    """Parse a forecast filename into components."""
    # Field order: system.YYYYMMDD.tHHz.resolution.bundle.region.fHHH.format
    pattern = r'^(\w+[-\w]*)\.(\d{8})\.t(\d{2})z\.(\d+p\d+)\.(\w+)\.(\w+)\.f(\d{3})\.(\w+)$'
    match = re.match(pattern, filename)
    
    if not match:
        return None
    
    system, date_str, hour, res_str, bundle, region, lead, fmt = match.groups()
    
    return FileInfo(
        system=system,
        date=datetime.strptime(date_str, '%Y%m%d'),
        hour=int(hour),
        resolution=float(res_str.replace('p', '.')),
        bundle=bundle,
        region=region,
        lead_hours=int(lead),
        format=fmt,
        filename=filename
    )

# Parse all files
parsed_files = [parse_filename(f) for f in file_list['files']]
parsed_files = [f for f in parsed_files if f is not None]

# Show example
if parsed_files:
    example = parsed_files[0]
    print(f"Example: {example.filename}")
    print(f"  System: {example.system}")
    print(f"  Date: {example.date.strftime('%Y-%m-%d')}")
    print(f"  Issuance: {example.hour:02d}Z")
    print(f"  Resolution: {example.resolution}°")
    print(f"  Lead time: {example.lead_hours} hours")

Filter Files

# Get unique lead times
lead_times = sorted(set(f.lead_hours for f in parsed_files))
print(f"Available lead times: {lead_times}")

# Filter to first 24 hours only
short_range = [f for f in parsed_files if f.lead_hours <= 24]
print(f"\nFiles with lead time <= 24 hours: {len(short_range)}")

Download a Single File

# Create output directory
output_dir = 'downloaded_data'
os.makedirs(output_dir, exist_ok=True)

# Download the first file
if file_list['files']:
    filename = file_list['files'][0]
    print(f"Downloading: {filename}")
    
    file_response = requests.get(
        f'{BASE_URL}/forecast/file/{filename}',
        headers=HEADERS,
        allow_redirects=True  # the endpoint redirects to the file's actual location
    )
    file_response.raise_for_status()
    
    filepath = os.path.join(output_dir, filename)
    with open(filepath, 'wb') as f:
        f.write(file_response.content)
    
    print(f"Saved to: {filepath}")
    print(f"File size: {os.path.getsize(filepath) / 1024 / 1024:.2f} MB")

Download Multiple Files

def download_file(filename, output_dir='downloaded_data'):
    """Download a single file from the API."""
    filepath = os.path.join(output_dir, filename)
    
    # Skip if already exists
    if os.path.exists(filepath):
        return filepath, 'skipped'
    
    response = requests.get(
        f'{BASE_URL}/forecast/file/{filename}',
        headers=HEADERS,
        allow_redirects=True
    )
    response.raise_for_status()
    
    with open(filepath, 'wb') as f:
        f.write(response.content)
    
    return filepath, 'downloaded'

# Download first 3 files (for demonstration)
files_to_download = file_list['files'][:3]

for filename in files_to_download:
    filepath, status = download_file(filename)
    print(f"{status}: {filename}")

Read GRIB2 with xarray

Reading GRIB2 files with xarray requires the cfgrib engine and the ecCodes library:

try:
    import xarray as xr
    
    # Find a downloaded file
    grib_files = [f for f in os.listdir(output_dir) if f.endswith('.grib2')]
    
    if grib_files:
        filepath = os.path.join(output_dir, grib_files[0])
        print(f"Opening: {filepath}")
        
        # Open with cfgrib engine
        ds = xr.open_dataset(filepath, engine='cfgrib')
        print("\nDataset:")
        print(ds)
    else:
        print("No GRIB2 files found. Download some files first.")
        
except ImportError:
    print("cfgrib not installed. Install with: pip install cfgrib")
except Exception as e:
    print(f"Error opening file: {e}")

Explore Dataset Variables

# The try/except above leaves ds undefined if opening failed, so check for it
# explicitly. (Note that 'ds' in dir() never raises NameError, so an
# except NameError clause here would be dead code.)
if 'ds' in globals():
    print("Variables in dataset:")
    for var in ds.data_vars:
        print(f"  {var}: {ds[var].dims}")

    print("\nCoordinates:")
    for coord in ds.coords:
        print(f"  {coord}: {ds.coords[coord].shape}")
else:
    print("Dataset not loaded.")

Get Latest Files

# The latest endpoint returns the most recent files without needing to specify an issuance time
response = requests.get(
    f'{BASE_URL}/forecast/latest/file',
    headers=HEADERS,
    params={'bundles': 'basic'}
)
response.raise_for_status()
latest_files = response.json()

print(f"Latest files: {latest_files['meta']['count']}")
print()
for f in latest_files['files'][:5]:
    print(f"  {f}")

Cleanup

# Optionally remove downloaded files
# import shutil
# shutil.rmtree(output_dir)
# print(f"Removed {output_dir}")

Next Steps