Source code for aequilibrae.utils.geo_index

import warnings
from typing import Union, List

from shapely.geometry import Point, Polygon, LineString, MultiPoint, MultiPolygon, MultiLineString
from shapely.wkb import loads

from aequilibrae.utils.qgis_utils import inside_qgis, rtree_avail

if inside_qgis:
    from qgis.core import QgsSpatialIndex as Index
    from qgis.core import QgsGeometry, QgsFeature
else:
    from rtree import Index


[docs] class GeoIndex: """Implements a generic GeoIndex class that uses the QGIS index when using the GUI and RTree otherwise""" def __init__(self): self.idx = Index() self.built = False
[docs] def build_from_layer(self, layer) -> dict: if inside_qgis: warnings.warn("This method works inside QGIS only") self.built = True self.idx = Index(layer.getFeatures()) return {f.id(): loads(f.geometry().asWkb().data()) for f in layer.getFeatures()}
[docs] def insert( self, feature_id: int, geometry: Union[Point, Polygon, LineString, MultiPoint, MultiPolygon, MultiLineString], ) -> None: """Inserts a valid shapely geometry in the index :Arguments: **feature_id** (:obj:`int`): ID of the geometry being inserted **geo** (:obj:`Shapely.geometry`): Any valid shapely geometry """ self.built = True if inside_qgis: g = QgsGeometry() g.fromWkb(geometry.wkb) feature = QgsFeature() feature.setGeometry(g) feature.setId(feature_id) self.idx.addFeature(feature) elif rtree_avail: self.idx.insert(feature_id, geometry.bounds) else: warnings.warn("You need RTREE to build a spatial index")
[docs] def nearest(self, geo: Union[Point, Polygon, LineString, MultiPoint, MultiPolygon], num_results) -> List[int]: """Finds nearest neighbor for a given geometry :Arguments: **geo** (:obj:`Shapely geometry`): Any valid shapely geometry **num_results** (:obj:`int`): A positive integer for the number of neighbors to return :Returns: **neighbors** (:obj:`List[int]`): List of IDs of the closest neighbors in the index """ if inside_qgis: g = QgsGeometry() g.fromWkb(geo.wkb) return self.idx.nearestNeighbor(g, num_results) elif rtree_avail: return self.idx.nearest(geo.bounds, num_results) else: warnings.warn("You need RTREE to build a spatial index")
[docs] def delete(self, feature_id, geometry: Union[Point, Polygon, LineString, MultiPoint, MultiPolygon]): if inside_qgis: g = QgsGeometry() g.fromWkb(geometry.wkb) feature = QgsFeature() feature.setGeometry(g) feature.setId(feature_id) self.idx.deleteFeature(feature) elif rtree_avail: self.idx.delete(feature_id, geometry.bounds) else: warnings.warn("You need RTREE to build a spatial index")
[docs] def reset(self): self.idx = Index() self.built = False