EstevezAlvarez
ETL Python PostgreSQL PostGIS Geoinformación

ETL con Python y geoinformación: pipelines de producción

Un pipeline de datos no es solo un script que mueve archivos. Es una cadena de decisiones técnicas que define si una organización puede confiar en sus indicadores, mapas y productos finales. Este artículo explica cómo diseñar uno que funcione en producción.

¿Qué hace robusto a un pipeline ETL?

La mayoría de los pipelines que he visto en proyectos reales fallan por la misma razón: no tienen reglas explícitas de validación y no registran qué pasó en cada etapa. Cuando algo falla, no hay forma de saber dónde ni por qué.

Un pipeline robusto tiene tres propiedades: verificabilidad (puedo saber si los datos son correctos), trazabilidad (puedo saber de dónde vino cada registro) y mantenibilidad (puedo modificarlo sin miedo a romper algo).

RAW Datos originales sin modificar validar STAGING Datos limpios transformados + log cargar PRODUCCIÓN Datos listos para análisis 📥 Fuentes externas 📊 APIs / QGIS / BI
Figura 1 — Arquitectura en tres capas. Cada capa tiene un propósito único: raw conserva los datos originales intactos, staging aplica transformaciones auditadas, producción expone solo datos validados.

La arquitectura en capas

La decisión más importante en un pipeline es no sobrescribir nunca los datos de origen. La capa raw almacena una copia exacta de lo que llegó, con timestamp de ingesta. Si algo falla aguas abajo, puedes reprocesar sin volver a pedirle el archivo a la fuente.

La capa staging es donde ocurren las transformaciones: normalización de campos, corrección de encodings, conversión de sistemas de referencia, joins y agregaciones. Aquí también se registran los errores — no se descartan, se documentan.

La capa producción contiene solo registros que pasaron todas las validaciones. Es la única que consumen los sistemas de análisis, mapas y APIs.

Validación explícita en geoinformación

En datos geoespaciales, validar significa algo más que comprobar que un campo no sea nulo. Un polígono puede ser sintácticamente correcto pero topológicamente inválido (auto-intersección). Una coordenada puede estar dentro del rango numérico pero fuera del área de estudio. Un atributo puede existir pero no pertenecer al dominio permitido.

Registro entrada Regla validación ✓ OK Staging registro aceptado ✗ Error Log de errores id + campo + motivo
Figura 2 — Ciclo de validación. Cada registro pasa por una regla: si es válido va a staging, si falla se registra en el log con ID del registro, campo problemático y motivo exacto.

Implementación con Python y PostGIS

El siguiente patrón muestra una estructura de pipeline real con validación explícita y logging estructurado:

import logging
from datetime import datetime
from sqlalchemy import create_engine, text
import geopandas as gpd
from shapely.validation import make_valid

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s"
)
log = logging.getLogger(__name__)

ENGINE = create_engine("postgresql://user:pass@localhost/geodata")

def ingest_raw(filepath: str, layer: str) -> gpd.GeoDataFrame:
    """Carga el archivo original sin modificar y registra la ingesta."""
    gdf = gpd.read_file(filepath, layer=layer)
    gdf["_ingested_at"] = datetime.utcnow()
    gdf["_source_file"] = filepath
    gdf.to_postgis("raw_parcels", ENGINE, if_exists="append", index=False)
    log.info("raw: %d registros ingestados desde %s", len(gdf), filepath)
    return gdf

def validate(gdf: gpd.GeoDataFrame) -> tuple[gpd.GeoDataFrame, list[dict]]:
    """Aplica reglas de validación y separa registros válidos de errores."""
    errors = []
    valid_idx = []

    for idx, row in gdf.iterrows():
        record_id = row.get("id", idx)
        geom = row.geometry

        # Regla 1: geometría no nula
        if geom is None or geom.is_empty:
            errors.append({"id": record_id, "field": "geometry", "reason": "null_or_empty"})
            continue

        # Regla 2: geometría válida (auto-intersecciones, etc.)
        if not geom.is_valid:
            fixed = make_valid(geom)
            if not fixed.is_valid:
                errors.append({"id": record_id, "field": "geometry", "reason": "invalid_geometry"})
                continue
            gdf.at[idx, "geometry"] = fixed  # corregir si es posible

        # Regla 3: CRS correcto
        if gdf.crs is None or gdf.crs.to_epsg() != 4326:
            errors.append({"id": record_id, "field": "crs", "reason": f"expected_4326_got_{gdf.crs}"})
            continue

        # Regla 4: atributos obligatorios
        for field in ("parcel_id", "land_use", "area_m2"):
            if row.get(field) is None:
                errors.append({"id": record_id, "field": field, "reason": "required_null"})
                break
        else:
            valid_idx.append(idx)

    log.info("validate: %d válidos, %d errores", len(valid_idx), len(errors))
    return gdf.loc[valid_idx].copy(), errors

def load_staging(gdf: gpd.GeoDataFrame, errors: list[dict]) -> None:
    """Carga staging con datos válidos y registra errores."""
    gdf.to_postgis("staging_parcels", ENGINE, if_exists="replace", index=False)

    if errors:
        import pandas as pd
        err_df = pd.DataFrame(errors)
        err_df["logged_at"] = datetime.utcnow()
        err_df.to_sql("etl_errors", ENGINE, if_exists="append", index=False)
        log.warning("staging: %d errores registrados en etl_errors", len(errors))

# Ejecución
raw = ingest_raw("parcelas_2026.gpkg", layer="parcelas")
valid_gdf, errs = validate(raw)
load_staging(valid_gdf, errs)

Trazabilidad: saber qué pasó con cada registro

El logging anterior registra los errores, pero la trazabilidad va más lejos: cada registro en producción debe poder rastrearse hasta su fuente original. Para eso, los campos _ingested_at y _source_file se propagan desde raw hasta producción.

En proyectos geoespaciales complejos, donde los datos pasan por múltiples transformaciones (cambios de CRS, dissolves, intersecciones espaciales), es útil guardar también el _etl_step — el nombre de la función o proceso que produjo el registro en su estado actual.

Reglas para un pipeline que dura

Un pipeline así no es más lento ni más complejo que uno sin validación. Pero cuando algo falla — y siempre falla en algún momento — tienes exactamente la información que necesitas para entender qué pasó y cómo corregirlo.