ETL con Python y geoinformación: pipelines de producción
Un pipeline de datos no es solo un script que mueve archivos. Es una cadena de decisiones técnicas que define si una organización puede confiar en sus indicadores, mapas y productos finales. Este artículo explica cómo diseñar uno que funcione en producción.
¿Qué hace robusto a un pipeline ETL?
La mayoría de los pipelines que he visto en proyectos reales fallan por la misma razón: no tienen reglas explícitas de validación y no registran qué pasó en cada etapa. Cuando algo falla, no hay forma de saber dónde ni por qué.
Un pipeline robusto tiene tres propiedades: verificabilidad (puedo saber si los datos son correctos), trazabilidad (puedo saber de dónde vino cada registro) y mantenibilidad (puedo modificarlo sin miedo a romper algo).
La arquitectura en capas
La decisión más importante en un pipeline es no sobrescribir nunca los datos de origen. La capa raw almacena una copia exacta de lo que llegó, con timestamp de ingesta. Si algo falla aguas abajo, puedes reprocesar sin volver a pedirle el archivo a la fuente.
La capa staging es donde ocurren las transformaciones: normalización de campos, corrección de encodings, conversión de sistemas de referencia, joins y agregaciones. Aquí también se registran los errores — no se descartan, se documentan.
La capa producción contiene solo registros que pasaron todas las validaciones. Es la única que consumen los sistemas de análisis, mapas y APIs.
Validación explícita en geoinformación
En datos geoespaciales, validar significa algo más que comprobar que un campo no sea nulo. Un polígono puede ser sintácticamente correcto pero topológicamente inválido (auto-intersección). Una coordenada puede estar dentro del rango numérico pero fuera del área de estudio. Un atributo puede existir pero no pertenecer al dominio permitido.
Implementación con Python y PostGIS
El siguiente patrón muestra una estructura de pipeline real con validación explícita y logging estructurado:
import logging
from datetime import datetime
from sqlalchemy import create_engine, text
import geopandas as gpd
from shapely.validation import make_valid
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s"
)
log = logging.getLogger(__name__)
ENGINE = create_engine("postgresql://user:pass@localhost/geodata")
def ingest_raw(filepath: str, layer: str) -> gpd.GeoDataFrame:
"""Carga el archivo original sin modificar y registra la ingesta."""
gdf = gpd.read_file(filepath, layer=layer)
gdf["_ingested_at"] = datetime.utcnow()
gdf["_source_file"] = filepath
gdf.to_postgis("raw_parcels", ENGINE, if_exists="append", index=False)
log.info("raw: %d registros ingestados desde %s", len(gdf), filepath)
return gdf
def validate(gdf: gpd.GeoDataFrame) -> tuple[gpd.GeoDataFrame, list[dict]]:
"""Aplica reglas de validación y separa registros válidos de errores."""
errors = []
valid_idx = []
for idx, row in gdf.iterrows():
record_id = row.get("id", idx)
geom = row.geometry
# Regla 1: geometría no nula
if geom is None or geom.is_empty:
errors.append({"id": record_id, "field": "geometry", "reason": "null_or_empty"})
continue
# Regla 2: geometría válida (auto-intersecciones, etc.)
if not geom.is_valid:
fixed = make_valid(geom)
if not fixed.is_valid:
errors.append({"id": record_id, "field": "geometry", "reason": "invalid_geometry"})
continue
gdf.at[idx, "geometry"] = fixed # corregir si es posible
# Regla 3: CRS correcto
if gdf.crs is None or gdf.crs.to_epsg() != 4326:
errors.append({"id": record_id, "field": "crs", "reason": f"expected_4326_got_{gdf.crs}"})
continue
# Regla 4: atributos obligatorios
for field in ("parcel_id", "land_use", "area_m2"):
if row.get(field) is None:
errors.append({"id": record_id, "field": field, "reason": "required_null"})
break
else:
valid_idx.append(idx)
log.info("validate: %d válidos, %d errores", len(valid_idx), len(errors))
return gdf.loc[valid_idx].copy(), errors
def load_staging(gdf: gpd.GeoDataFrame, errors: list[dict]) -> None:
"""Carga staging con datos válidos y registra errores."""
gdf.to_postgis("staging_parcels", ENGINE, if_exists="replace", index=False)
if errors:
import pandas as pd
err_df = pd.DataFrame(errors)
err_df["logged_at"] = datetime.utcnow()
err_df.to_sql("etl_errors", ENGINE, if_exists="append", index=False)
log.warning("staging: %d errores registrados en etl_errors", len(errors))
# Ejecución
raw = ingest_raw("parcelas_2026.gpkg", layer="parcelas")
valid_gdf, errs = validate(raw)
load_staging(valid_gdf, errs)
Trazabilidad: saber qué pasó con cada registro
El logging anterior registra los errores, pero la trazabilidad va más lejos: cada registro en producción debe poder rastrearse hasta su fuente original. Para eso, los campos _ingested_at y _source_file se propagan desde raw hasta producción.
En proyectos geoespaciales complejos, donde los datos pasan por múltiples transformaciones (cambios de CRS, dissolves, intersecciones espaciales), es útil guardar también el _etl_step — el nombre de la función o proceso que produjo el registro en su estado actual.
Reglas para un pipeline que dura
- Nunca sobrescribas raw. Es tu seguro de datos. Si algo falla, puedes reprocesar.
- Las reglas de validación son código, no comentarios. Escríbelas como funciones con nombre claro y prueba unitaria.
- Los errores se registran, no se ignoran. Un pipeline silencioso que descarta datos incorrectamente es más peligroso que uno que falla explícitamente.
- Cada etapa tiene entrada esperada y salida verificable. Define el schema de salida antes de escribir la transformación.
- PostGIS es tu aliado. Usa
ST_IsValid,ST_MakeValid,ST_Withiny restricciones CHECK en la base para validar en el momento de la carga.
Un pipeline así no es más lento ni más complejo que uno sin validación. Pero cuando algo falla — y siempre falla en algún momento — tienes exactamente la información que necesitas para entender qué pasó y cómo corregirlo.