در مدیریت مدرن بحران و تحلیلهای زیستمحیطی، دسترسی به دادههای سنجش از دور (Remote Sensing) جایگزین روشهای سنتی شده است. برای ردیابی آتشسوزیها، سه ابزار کلیدی زیر، زنجیره ارزش دادهها را از مشاهده تا تحلیل تکمیل میکنند:
سامانه NASA FIRMS
دادههای بلادرنگ این پلتفرم (Fire Information for Resource Management System) منبع اصلی برای دریافت دادههای «تقریباً زمان واقعی» (Near Real-Time) است. FIRMS با استفاده از سنسورهای MODIS و VIIRS، نقاط داغ (Hotspots) را ظرف ۳ ساعت پس از عبور ماهواره شناسایی میکند. خروجی این سیستم صرفاً تصویر نیست، بلکه دادههای مکانی دقیق برای تیمهای عملیاتی است.

پلتفرم Nasa Worldview
بصریسازی چندلایه در حالی که FIRMS بر موقعیت تمرکز دارد، Worldview امکان تحلیل بصری را فراهم میکند. با لایهگذاری «نابهنجاریهای حرارتی» (Thermal Anomalies) بر روی تصاویر اپتیکال (مانند لایههای Reflectance)، میتوان علاوه بر کانون آتش، حجم و جهت انتشار دود را بررسی کرد. لینکهای بررسی شده نشاندهندهی ترکیب باندهای حرارتی و مرئی برای تشخیص دقیقتر رخدادها در مقیاس جهانی هستند.

موتور Google Earth Engine (GEE)
تحلیل محاسباتی برای تحلیلهای عمیقتر، GEE فراتر از مشاهده میرود. این پلتفرم ابری امکان کدنویسی (جاوا اسکریپت/پایتون) روی پتابایتها دادهی ماهوارهای (لندست، سنتینل) را فراهم میکند. در بیزینس-تک، از GEE برای محاسبه شاخصهای شدت سوختگی (مانند NBR)، تحلیل سریهای زمانی آتشسوزی و توسعه الگوریتمهای پیشبینی خطر با یادگیری ماشین استفاده میشود.

قدرت واقعی Google Earth Engine در دسترسی مستقیم به ImageCollectionهای سطح ۲ و ۳ و قابلیتهای محاسباتی سمت سرور (Server-side) نهفته است. برای پایش حریق، تحلیلگران از ترکیب دادههای زیر استفاده میکنند:
دادههای تفکیک مکانی بالا (High-Res)
استفاده از مجموعههای COPERNICUS/S2_SR (سنتینل-۲) و LANDSAT/LC08/C02/T1_L2 (لندست ۸/۹) برای محاسبه شاخصهای طیفی دقیق. با دسترسی به باندهای NIR (مادون قرمز نزدیک) و SWIR (مادون قرمز کوتاه)، شاخص NBR (Normalized Burn Ratio) و dNBR (تفاضل NBR پیش و پس از آتش) برای ارزیابی شدت سوختگی (Burn Severity) با دقت ۱۰ تا ۳۰ متر محاسبه میشود.
دادههای تفکیک زمانی بالا (High-Temporal)
استفاده از پروداکت MODIS/061/MCD64A1 برای نقشهبرداری ماهانه نواحی سوخته در مقیاس ۵۰۰ متر و دادههای NOAA/GOES برای پایش فعال با نرخ بروزرسانی دقیقهای.
الگوریتمهای پردازشی گوگل ارث انجین
GEE امکان اعمال فیلترهای کیفیت (QA Bands) برای حذف ابر و سایه، اجرای توابع Reducer برای ساخت کامپوزیتهای زمانی (مانند Median یا Max طی یک فصل)، و اعمال الگوریتمهای طبقهبندی نظارتشده (مانند Random Forest) را بدون نیاز به دانلود پتابایتها داده فراهم میکند. این رویکرد، تاخیر (Latency) تحلیل را از “روزها” به “ثانیهها” کاهش میدهد. این ویدیو Active wildfire products and mapping in Google Earth Engine بررسی خوبی روی این استفاده از گوگل ارث انجین است.
تکه کد زیر نمونهای از استفاده از GEE برای پیدا کردن آتش است که در ادامه یک نمونه کامل درج شده است.
# ترکیب دادههای حرارتی VIIRS برای تشخیص نقاط قطعی آتش
fire_hotspots = ee.ImageCollection("NASA/VIIRS/002/VNP14A1") \
.filterBounds(roi).filterDate(start, end) \
.select('FireMask').max().gte(7)
# محاسبه شاخص شدت سوختگی (dNBR) با تصاویر Sentinel-2
dnbr = pre_fire_img.normalizedDifference(['B8', 'B12']) \
.subtract(post_fire_img.normalizedDifference(['B8', 'B12']))
شناسایی آتش با استفاده از GEE
این اسکریپت پایتون، یک پایپلاین (Pipeline) خودکار برای پیدا کردن آتش با استفاده ازGoogle Earth Engine است. سیستم زیر با تلفیق دادههای حرارتی فرکانسبالای VIIRS (جهت تشخیص نقاط داغ) و تصاویر اپتیکال Sentinel-2 (جهت تحلیل شدت سوختگی یا dNBR)، نقشههای دقیق گسترش آتش و گزارشهای تحلیلی را بدون دخالت کاربر تولید و اکسپورت میکند.
import ee
import requests
import numpy as np
import matplotlib.pyplot as plt
import json
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
from datetime import datetime, timezone, timedelta
from pathlib import Path
import pandas as pd
# ============================================================
# 0. ENHANCED CONFIGURATION
# ============================================================
CLOUD_PROJECT = 'REPLACE WITH YOUR CODE'
# ROI: polygon coordinates (lon, lat)
ROI_COORDS = [
[51.0557141979713, 36.32172705064279],
[51.112705775119736, 36.28797254158071],
[51.10206276974864, 36.26389278029297],
[51.15596444211192, 36.23703647459804],
[51.18068368039317, 36.25475713493892],
[51.26273781210802, 36.24755860127389],
[51.30942971310802, 36.270259416408464],
[51.26685769162364, 36.3770306956692],
[51.0557141979713, 36.32172705064279]
]
ROI_NAME = "Target Fire Zone"
INCIDENT_ID = "FIRE-2024-001"
# Temporal parameters
MAX_LOOKBACK_DAYS = 90
PRE_DAYS_BEFORE_FIRE = 60
PRE_DAYS_END_BEFORE_FIRE = 30
POST_DAYS_AFTER_FIRE_END = 21
SNAPSHOT_WINDOW_DAYS = 3
# Ultra high-resolution for fire hotspots
ULTRA_HIGHRES_DIM = 8192
HOTSPOT_BUFFER_METERS = 2000
# Output organization
OUTPUT_DIR = Path("fire_intelligence_outputs")
GEOJSON_DIR = OUTPUT_DIR / "geographic_data"
IMAGERY_DIR = OUTPUT_DIR / "imagery"
REPORTS_DIR = OUTPUT_DIR / "reports"
TIMESERIES_DIR = OUTPUT_DIR / "timeseries"
HOTSPOT_DIR = OUTPUT_DIR / "fire_hotspots"
# Visualization parameters
VIS_TRUE_COLOR = {'bands': ['B4', 'B3', 'B2'], 'min': 0, 'max': 3000, 'gamma': 1.4}
VIS_FALSE_COLOR = {'bands': ['B12', 'B8', 'B4'], 'min': 0, 'max': 3500}
VIS_THERMAL = {'bands': ['B11'], 'min': 273, 'max': 333, 'palette': ['blue', 'cyan', 'yellow', 'red']}
VIS_DNBR = {
'min': -0.1,
'max': 1.3,
'palette': ['006400', '7FFFD4', 'ADFF2F', 'FFA500', 'FF4500', '8B0000']
}
FONT_TITLE = {'size': 18, 'weight': 'bold', 'color': '#1a1a1a'}
FONT_SUBTITLE = {'size': 12, 'weight': 'normal', 'color': '#4a4a4a'}
BRAND_COLOR = '#c0392b'
DPI_PRINT = 300
# ============================================================
# 1. INITIALIZATION & SETUP
# ============================================================
def setup_environment():
"""Initialize Earth Engine and create output directories."""
try:
ee.Initialize(project=CLOUD_PROJECT)
print(f"✓ Connected to Earth Engine project: {CLOUD_PROJECT}")
except Exception:
print("⚠ Authenticating with Earth Engine...")
ee.Authenticate()
ee.Initialize(project=CLOUD_PROJECT)
print(f"✓ Authenticated and connected")
for directory in [OUTPUT_DIR, GEOJSON_DIR, IMAGERY_DIR, REPORTS_DIR,
TIMESERIES_DIR, HOTSPOT_DIR]:
directory.mkdir(parents=True, exist_ok=True)
print(f"✓ Output directories created at: {OUTPUT_DIR.absolute()}")
# ============================================================
# 2. HELPER FUNCTIONS
# ============================================================
def fetch_image_as_array(ee_image, roi_geom, params, dimensions=1080):
"""Download an EE image as NumPy array with retry logic."""
max_retries = 3
for attempt in range(max_retries):
try:
thumb_params = {
'region': roi_geom,
'dimensions': dimensions,
'format': 'png',
}
thumb_params.update(params)
url = ee_image.getThumbURL(thumb_params)
response = requests.get(url, timeout=300)
response.raise_for_status()
img = Image.open(BytesIO(response.content))
return np.array(img)
except Exception as e:
if attempt < max_retries - 1:
print(f" ⚠ Retry {attempt + 1}/{max_retries}")
continue
else:
print(f" ❌ Failed after {max_retries} attempts: {e}")
return None
def mask_s2_sr_clouds(image):
"""Enhanced cloud masking for Sentinel-2 SR."""
scl = image.select('SCL')
mask = (scl.neq(3).And(scl.neq(8)).And(scl.neq(9))
.And(scl.neq(10)).And(scl.neq(11)))
return image.updateMask(mask).copyProperties(image, ['system:time_start'])
def get_s2_composite(start, end, roi_geom, apply_cloud_mask=True):
"""Get Sentinel-2 composite with metadata."""
s2 = (ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
.filterBounds(roi_geom)
.filterDate(start, end)
.filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 60)))
if apply_cloud_mask:
s2 = s2.map(mask_s2_sr_clouds)
count = s2.size().getInfo()
if count == 0:
return None, 0
return s2.median().clip(roi_geom), count
def get_best_s2_image(start, end, roi_geom):
"""Get single best Sentinel-2 image with metadata - FULLY FIXED."""
try:
s2 = (ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
.filterBounds(roi_geom)
.filterDate(start, end)
.filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 80))
.sort('CLOUDY_PIXEL_PERCENTAGE'))
size = s2.size().getInfo()
if size == 0:
return None, None
# Get the first image and ensure it's valid
first_img = s2.first()
# Force evaluation to check if it's actually an Image
img = ee.Image(first_img)
# Try to get properties - if this fails, image is invalid
date = ee.Date(img.get('system:time_start')).format('YYYY-MM-dd').getInfo()
cloud_pct = img.get('CLOUDY_PIXEL_PERCENTAGE').getInfo()
# Apply cloud mask and clip
masked_img = mask_s2_sr_clouds(img).clip(roi_geom)
return masked_img, {'date': date, 'cloud_pct': cloud_pct}
except Exception as e:
print(f" ⚠ Error getting best S2 image: {str(e)}")
return None, None
# ============================================================
# 3. FIRE DETECTION & TRACKING
# ============================================================
def detect_fire_activity_detailed(roi_geom, max_lookback_days=90):
"""Comprehensive fire detection with daily tracking."""
today = datetime.now(timezone.utc)
start = today - timedelta(days=max_lookback_days)
start_str = start.strftime('%Y-%m-%d')
end_str = today.strftime('%Y-%m-%d')
print(f"\n{'=' * 60}")
print(f"🔍 INITIATING FIRE DETECTION SCAN")
print(f"{'=' * 60}")
print(f"Time window: {start_str} to {end_str} ({max_lookback_days} days)")
viirs = (ee.ImageCollection("NASA/VIIRS/002/VNP14A1")
.filterBounds(roi_geom)
.filterDate(start_str, end_str))
count = viirs.size().getInfo()
if count == 0:
print("❌ No VIIRS fire detections in specified period")
return None
print(f"✓ Found {count} VIIRS daily products to analyze")
daily_fires = []
viirs_list = viirs.toList(count)
for i in range(count):
img = ee.Image(viirs_list.get(i))
date_ms = img.get('system:time_start').getInfo()
date = datetime.fromtimestamp(date_ms / 1000, tz=timezone.utc)
fire_mask = img.select('FireMask').gte(7)
fire_pixels = fire_mask.reduceRegion(
reducer=ee.Reducer.sum(),
geometry=roi_geom,
scale=375,
maxPixels=1e9
).get('FireMask')
fire_count = fire_pixels.getInfo() if fire_pixels else 0
if fire_count and fire_count > 0:
frp = img.select('MaxFRP').updateMask(fire_mask)
frp_stats = frp.reduceRegion(
reducer=ee.Reducer.mean().combine(
reducer2=ee.Reducer.max(),
sharedInputs=True
),
geometry=roi_geom,
scale=375,
maxPixels=1e9
)
daily_fires.append({
'date': date,
'date_str': date.strftime('%Y-%m-%d'),
'fire_pixels': int(fire_count),
'frp_mean': frp_stats.get('MaxFRP_mean').getInfo() or 0,
'frp_max': frp_stats.get('MaxFRP_max').getInfo() or 0,
'image': img
})
if not daily_fires:
print("❌ No active fire pixels detected in period")
return None
daily_fires.sort(key=lambda x: x['date'])
first_fire = daily_fires[0]
last_fire = daily_fires[-1]
total_fire_days = len(daily_fires)
print(f"\n🔥 FIRE ACTIVITY SUMMARY:")
print(f" First detection: {first_fire['date_str']}")
print(f" Last detection: {last_fire['date_str']}")
print(f" Active fire days: {total_fire_days}")
print(f" Peak FRP: {max(d['frp_max'] for d in daily_fires):.1f} MW")
print(f" Total fire pixels detected: {sum(d['fire_pixels'] for d in daily_fires)}")
return {
'first_fire_date': first_fire['date'],
'last_fire_date': last_fire['date'],
'daily_activity': daily_fires,
'total_fire_days': total_fire_days,
'peak_frp': max(d['frp_max'] for d in daily_fires),
'collection': viirs
}
def extract_fire_coordinates(fire_data, roi_geom):
"""Extract geographic coordinates of all fire pixels."""
print(f"\n📍 EXTRACTING FIRE PIXEL COORDINATES...")
all_fire_points = []
fire_clusters = []
for day_info in fire_data['daily_activity']:
img = day_info['image']
date_str = day_info['date_str']
fire_mask = img.select('FireMask').gte(7)
frp = img.select('MaxFRP')
fire_samples = fire_mask.selfMask().addBands(frp).sample(
region=roi_geom,
scale=375,
geometries=True,
numPixels=10000
)
features = fire_samples.getInfo()['features']
for feature in features:
coords = feature['geometry']['coordinates']
props = feature['properties']
all_fire_points.append({
'type': 'Feature',
'geometry': {
'type': 'Point',
'coordinates': coords
},
'properties': {
'date': date_str,
'frp': props.get('MaxFRP', 0),
'fire_mask': props.get('FireMask', 0),
'longitude': coords[0],
'latitude': coords[1]
}
})
print(f"✓ Extracted {len(all_fire_points)} individual fire pixel locations")
if all_fire_points:
cluster_dict = {}
bin_size = 0.01
for point in all_fire_points:
lon, lat = point['geometry']['coordinates']
cluster_key = (round(lat / bin_size), round(lon / bin_size))
if cluster_key not in cluster_dict:
cluster_dict[cluster_key] = []
cluster_dict[cluster_key].append(point)
for cluster_id, (key, points) in enumerate(cluster_dict.items()):
lons = [p['geometry']['coordinates'][0] for p in points]
lats = [p['geometry']['coordinates'][1] for p in points]
centroid_lon = sum(lons) / len(lons)
centroid_lat = sum(lats) / len(lats)
max_frp = max(p['properties']['frp'] for p in points)
fire_clusters.append({
'cluster_id': cluster_id + 1,
'centroid': [centroid_lon, centroid_lat],
'num_detections': len(points),
'max_frp': max_frp,
'points': points
})
print(f"✓ Identified {len(fire_clusters)} distinct fire cluster zones")
if all_fire_points:
all_lons = [p['geometry']['coordinates'][0] for p in all_fire_points]
all_lats = [p['geometry']['coordinates'][1] for p in all_fire_points]
bbox = {
'min_lon': min(all_lons),
'max_lon': max(all_lons),
'min_lat': min(all_lats),
'max_lat': max(all_lats)
}
print(f"✓ Fire extent bounding box:")
print(f" Latitude: {bbox['min_lat']:.6f}° to {bbox['max_lat']:.6f}°")
print(f" Longitude: {bbox['min_lon']:.6f}° to {bbox['max_lon']:.6f}°")
else:
bbox = None
return {
'all_points': all_fire_points,
'clusters': fire_clusters,
'bounding_box': bbox
}
def save_geographic_data(geo_data, fire_data):
"""Save fire geographic data in multiple formats."""
print(f"\n💾 SAVING GEOGRAPHIC DATA...")
geojson = {
'type': 'FeatureCollection',
'features': geo_data['all_points'],
'properties': {
'incident_id': INCIDENT_ID,
'roi_name': ROI_NAME,
'generated': datetime.now(timezone.utc).isoformat(),
'first_fire': fire_data['first_fire_date'].isoformat(),
'last_fire': fire_data['last_fire_date'].isoformat(),
'total_fire_days': fire_data['total_fire_days']
}
}
geojson_file = GEOJSON_DIR / f"{INCIDENT_ID}_all_fire_points.geojson"
with open(geojson_file, 'w') as f:
json.dump(geojson, f, indent=2)
print(f"✓ Saved: {geojson_file.name}")
cluster_features = []
for cluster in geo_data['clusters']:
cluster_features.append({
'type': 'Feature',
'geometry': {
'type': 'Point',
'coordinates': cluster['centroid']
},
'properties': {
'cluster_id': cluster['cluster_id'],
'detections': cluster['num_detections'],
'max_frp_mw': round(cluster['max_frp'], 2)
}
})
clusters_geojson = {
'type': 'FeatureCollection',
'features': cluster_features
}
clusters_file = GEOJSON_DIR / f"{INCIDENT_ID}_fire_clusters.geojson"
with open(clusters_file, 'w') as f:
json.dump(clusters_geojson, f, indent=2)
print(f"✓ Saved: {clusters_file.name}")
csv_data = []
for point in geo_data['all_points']:
csv_data.append({
'date': point['properties']['date'],
'latitude': point['properties']['latitude'],
'longitude': point['properties']['longitude'],
'frp_mw': round(point['properties']['frp'], 2),
'fire_mask_value': point['properties']['fire_mask']
})
df = pd.DataFrame(csv_data)
csv_file = GEOJSON_DIR / f"{INCIDENT_ID}_fire_coordinates.csv"
df.to_csv(csv_file, index=False)
print(f"✓ Saved: {csv_file.name}")
if geo_data['bounding_box']:
bbox = geo_data['bounding_box']
kml = f"""<?xml version="1.0" encoding="UTF-8"?>
<kml xmlns="http://www.opengis.net/kml/2.2">
<Document>
<name>{INCIDENT_ID} - Fire Extent</name>
<Placemark>
<name>Fire Bounding Box</name>
<Style>
<LineStyle>
<color>ff0000ff</color>
<width>3</width>
</LineStyle>
</Style>
<Polygon>
<outerBoundaryIs>
<LinearRing>
<coordinates>
{bbox['min_lon']},{bbox['min_lat']},0
{bbox['max_lon']},{bbox['min_lat']},0
{bbox['max_lon']},{bbox['max_lat']},0
{bbox['min_lon']},{bbox['max_lat']},0
{bbox['min_lon']},{bbox['min_lat']},0
</coordinates>
</LinearRing>
</outerBoundaryIs>
</Polygon>
</Placemark>
</Document>
</kml>"""
kml_file = GEOJSON_DIR / f"{INCIDENT_ID}_fire_extent.kml"
with open(kml_file, 'w') as f:
f.write(kml)
print(f"✓ Saved: {kml_file.name}")
return geojson_file, clusters_file, csv_file
# ============================================================
# 4. ULTRA HIGH-RES HOTSPOT IMAGERY - TIMEZONE FIXED
# ============================================================
def generate_hotspot_zooms(fire_data, geo_data, roi_geom):
"""Generate ultra high-resolution before/after imagery - ALL FIXES APPLIED."""
print(f"\n🔍 GENERATING ULTRA HIGH-RESOLUTION FIRE HOTSPOT IMAGERY...")
clusters = geo_data['clusters']
if not clusters:
print("⚠ No fire clusters identified")
return
top_clusters = sorted(clusters, key=lambda x: x['num_detections'], reverse=True)[:5]
print(f"Processing top {len(top_clusters)} fire clusters...")
for cluster in top_clusters:
cluster_id = cluster['cluster_id']
centroid = cluster['centroid']
print(f"\n 🎯 Cluster #{cluster_id} (Centroid: {centroid[1]:.5f}°N, {centroid[0]:.5f}°E)")
print(f" Detections: {cluster['num_detections']} | Peak FRP: {cluster['max_frp']:.1f} MW")
point = ee.Geometry.Point(centroid)
hotspot_roi = point.buffer(HOTSPOT_BUFFER_METERS)
cluster_dates = sorted(set(p['properties']['date'] for p in cluster['points']))
# FIXED: Make datetimes timezone-aware
first_cluster_fire = datetime.strptime(cluster_dates[0], '%Y-%m-%d').replace(tzinfo=timezone.utc)
last_cluster_fire = datetime.strptime(cluster_dates[-1], '%Y-%m-%d').replace(tzinfo=timezone.utc)
# BEFORE imagery
before_end = first_cluster_fire - timedelta(days=1)
before_start = before_end - timedelta(days=30)
before_img, metadata = get_best_s2_image(
before_start.strftime('%Y-%m-%d'),
before_end.strftime('%Y-%m-%d'),
hotspot_roi
)
if before_img is not None:
arr_before = fetch_image_as_array(
before_img, hotspot_roi, VIS_TRUE_COLOR, dimensions=ULTRA_HIGHRES_DIM
)
if arr_before is not None:
img_annotated = add_professional_annotations(
arr_before,
title=f"BEFORE FIRE - Cluster #{cluster_id}",
subtitle=f"Period: {before_start.strftime('%Y-%m-%d')} to {before_end.strftime('%Y-%m-%d')}",
coords=f"{centroid[1]:.5f}°N, {centroid[0]:.5f}°E"
)
output_file = HOTSPOT_DIR / f"{INCIDENT_ID}_cluster{cluster_id}_BEFORE_8K.png"
Image.fromarray(img_annotated).save(output_file, optimize=True)
print(f" ✓ Saved BEFORE: {output_file.name}")
else:
print(f" ⚠ No BEFORE imagery available for cluster {cluster_id}")
# AFTER imagery (daily progression) - TIMEZONE FIXED
after_start = last_cluster_fire
after_end = min(last_cluster_fire + timedelta(days=14), datetime.now(timezone.utc)) # FIXED
current_date = after_start
day_count = 0
while current_date <= after_end and day_count < 14:
day_str = current_date.strftime('%Y-%m-%d')
next_day = current_date + timedelta(days=1)
next_str = next_day.strftime('%Y-%m-%d')
daily_img, metadata = get_best_s2_image(day_str, next_str, hotspot_roi)
if daily_img is not None:
arr_daily = fetch_image_as_array(
daily_img, hotspot_roi, VIS_TRUE_COLOR, dimensions=ULTRA_HIGHRES_DIM
)
if arr_daily is not None:
viirs_day = (ee.ImageCollection("NASA/VIIRS/002/VNP14A1")
.filterDate(day_str, next_str)
.filterBounds(hotspot_roi))
has_fire = viirs_day.size().getInfo() > 0
fire_indicator = "ACTIVE FIRE" if has_fire else "Post-fire"
img_annotated = add_professional_annotations(
arr_daily,
title=f"AFTER FIRE - Cluster #{cluster_id} - Day +{day_count}",
subtitle=f"Date: {day_str} | {fire_indicator}",
coords=f"{centroid[1]:.5f}°N, {centroid[0]:.5f}°E"
)
output_file = HOTSPOT_DIR / f"{INCIDENT_ID}_cluster{cluster_id}_AFTER_day{day_count:02d}_8K.png"
Image.fromarray(img_annotated).save(output_file, optimize=True)
print(f" ✓ Day +{day_count}: {output_file.name}")
current_date += timedelta(days=1)
day_count += 1
print(f"\n✓ Hotspot imagery complete - {len(top_clusters)} clusters processed")
def add_professional_annotations(img_array, title, subtitle, coords, logo_text="SATELLITE FIRE INTELLIGENCE"):
"""Add professional annotations to imagery."""
img = Image.fromarray(img_array)
draw = ImageDraw.Draw(img)
width, height = img.size
try:
font_title = ImageFont.truetype("arial.ttf", 60)
font_subtitle = ImageFont.truetype("arial.ttf", 40)
font_small = ImageFont.truetype("arial.ttf", 30)
except:
try:
font_title = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 60)
font_subtitle = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 40)
font_small = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 30)
except:
font_title = ImageFont.load_default()
font_subtitle = ImageFont.load_default()
font_small = ImageFont.load_default()
banner_height = 200
draw.rectangle([(0, 0), (width, banner_height)], fill=(0, 0, 0, 230))
clean_title = title.replace('🔥', 'FIRE')
draw.text((30, 30), clean_title, fill=(255, 255, 255), font=font_title)
draw.text((30, 100), subtitle, fill=(255, 200, 0), font=font_subtitle)
draw.text((30, 155), f"Location: {coords}", fill=(200, 200, 200), font=font_small)
info_height = 120
draw.rectangle([(0, height - info_height), (width, height)], fill=(0, 0, 0, 230))
timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
draw.text((30, height - 90), logo_text, fill=(255, 100, 100), font=font_subtitle)
draw.text((30, height - 45), f"Generated: {timestamp} | {INCIDENT_ID}",
fill=(200, 200, 200), font=font_small)
scale_width = 300
scale_x = width - scale_width - 50
scale_y = height - 80
draw.rectangle([(scale_x, scale_y), (scale_x + scale_width, scale_y + 15)],
fill=(255, 255, 255))
draw.text((scale_x, scale_y + 20), f"{HOTSPOT_BUFFER_METERS * 2 / 1000:.1f} km approx",
fill=(255, 255, 255), font=font_small)
arrow_x = width - 100
arrow_y = 250
draw.text((arrow_x, arrow_y), "N ^", fill=(255, 255, 255), font=font_title)
return np.array(img)
# ============================================================
# 5. TIME SERIES
# ============================================================
def generate_enhanced_timeseries(fire_data, roi_geom):
"""Generate time series with multi-day windows."""
print(f"\n📊 GENERATING ENHANCED FIRE PROGRESSION TIME SERIES...")
daily_activity = fire_data['daily_activity']
if len(daily_activity) < 2:
print("⚠ Insufficient fire days for time series")
return
selected_days = []
selected_days.append(daily_activity[0])
selected_days.append(daily_activity[-1])
if len(daily_activity) > 2:
mid_idx = len(daily_activity) // 2
selected_days.insert(1, daily_activity[mid_idx])
sorted_by_frp = sorted(daily_activity, key=lambda x: x['frp_max'], reverse=True)
for day in sorted_by_frp[:2]:
if day not in selected_days:
selected_days.append(day)
selected_days.sort(key=lambda x: x['date'])
selected_days = selected_days[:6]
print(f"Selected {len(selected_days)} representative days for analysis")
fig, axes = plt.subplots(len(selected_days), 3, figsize=(24, 6 * len(selected_days)))
if len(selected_days) == 1:
axes = axes.reshape(1, -1)
for row_idx, day_info in enumerate(selected_days):
date = day_info['date']
date_str = day_info['date_str']
print(f"\n Processing {date_str} (FRP: {day_info['frp_max']:.1f} MW)...")
window_start = (date - timedelta(days=SNAPSHOT_WINDOW_DAYS)).strftime('%Y-%m-%d')
window_end = (date + timedelta(days=SNAPSHOT_WINDOW_DAYS + 1)).strftime('%Y-%m-%d')
day_img, day_count = get_s2_composite(window_start, window_end, roi_geom)
day_start = date_str
day_end = (date + timedelta(days=1)).strftime('%Y-%m-%d')
viirs_mask = (ee.ImageCollection("NASA/VIIRS/002/VNP14A1")
.filterDate(day_start, day_end)
.filterBounds(roi_geom)
.select('FireMask')
.max()
.gte(7)
.selfMask())
if day_img:
arr_rgb = fetch_image_as_array(day_img, roi_geom, VIS_TRUE_COLOR, dimensions=1200)
if arr_rgb is not None:
axes[row_idx, 0].imshow(arr_rgb)
axes[row_idx, 0].set_title(
f"{date_str}\nTrue Color RGB ({day_count} images)",
**FONT_TITLE
)
axes[row_idx, 0].axis('off')
if day_img:
arr_false = fetch_image_as_array(day_img, roi_geom, VIS_FALSE_COLOR, dimensions=1200)
arr_viirs = fetch_image_as_array(
viirs_mask.visualize(palette=['red'], min=0, max=1),
roi_geom, {}, dimensions=1200
)
if arr_false is not None:
axes[row_idx, 1].imshow(arr_false)
if arr_viirs is not None and arr_viirs.ndim >= 2:
fire_mask = arr_viirs[:, :, 0] > 50 if arr_viirs.ndim == 3 else arr_viirs > 50
axes[row_idx, 1].imshow(
np.ma.masked_where(~fire_mask, fire_mask),
cmap='autumn', alpha=0.7, interpolation='none'
)
axes[row_idx, 1].set_title(
f"{date_str}\nFalse Color + Fire Detection",
**FONT_TITLE
)
axes[row_idx, 1].axis('off')
if day_img:
thermal_vis = day_img.select('B11').visualize(**VIS_THERMAL)
arr_thermal = fetch_image_as_array(thermal_vis, roi_geom, {}, dimensions=1200)
if arr_thermal is not None:
axes[row_idx, 2].imshow(arr_thermal)
axes[row_idx, 2].set_title(
f"{date_str}\nThermal (SWIR Band 11)",
**FONT_TITLE
)
axes[row_idx, 2].text(
0.05, 0.95,
f"Peak FRP: {day_info['frp_max']:.1f} MW\n"
f"Fire pixels: {day_info['fire_pixels']}",
transform=axes[row_idx, 2].transAxes,
fontsize=12, color='white',
bbox=dict(boxstyle='round', facecolor='black', alpha=0.7),
verticalalignment='top'
)
axes[row_idx, 2].axis('off')
plt.suptitle(
f"{INCIDENT_ID} - FIRE PROGRESSION TIME SERIES\n"
f"{ROI_NAME} | {fire_data['first_fire_date'].strftime('%Y-%m-%d')} to "
f"{fire_data['last_fire_date'].strftime('%Y-%m-%d')} ({fire_data['total_fire_days']} active days)",
fontsize=20, fontweight='bold', y=0.995
)
plt.tight_layout(rect=[0, 0, 1, 0.99])
output_file = TIMESERIES_DIR / f"{INCIDENT_ID}_enhanced_timeseries.jpg"
plt.savefig(output_file, dpi=DPI_PRINT, bbox_inches='tight')
plt.close()
print(f"\n✓ Enhanced time series saved: {output_file.name}")
# ============================================================
# 6. dNBR REPORT
# ============================================================
def compute_dnbr(pre_img, post_img):
"""Compute NBR and dNBR burn severity."""
nir = 'B8'
swir2 = 'B12'
pre_nbr = pre_img.normalizedDifference([nir, swir2]).rename('NBR_pre')
post_nbr = post_img.normalizedDifference([nir, swir2]).rename('NBR_post')
dnbr = pre_nbr.subtract(post_nbr).rename('dNBR')
dnbr_class = (dnbr.where(dnbr.lt(-0.1), 0)
.where(dnbr.gte(-0.1).And(dnbr.lt(0.1)), 1)
.where(dnbr.gte(0.1).And(dnbr.lt(0.27)), 2)
.where(dnbr.gte(0.27).And(dnbr.lt(0.44)), 3)
.where(dnbr.gte(0.44).And(dnbr.lt(0.66)), 4)
.where(dnbr.gte(0.66), 5)
.rename('dNBR_class'))
return pre_nbr, post_nbr, dnbr, dnbr_class
def generate_master_report(fire_data, geo_data, roi_geom, pre_window, post_window):
"""Generate comprehensive master analytical report."""
print(f"\n📋 GENERATING MASTER ANALYTICAL REPORT...")
pre_img, pre_count = get_s2_composite(pre_window[0], pre_window[1], roi_geom)
post_img, post_count = get_s2_composite(post_window[0], post_window[1], roi_geom)
if not pre_img or not post_img:
print("❌ Cannot generate report: missing imagery")
return
pre_nbr, post_nbr, dnbr, dnbr_class = compute_dnbr(pre_img, post_img)
pixel_area = ee.Image.pixelArea()
class_names = {
0: "Regrowth/Unaffected",
1: "Unburned",
2: "Low Severity",
3: "Moderate-Low",
4: "Moderate-High",
5: "High Severity"
}
stats = {}
for class_val in range(6):
mask = dnbr_class.eq(class_val)
area_m2 = pixel_area.updateMask(mask).reduceRegion(
reducer=ee.Reducer.sum(),
geometry=roi_geom,
scale=30,
maxPixels=1e9
).get('area').getInfo() or 0
stats[class_val] = {
'name': class_names[class_val],
'area_km2': area_m2 / 1e6,
'area_ha': area_m2 / 1e4
}
fig = plt.figure(figsize=(28, 16))
gs = fig.add_gridspec(3, 4, hspace=0.3, wspace=0.25)
ax1 = fig.add_subplot(gs[0, :2])
arr_pre = fetch_image_as_array(pre_img, roi_geom, VIS_TRUE_COLOR, dimensions=2000)
if arr_pre is not None:
ax1.imshow(arr_pre)
ax1.set_title(f"PRE-FIRE BASELINE\n{pre_window[0]} to {pre_window[1]}\n({pre_count} S2 images)",
fontsize=16, fontweight='bold')
ax1.axis('off')
ax2 = fig.add_subplot(gs[0, 2:])
arr_post = fetch_image_as_array(post_img, roi_geom, VIS_FALSE_COLOR, dimensions=2000)
if arr_post is not None:
ax2.imshow(arr_post)
ax2.set_title(f"POST-FIRE STATE\n{post_window[0]} to {post_window[1]}\n({post_count} S2 images)",
fontsize=16, fontweight='bold', color='#c0392b')
ax2.axis('off')
ax3 = fig.add_subplot(gs[1, :3])
arr_dnbr = fetch_image_as_array(dnbr_class.visualize(**VIS_DNBR), roi_geom, {}, dimensions=2000)
if arr_dnbr is not None:
ax3.imshow(arr_dnbr)
ax3.set_title("BURN SEVERITY ANALYSIS (dNBR Classification)",
fontsize=18, fontweight='bold')
ax3.axis('off')
ax4 = fig.add_subplot(gs[1, 3])
ax4.axis('off')
stats_text = f"""
BURN SEVERITY STATISTICS
Total Analyzed Area:
{roi_geom.area().divide(1e6).getInfo():.2f} km²
━━━━━━━━━━━━━━━━━━━━━
"""
for class_val in sorted(stats.keys(), reverse=True):
if stats[class_val]['area_km2'] > 0.01:
stats_text += f"\n{class_names[class_val]}:\n"
stats_text += f" {stats[class_val]['area_km2']:.2f} km²\n"
stats_text += f" ({stats[class_val]['area_ha']:.1f} ha)\n"
total_burned = sum(stats[i]['area_km2'] for i in range(2, 6))
stats_text += f"\n━━━━━━━━━━━━━━━━━━━━━\n"
stats_text += f"\nTOTAL BURNED AREA:\n{total_burned:.2f} km²"
stats_text += f"\n\nFire Duration:\n{fire_data['total_fire_days']} days"
stats_text += f"\n\nPeak Intensity:\n{fire_data['peak_frp']:.1f} MW"
ax4.text(0.1, 0.95, stats_text, transform=ax4.transAxes,
fontsize=13, verticalalignment='top', family='monospace',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
ax5 = fig.add_subplot(gs[2, :])
timeline_days = []
timeline_frp = []
for day in fire_data['daily_activity']:
timeline_days.append(day['date'])
timeline_frp.append(day['frp_max'])
ax5.bar(range(len(timeline_days)), timeline_frp, color=BRAND_COLOR, alpha=0.7)
ax5.set_xlabel('Days Since First Detection', fontsize=14, fontweight='bold')
ax5.set_ylabel('Peak Fire Radiative Power (MW)', fontsize=14, fontweight='bold')
ax5.set_title('FIRE INTENSITY PROGRESSION', fontsize=16, fontweight='bold')
ax5.grid(axis='y', alpha=0.3)
date_labels = [d.strftime('%m/%d') for d in timeline_days]
ax5.set_xticks(range(0, len(date_labels), max(1, len(date_labels) // 10)))
ax5.set_xticklabels([date_labels[i] for i in range(0, len(date_labels), max(1, len(date_labels) // 10))],
rotation=45)
fig.suptitle(
f"COMPREHENSIVE FIRE IMPACT ASSESSMENT\n"
f"{INCIDENT_ID} | {ROI_NAME}\n"
f"Analysis Period: {fire_data['first_fire_date'].strftime('%Y-%m-%d')} to "
f"{fire_data['last_fire_date'].strftime('%Y-%m-%d')}",
fontsize=22, fontweight='bold', y=0.98
)
footer_text = (f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')} | "
f"Data: Sentinel-2 SR, VIIRS VNP14A1 | "
f"Fire Detections: {len(geo_data['all_points'])} pixels | "
f"Clusters: {len(geo_data['clusters'])}")
fig.text(0.5, 0.01, footer_text, ha='center', fontsize=10, style='italic')
output_file = REPORTS_DIR / f"{INCIDENT_ID}_master_report.jpg"
plt.savefig(output_file, dpi=DPI_PRINT, bbox_inches='tight')
plt.close()
print(f"✓ Master report saved: {output_file.name}")
return stats
# ============================================================
# 7. MAIN EXECUTION
# ============================================================
def main():
"""Main execution pipeline."""
print("\n" + "=" * 70)
print(" FIRE INTELLIGENCE SYSTEM - MEDIA-READY EDITION")
print("=" * 70)
print(f"\nIncident ID: {INCIDENT_ID}")
print(f"Target Area: {ROI_NAME}")
print(f"Analysis Initiated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
setup_environment()
roi = ee.Geometry.Polygon(ROI_COORDS)
roi_area = roi.area().divide(1e6).getInfo()
print(f"\n📍 ROI Area: {roi_area:.2f} km²")
fire_data = detect_fire_activity_detailed(roi, MAX_LOOKBACK_DAYS)
if not fire_data:
print("\n❌ No fire activity detected. Exiting.")
return
geo_data = extract_fire_coordinates(fire_data, roi)
save_geographic_data(geo_data, fire_data)
first_fire = fire_data['first_fire_date']
last_fire = fire_data['last_fire_date']
pre_start = (first_fire - timedelta(days=PRE_DAYS_BEFORE_FIRE)).strftime('%Y-%m-%d')
pre_end = (first_fire - timedelta(days=PRE_DAYS_END_BEFORE_FIRE)).strftime('%Y-%m-%d')
post_start = last_fire.strftime('%Y-%m-%d')
post_end_date = min(last_fire + timedelta(days=POST_DAYS_AFTER_FIRE_END),
datetime.now(timezone.utc))
post_end = post_end_date.strftime('%Y-%m-%d')
print(f"\n📅 Analysis Windows:")
print(f" Pre-fire: {pre_start} to {pre_end}")
print(f" Post-fire: {post_start} to {post_end}")
generate_master_report(fire_data, geo_data, roi, (pre_start, pre_end), (post_start, post_end))
generate_enhanced_timeseries(fire_data, roi)
generate_hotspot_zooms(fire_data, geo_data, roi)
print("\n" + "=" * 70)
print("✅ ANALYSIS COMPLETE - ALL PRODUCTS GENERATED")
print("=" * 70)
print(f"\n📁 Output Directory: {OUTPUT_DIR.absolute()}")
print(f"\n📊 Generated Products:")
print(f" • Master Analytical Report (300 DPI)")
print(f" • Enhanced Fire Progression Time Series")
print(f" • Ultra High-Res Hotspot Imagery (8K)")
print(f" • GeoJSON Fire Point Data")
print(f" • Fire Cluster Analysis")
print(f" • CSV Coordinate Export")
print(f" • KML for Google Earth")
print(f"\n🔢 Key Statistics:")
print(f" • Fire Duration: {fire_data['total_fire_days']} days")
print(f" • Peak Intensity: {fire_data['peak_frp']:.1f} MW")
print(f" • Fire Detections: {len(geo_data['all_points'])} pixels")
print(f" • Fire Clusters: {len(geo_data['clusters'])} distinct zones")
bbox = geo_data['bounding_box']
if bbox:
print(f"\n📍 Fire Extent:")
print(f" • Latitude: {bbox['min_lat']:.6f}° to {bbox['max_lat']:.6f}°")
print(f" • Longitude: {bbox['min_lon']:.6f}° to {bbox['max_lon']:.6f}°")
print(f"\n⏱️ Completed: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
print("\n" + "=" * 70 + "\n")
if __name__ == "__main__":
main()


دیدگاهتان را بنویسید