Source code for smosaic.smosaic_clip_raster

import os
import numpy as np
import rasterio
from rasterio.mask import mask as rasterio_mask
from rasterio.warp import reproject, Resampling, calculate_default_transform
import pyproj
from pyproj import Transformer
from shapely.ops import transform
import shapely
from shapely.geometry import box

from smosaic.smosaic_utils import COVERAGE_PROJ, load_jsons


[docs] def clip_raster(input_raster_path, output_folder, clip_geometry, projection_output, output_filename=None, grid=None, tile_id=None): """ Extract a spatial subset from a raster file using a geometry or grid tile. Args: input_raster_path (str): Full path to the input raster file (GeoTIFF format). output_folder (str): Directory path where the clipped raster will be saved. clip_geometry (shapely.geometry): Spatial boundary for clipping. projection_output (int/str, optional): Output coordinate reference system. Options: - EPSG codes: 4326 (WGS84), 5880 (SIRGAS 2000 Brazil Polyconic) - BDC codes: "BDC" (Brazil Data Cube Standard Grid projection) Defaults to 4326. output_filename (str, optional): Custom name for output file (without directory path). If None, generated from input filename and clip parameters. Defaults to None. grid (str, optional): BDC grid system identifier. Supported grids include: - "BDC_SM_V2": Brazil Data Cube Small Grid. Defaults to None. tile_id (str, optional): BDC grid tile identifier (6-digit code, e.g., "020019"). Requires grid parameter. Defaults to None. Returns: str: Absolute path to the generated clipped raster file. """ using_grid = False if grid and tile_id: if grid == "BDC_SM_V2": bdc_grid_sm = load_jsons("BDC_SM_V2") for feature in bdc_grid_sm['features']: if feature['properties']['tile'] == tile_id: grid_geom = shapely.geometry.shape(feature['geometry']) clip_geometry = grid_geom using_grid = True break if projection_output == "BDC": target_crs = COVERAGE_PROJ else: target_crs = pyproj.CRS.from_epsg(projection_output) with rasterio.open(input_raster_path) as src: if not using_grid: proj_converter = Transformer.from_crs( pyproj.CRS.from_epsg(4326), src.crs, always_xy=True ).transform clip_geometry = transform(proj_converter, clip_geometry) out_image, out_transform = rasterio_mask( src, [shapely.geometry.mapping(clip_geometry)], crop=True, all_touched=True ) if using_grid: bounds = clip_geometry.bounds pixel_size_x = 10 pixel_size_y = -10 width = int((bounds[2] - bounds[0]) / pixel_size_x) height = int((bounds[3] - bounds[1]) / abs(pixel_size_y)) target_transform = rasterio.transform.from_origin( bounds[0], bounds[3], pixel_size_x, abs(pixel_size_y) ) resampled_data = np.zeros((out_image.shape[0], height, width), dtype=out_image.dtype) reproject( source=out_image, destination=resampled_data, src_transform=out_transform, src_crs=src.crs, dst_transform=target_transform, dst_crs=src.crs, resampling=Resampling.nearest ) output_data = resampled_data output_transform = target_transform output_width = width output_height = height output_crs = src.crs else: if src.crs != target_crs: target_transform, target_width, target_height = calculate_default_transform( src.crs, target_crs, out_image.shape[2], out_image.shape[1], *clip_geometry.bounds ) reprojected_data = np.zeros((out_image.shape[0], target_height, target_width), dtype=out_image.dtype) reproject( source=out_image, destination=reprojected_data, src_transform=out_transform, src_crs=src.crs, dst_transform=target_transform, dst_crs=target_crs, resampling=Resampling.nearest ) output_data = reprojected_data output_transform = target_transform output_width = target_width output_height = target_height output_crs = target_crs else: output_data = out_image output_transform = out_transform output_width = out_image.shape[2] output_height = out_image.shape[1] output_crs = src.crs os.makedirs(output_folder, exist_ok=True) output_path = os.path.join(output_folder, output_filename) out_meta = src.meta.copy() out_meta.update({ "height": output_height, "width": output_width, "transform": output_transform, "crs": output_crs }) with rasterio.open(output_path, "w", **out_meta) as dest: dest.write(output_data) os.remove(input_raster_path) return output_path