22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class GeoPackage:
    """A Python interface to a GeoPackage, an SQLite-format OGC standard for
    geospatial vector data.

    :param path: Path to a GeoPackage (ends with .gpkg). If the path does not
        exist, it will be created. If omitted, a temporary path is generated.
    """

    # NOTE(review): VERSION and EMPTY are not referenced within this class;
    # presumably consumed by callers elsewhere — confirm before removing.
    VERSION = 0
    EMPTY = 1

    def __init__(self, path: str = None):
        if path is None:
            # TODO: revisit this behavior. Creating a temporary file by default
            # may be undesirable.
            # Create a temporary path, get the name
            _, path = tempfile.mkstemp(suffix=".gpkg")
            self.path = str(path)
            # Delete the placeholder file so a fresh database is created at
            # this path on first connect.
            os.remove(path)
        else:
            self.path = path
        self._setup_database()
        # Mapping of layer name -> FeatureTable for every layer in the db.
        self.feature_tables = {}
        # Instantiate FeatureTables that already exist in the db
        with self.connect() as conn:
            query_result = conn.execute(
                "SELECT table_name, srs_id FROM gpkg_contents"
            )
            table_rows = list(query_result)
        for row in table_rows:
            table_name = row["table_name"]
            with self.connect() as conn:
                geom_type_query = conn.execute(
                    """
                    SELECT geometry_type_name
                    FROM gpkg_geometry_columns
                    WHERE table_name = ?
                    """,
                    (table_name,),
                )
                geom_type = next(geom_type_query)["geometry_type_name"]
            # Map the stored type name onto the GeoPackageGeoms enum member
            # of the same name.
            enum_geom_type = getattr(GeoPackageGeoms, geom_type)
            self.feature_tables[table_name] = FeatureTable(
                self, table_name, enum_geom_type, srid=row["srs_id"]
            )

    def add_feature_table(
        self, name: str, geom_type: GeoPackageGeoms, srid: int = 4326
    ) -> FeatureTable:
        """Create a new layer (feature table) in the GeoPackage.

        :param name: Name of the new layer.
        :param geom_type: Geometry type of the new layer.
        :param srid: SRID (projection).
        :returns: The newly created and registered FeatureTable.
        """
        table = FeatureTable(self, name, geom_type, srid=srid)
        table.create_tables()
        self.feature_tables[name] = table
        return table

    def drop_feature_table(self, name: str) -> None:
        """Delete a layer (feature table) by name.

        :param name: Name of the layer.
        :raises KeyError: If no layer with that name is registered.
        """
        table = self.feature_tables.pop(name)
        table.drop_tables()

    def _is_connected(self) -> bool:
        """Return True if ``self.conn`` exists and is usable."""
        try:
            self.conn.cursor()
            return True
        except Exception:
            # Covers both "never connected" (AttributeError on self.conn)
            # and a closed connection (sqlite3.ProgrammingError).
            return False

    def _get_connection(self) -> None:
        """Ensure ``self.conn`` holds a live connection, creating one lazily."""
        if not self._is_connected():
            conn = sqlite3.connect(self.path, uri=True, isolation_level=None)
            # Extension loading must be explicitly enabled before calling
            # load_extension(); otherwise sqlite3 raises OperationalError
            # ("not authorized"). copy() already does this — mirrored here.
            conn.enable_load_extension(True)
            # Spatialite used for rtree-based functions (MinX, etc). Can
            # eventually replace or make configurable with other extensions.
            conn.load_extension("mod_spatialite.so")
            # Re-disable extension loading now that spatialite is in.
            conn.enable_load_extension(False)
            conn.row_factory = self._dict_factory
            self.conn = conn

    @contextlib.contextmanager
    def connect(self) -> Generator[sqlite3.Connection, None, None]:
        """Yield the shared connection.

        The connection is intentionally left open on exit so it can be
        reused by subsequent ``connect()`` calls.
        """
        self._get_connection()
        yield self.conn

    def _setup_database(self) -> None:
        """Create the GeoPackage metadata tables if the database is empty."""
        if self._is_empty_database():
            self._create_database()

    def _is_empty_database(self) -> bool:
        """Return True if the database contains no tables at all."""
        is_empty = True
        with self.connect() as conn:
            query_result = conn.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            )
            try:
                next(query_result)
                is_empty = False
            except StopIteration:
                is_empty = True
        return is_empty

    def _create_database(self) -> None:
        """Write the GeoPackage format pragmas and create the required
        gpkg_* metadata tables."""
        with self.connect() as conn:
            # Set the format metadata
            conn.execute(f"PRAGMA application_id = {GPKG_APPLICATION_ID}")
            conn.execute(f"PRAGMA user_version = {GPKG_USER_VERSION}")
            # Create gpkg_contents table
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS gpkg_contents (
                    table_name TEXT,
                    data_type TEXT NOT NULL,
                    identifier TEXT UNIQUE,
                    description TEXT DEFAULT '',
                    last_change TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    min_x DOUBLE,
                    min_y DOUBLE,
                    max_x DOUBLE,
                    max_y DOUBLE,
                    srs_id INTEGER,
                    PRIMARY KEY (table_name)
                )
            """
            )
            # Create gpkg_extensions table
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS gpkg_extensions(
                    table_name TEXT,
                    column_name TEXT,
                    extension_name TEXT NOT NULL,
                    definition TEXT NOT NULL,
                    scope TEXT NOT NULL,
                    UNIQUE (table_name, column_name, extension_name)
                )
            """
            )
            # Create gpkg_geometry_columns table
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS gpkg_geometry_columns(
                    table_name TEXT UNIQUE NOT NULL,
                    column_name TEXT NOT NULL,
                    geometry_type_name TEXT NOT NULL,
                    srs_id INTEGER NOT NULL,
                    z TINYINT NOT NULL,
                    m TINYINT NOT NULL,
                    PRIMARY KEY (table_name, column_name)
                )
            """
            )
            # Create gpkg_ogr_contents table
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS gpkg_ogr_contents(
                    table_name TEXT NOT NULL,
                    feature_count INTEGER DEFAULT NULL,
                    PRIMARY KEY (table_name)
                )
            """
            )
            # Create gpkg_spatial_ref_sys
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS gpkg_spatial_ref_sys(
                    srs_name TEXT NOT NULL,
                    srs_id INTEGER NOT NULL,
                    organization TEXT NOT NULL,
                    organization_coordsys_id INTEGER NOT NULL,
                    definition TEXT NOT NULL,
                    description TEXT,
                    PRIMARY KEY (srs_id)
                )
            """
            )

    def copy(self, path: str) -> GeoPackage:
        """Copies the current GeoPackage to a new location and returns a new
        instance of a GeoPackage. A convenient way to create an in-memory
        GeoPackage, as path can be any SQLite-compatible connection string,
        including :memory:.

        :param path: Path to the new database. Any SQLite connection string
            can be used.
        :returns: A new GeoPackage connected to the copied database.
        """
        # TODO: catch the "memory" string and ensure that it includes a name
        # and shared cache. Our strategy requires reconnecting to the db,
        # so it must persist in memory.
        new_conn = sqlite3.connect(path)
        new_conn.enable_load_extension(True)
        # Spatialite used for rtree-based functions (MinX, etc). Can eventually
        # replace or make configurable with other extensions.
        new_conn.load_extension("mod_spatialite.so")
        with self.connect() as conn:
            # Set row_factory to none for iterdumping
            conn.row_factory = None
            # Copy over all tables but not indices
            for line in conn.iterdump():
                # Skip all index creation - these should be recreated
                # afterwards
                if "CREATE TABLE" in line or "INSERT INTO" in line:
                    # TODO: derive index names from metadata table instead
                    if "idx_" in line:
                        continue
                    if "rtree_" in line:
                        continue
                    if "COMMIT" in line:
                        continue
                    new_conn.cursor().executescript(line)
            # Copy over all indices
            # NOTE(review): "CREATE VIRTUAL TABLE ..." statements do not
            # contain the substring "CREATE TABLE", so virtual-table (rtree)
            # index definitions never match this filter — confirm whether
            # skipping them is intentional.
            for line in conn.iterdump():
                # Recreate the indices
                if "CREATE TABLE" in line or "INSERT INTO" in line:
                    if "idx_" in line:
                        new_conn.cursor().executescript(line)
            # TODO: rtree strategy is different? Why?
            # for line in conn.iterdump():
            #     # Recreate the indices
            #     if "CREATE TABLE" in line or "INSERT INTO" in line:
            #         if "rtree_" in line:
            #             new_conn.cursor().executescript(line)
            #         if "COMMIT" in line:
            #             continue
            # Restore the dict row factory that was disabled for iterdump()
            conn.row_factory = self._dict_factory
        new_db = GeoPackage(path)
        return new_db

    # TODO: Instead of 'Any', use Python types that can be deserialized from
    # sqlite
    @staticmethod
    def _dict_factory(
        cursor: sqlite3.Cursor, row: sqlite3.Row
    ) -> Dict[str, Any]:
        """sqlite3 row factory mapping column names to values per row."""
        return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}
|