Line data Source code
1 : //
2 : // cloudsync_changes_sqlite.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 23/09/24.
6 : //
7 :
8 : #include <stdio.h>
9 : #include <string.h>
10 :
11 : #include "cloudsync_changes_sqlite.h"
12 : #include "../utils.h"
13 : #include "../dbutils.h"
14 :
15 : #ifndef SQLITE_CORE
16 : SQLITE_EXTENSION_INIT3
17 : #endif
18 :
19 : typedef struct cloudsync_changes_vtab {
20 : sqlite3_vtab base; // base class, must be first
21 : sqlite3 *db;
22 : cloudsync_context *data;
23 : } cloudsync_changes_vtab;
24 :
25 : typedef struct cloudsync_changes_cursor {
26 : sqlite3_vtab_cursor base; // base class, must be first
27 : cloudsync_changes_vtab *vtab;
28 : sqlite3_stmt *vm; // prepared statement
29 : } cloudsync_changes_cursor;
30 :
31 : char *cloudsync_changes_columns[] = {"tbl", "pk", "col_name", "col_value", "col_version", "db_version", "site_id", "cl", "seq"};
32 : #define COLNAME_FROM_INDEX(i) cloudsync_changes_columns[i]
33 : #define COL_TBL_INDEX 0
34 : #define COL_PK_INDEX 1
35 : #define COL_NAME_INDEX 2
36 : #define COL_VALUE_INDEX 3
37 : #define COL_VERSION_INDEX 4
38 : #define COL_DBVERSION_INDEX 5
39 : #define COL_SITEID_INDEX 6
40 : #define COL_CL_INDEX 7
41 : #define COL_SEQ_INDEX 8
42 :
43 : #if CLOUDSYNC_UNITTEST
44 : bool force_vtab_filter_abort = false;
45 : #define CHECK_VFILTERTEST_ABORT() if (force_vtab_filter_abort) rc = SQLITE_ERROR
46 : #else
47 : #define CHECK_VFILTERTEST_ABORT()
48 : #endif
49 :
50 : // MARK: -
51 :
52 2 : int vtab_set_error (sqlite3_vtab *vtab, const char *format, ...) {
53 : va_list arg;
54 2 : va_start (arg, format);
55 2 : char *err = sqlite3_vmprintf(format, arg);
56 2 : va_end (arg);
57 :
58 2 : if (vtab->zErrMsg) sqlite3_free(vtab->zErrMsg);
59 2 : vtab->zErrMsg = err;
60 2 : return SQLITE_ERROR;
61 : }
62 :
63 552 : const char *vtab_opname_from_value (int value) {
64 552 : switch (value) {
65 536 : case SQLITE_INDEX_CONSTRAINT_EQ: return "=";
66 5 : case SQLITE_INDEX_CONSTRAINT_GT: return ">";
67 1 : case SQLITE_INDEX_CONSTRAINT_LE: return "<=";
68 1 : case SQLITE_INDEX_CONSTRAINT_LT: return "<";
69 1 : case SQLITE_INDEX_CONSTRAINT_GE: return ">=";
70 1 : case SQLITE_INDEX_CONSTRAINT_LIKE: return "LIKE";
71 1 : case SQLITE_INDEX_CONSTRAINT_GLOB: return "GLOB";
72 1 : case SQLITE_INDEX_CONSTRAINT_NE: return "!=";
73 1 : case SQLITE_INDEX_CONSTRAINT_ISNOT: return "IS NOT";
74 1 : case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: return "IS NOT NULL";
75 1 : case SQLITE_INDEX_CONSTRAINT_ISNULL: return "IS NULL";
76 1 : case SQLITE_INDEX_CONSTRAINT_IS: return "IS";
77 :
78 : // The REGEXP operator is a special syntax for the regexp() user function.
79 : // No regexp() user function is defined by default and so use of the REGEXP operator will normally result in an error message.
80 : // If an application-defined SQL function named "regexp" is added at run-time, then the "X REGEXP Y" operator will be implemented as
81 : // a call to "regexp(Y,X)".
82 : // case SQLITE_INDEX_CONSTRAINT_REGEXP: return "REGEX";
83 :
84 : // MATCH is only valid for virtual FTS tables
85 : // The MATCH operator is a special syntax for the match() application-defined function.
86 : // The default match() function implementation raises an exception and is not really useful for anything.
87 : // But extensions can override the match() function with more helpful logic.
88 : // case SQLITE_INDEX_CONSTRAINT_MATCH: return "MATCH";
89 : }
90 1 : return NULL;
91 552 : }
92 :
93 10 : int vtab_colname_is_legal (const char *name) {
94 10 : int count = sizeof(cloudsync_changes_columns) / sizeof (char *);
95 :
96 72 : for (int i=0; i<count; ++i) {
97 71 : if (strcasecmp(cloudsync_changes_columns[i], name) == 0) return 1;
98 62 : }
99 1 : return 0;
100 10 : }
101 :
102 766 : char *vtab_build_changes_sql (cloudsync_context *data, const char *idxs) {
103 : DEBUG_VTAB("build_changes_sql");
104 :
105 : /*
106 : * This SQLite query dynamically generates and executes a consolidated query
107 : * to fetch changes from all tables related to cloud synchronization.
108 : *
109 : * It works in the following steps:
110 : *
111 : * 1. `table_names` CTE: Retrieves all table names in the database that end
112 : * with '_cloudsync' and extracts the base table name (without the '_cloudsync' suffix).
113 : *
114 : * 2. `changes_query` CTE: Constructs individual SELECT statements for each
115 : * cloud sync table, fetching data about changes in columns:
116 : * - `pk`: Primary key of the table.
117 : * - `col_name`: Name of the changed column.
118 : * - `col_version`: Version of the changed column.
119 : * - `db_version`: Database version when the change was recorded.
120 : * - `site_id`: Site identifier associated with the change.
121 : * - `cl`: Coalesced version (either `t2.col_version` or 1 if `t2.col_version` is NULL).
122 : * - `seq`: Sequence number of the change.
123 : * Each query is constructed by joining the table with `cloudsync_site_id`
124 : * for resolving the site ID and performing a LEFT JOIN with itself to
125 : * identify columns with NULL values in `t2.col_name`.
126 : *
127 : * 3. `union_query` CTE: Combines all the constructed SELECT statements
128 : * from `changes_query` into a single query using `UNION ALL`.
129 : *
130 : * 4. `final_query` CTE: Wraps the combined UNION ALL query into another
131 : * SELECT statement, preparing for additional filtering.
132 : *
133 : * 5. Final SELECT: Adds a WHERE clause to filter records with `db_version`
134 : * greater than a specified value (using a placeholder `?`), and orders
135 : * the results by `db_version` and `seq`.
136 : *
137 : * The overall result is a consolidated view of changes across all
138 : * cloud sync tables, filtered and ordered based on the `db_version` and
139 : * `seq` fields.
140 : */
141 :
142 766 : const char *query =
143 : "WITH table_names AS ( "
144 : " SELECT format('%q',SUBSTR(tbl_name, 1, LENGTH(tbl_name) - 10)) AS table_name_literal, format('%w',SUBSTR(tbl_name, 1, LENGTH(tbl_name) - 10)) AS table_name_identifier, format('%w',tbl_name) AS table_meta "
145 : " FROM sqlite_master "
146 : " WHERE type = 'table' AND tbl_name LIKE '%_cloudsync' "
147 : "), "
148 : "changes_query AS ( "
149 : " SELECT "
150 : " 'SELECT "
151 : " ''' || \"table_name_literal\" || ''' AS tbl, "
152 : " t1.pk AS pk, "
153 : " t1.col_name AS col_name, "
154 : " cloudsync_col_value(''' || \"table_name_literal\" || ''', t1.col_name, t1.pk) AS col_value, "
155 : " t1.col_version AS col_version, "
156 : " t1.db_version AS db_version, "
157 : " site_tbl.site_id AS site_id, "
158 : " t1.seq AS seq, "
159 : " COALESCE(t2.col_version, 1) AS cl "
160 : " FROM \"' || \"table_meta\" || '\" AS t1 "
161 : " LEFT JOIN cloudsync_site_id AS site_tbl ON t1.site_id = site_tbl.rowid "
162 : " LEFT JOIN \"' || \"table_meta\" || '\" AS t2 ON t1.pk = t2.pk AND t2.col_name = ''" CLOUDSYNC_TOMBSTONE_VALUE "'' "
163 : " WHERE col_value IS NOT ''" CLOUDSYNC_RLS_RESTRICTED_VALUE "''' "
164 : " AS query_string FROM table_names "
165 : "), "
166 : "union_query AS ( "
167 : " SELECT GROUP_CONCAT(query_string, ' UNION ALL ') AS union_query FROM changes_query "
168 : "), "
169 : "final_query AS ( "
170 : " SELECT "
171 : " 'SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq FROM (' || union_query || ')' "
172 : " AS final_string FROM union_query "
173 : ") "
174 : "SELECT final_string || ' ";
175 766 : const char *final_query = ";' FROM final_query;";
176 :
177 : static size_t query_len = 0;
178 : static size_t final_query_len = 0;
179 766 : if (query_len == 0) query_len = strlen(query);
180 766 : if (final_query_len == 0) final_query_len = strlen(final_query);
181 :
182 766 : size_t idx_len = strlen(idxs);
183 766 : size_t blen = query_len + idx_len + final_query_len + 128;
184 766 : char *sql = (char *)cloudsync_memory_alloc((sqlite3_uint64)blen);
185 766 : if (!sql) return NULL;
186 :
187 : // build final sql statement taking in account the dynamic idxs string provided by the user
188 766 : memcpy(sql, query, query_len);
189 766 : memcpy(sql + (query_len), idxs, idx_len);
190 766 : memcpy(sql + (query_len + idx_len), final_query, final_query_len+1);
191 :
192 766 : char *value = NULL;
193 766 : int rc = database_select_text(data, sql, &value);
194 766 : cloudsync_memory_free(sql);
195 :
196 766 : return (rc == DBRES_OK) ? value : NULL;
197 766 : }
198 :
199 : // MARK: -
200 :
201 199 : int cloudsync_changesvtab_connect (sqlite3 *db, void *aux, int argc, const char *const *argv, sqlite3_vtab **vtab, char **err) {
202 : DEBUG_VTAB("cloudsync_changesvtab_connect");
203 :
204 199 : int rc = sqlite3_declare_vtab(db, "CREATE TABLE x (tbl TEXT NOT NULL, pk BLOB NOT NULL, col_name TEXT NOT NULL,"
205 : "col_value TEXT, col_version INTEGER NOT NULL, db_version INTEGER NOT NULL,"
206 : "site_id BLOB NOT NULL, cl INTEGER NOT NULL, seq INTEGER NOT NULL);");
207 199 : if (rc == SQLITE_OK) {
208 : // memory internally managed by SQLite, so I cannot use memory_alloc here
209 199 : cloudsync_changes_vtab *vnew = sqlite3_malloc64(sizeof(cloudsync_changes_vtab));
210 199 : if (vnew == NULL) return SQLITE_NOMEM;
211 :
212 199 : memset(vnew, 0, sizeof(cloudsync_changes_vtab));
213 199 : vnew->db = db;
214 199 : vnew->data = aux;
215 :
216 199 : *vtab = (sqlite3_vtab *)vnew;
217 199 : }
218 :
219 199 : return rc;
220 199 : }
221 :
222 199 : int cloudsync_changesvtab_disconnect (sqlite3_vtab *vtab) {
223 : DEBUG_VTAB("cloudsync_changesvtab_disconnect");
224 :
225 199 : cloudsync_changes_vtab *p = (cloudsync_changes_vtab *)vtab;
226 199 : sqlite3_free(p);
227 199 : return SQLITE_OK;
228 : }
229 :
230 766 : int cloudsync_changesvtab_open (sqlite3_vtab *vtab, sqlite3_vtab_cursor **pcursor) {
231 : DEBUG_VTAB("cloudsync_changesvtab_open");
232 :
233 766 : cloudsync_changes_cursor *cursor = cloudsync_memory_alloc(sizeof(cloudsync_changes_cursor));
234 766 : if (cursor == NULL) return SQLITE_NOMEM;
235 :
236 766 : memset(cursor, 0, sizeof(cloudsync_changes_cursor));
237 766 : cursor->vtab = (cloudsync_changes_vtab *)vtab;
238 :
239 766 : *pcursor = (sqlite3_vtab_cursor *)cursor;
240 766 : return SQLITE_OK;
241 766 : }
242 :
243 766 : int cloudsync_changesvtab_close (sqlite3_vtab_cursor *cursor) {
244 : DEBUG_VTAB("cloudsync_changesvtab_close");
245 :
246 766 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
247 766 : if (c->vm) {
248 2 : sqlite3_finalize(c->vm);
249 2 : c->vm = NULL;
250 2 : }
251 :
252 766 : cloudsync_memory_free(cursor);
253 766 : return SQLITE_OK;
254 : }
255 :
256 767 : int cloudsync_changesvtab_best_index (sqlite3_vtab *vtab, sqlite3_index_info *idxinfo) {
257 : DEBUG_VTAB("cloudsync_changesvtab_best_index");
258 :
259 : // the goal here is to build a WHERE clause and gives an estimate
260 : // of the cost of that clause
261 :
262 : // we'll incrementally build the clause and we avoid the realloc
263 : // of memory, so perform a quick loop to estimate memory usage
264 :
265 : // count the number of contrainst and order by clauses
266 767 : int count1 = idxinfo->nConstraint;
267 767 : int count2 = idxinfo->nOrderBy;
268 :
269 : // col_version is the longest column name (11)
270 : // +1 for the space
271 : // IS NOT NULL is the longest constraint value (11)
272 : // +1 for the space
273 : // +1 for the ? character
274 : // +5 for space AND space
275 : // +512 for the extra space and for the WHERE and ORDER BY literals
276 :
277 : // memory internally manager by SQLite, so I cannot use memory_alloc here
278 767 : size_t slen = (count1 * (11 + 1 + 11 + 1 + 5)) + (count2 * 11 + 1 + 5) + 512;
279 767 : char *s = (char *)sqlite3_malloc64((sqlite3_uint64)slen);
280 767 : if (!s) return SQLITE_NOMEM;
281 767 : size_t sindex= 0;
282 :
283 767 : int idxnum = 0;
284 767 : int arg_index = 1;
285 767 : int orderconsumed = 1;
286 :
287 : // is there a WHERE clause ?
288 767 : if (count1 > 0) sindex += snprintf(s+sindex, slen-sindex, "WHERE ");
289 :
290 : // check constraints
291 1318 : for (int i=0; i < count1; ++i) {
292 : // analyze only usable constraints
293 551 : struct sqlite3_index_constraint *constraint = &idxinfo->aConstraint[i];
294 551 : if (constraint->usable == false) continue;
295 :
296 551 : int idx = constraint->iColumn;
297 551 : uint8_t op = constraint->op;
298 :
299 551 : const char *colname = (idx >= 0 && idx < CLOUDSYNC_CHANGES_NCOLS) ? COLNAME_FROM_INDEX(idx) : "rowid";
300 551 : const char *opname = vtab_opname_from_value(op);
301 551 : if (!opname) continue;
302 :
303 : // build next constraint
304 551 : if (i > 0) sindex += snprintf(s+sindex, slen-sindex, " AND ");
305 :
306 : // handle special case where value is not needed
307 551 : if ((op == SQLITE_INDEX_CONSTRAINT_ISNULL) || (op == SQLITE_INDEX_CONSTRAINT_ISNOTNULL)) {
308 2 : sindex += snprintf(s+sindex, slen-sindex, "%s %s", colname, opname);
309 2 : idxinfo->aConstraintUsage[i].argvIndex = 0;
310 2 : } else {
311 549 : sindex += snprintf(s+sindex, slen-sindex, "%s %s ?", colname, opname);
312 549 : idxinfo->aConstraintUsage[i].argvIndex = arg_index++;
313 : }
314 551 : idxinfo->aConstraintUsage[i].omit = 1;
315 :
316 : //a bitmask (idxnum) is built up based on which constraints are applied
317 551 : if (idx == COL_DBVERSION_INDEX) idxnum |= 2; // set bit 1
318 538 : else if (idx == COL_SITEID_INDEX) idxnum |= 4; // set bit 2
319 551 : }
320 :
321 : // is there an ORDER BY clause ?
322 : // if not use a default one
323 767 : if (count2 > 0) sindex += snprintf(s+sindex, slen-sindex, " ORDER BY ");
324 762 : else sindex += snprintf(s+sindex, slen-sindex, " ORDER BY db_version, seq ASC");
325 :
326 775 : for (int i=0; i < count2; ++i) {
327 8 : struct sqlite3_index_orderby *orderby = &idxinfo->aOrderBy[i];
328 :
329 : // build next constraint
330 8 : if (i > 0) sindex += snprintf(s+sindex, slen-sindex, ", ");
331 :
332 8 : int idx = orderby->iColumn;
333 8 : const char *colname = (idx >= 0 && idx < CLOUDSYNC_CHANGES_NCOLS) ? COLNAME_FROM_INDEX(idx) : "rowid";
334 8 : if (!vtab_colname_is_legal(colname)) orderconsumed = 0;
335 :
336 8 : sindex += snprintf(s+sindex, slen-sindex, "%s %s", colname, orderby->desc ? " DESC" : " ASC");
337 8 : }
338 :
339 767 : idxinfo->idxNum = idxnum;
340 767 : idxinfo->idxStr = s;
341 767 : idxinfo->needToFreeIdxStr = 1;
342 767 : idxinfo->orderByConsumed = orderconsumed;
343 :
344 : // the goal of the xBestIndex function is to help SQLite's query planner decide on the most efficient way
345 : // to execute a query on the virtual table. It does so by evaluating which constraints (filters) can be applied
346 : // and providing an estimate of the cost and number of rows that the query will return.
347 :
348 :
349 : /*
350 :
351 : By commenting the following code we assume that the developer is using an SQLite library
352 : more recent than 3.8.2 released on 2013-12-06
353 :
354 : int version = sqlite3_libversion_number();
355 : if (version >= 3008002) {
356 : // field sqlite3_int64 estimatedRows is available (estimated number of rows returned)
357 : }
358 :
359 : if (version >= 3009000) {
360 : // field int idxFlags is available (mask of SQLITE_INDEX_SCAN)
361 : }
362 :
363 : if (version >= 3010000) {
364 : // field sqlite3_uint64 colUsed is available (input: mask of columns used by statement)
365 : }
366 :
367 : */
368 :
369 : // perform estimated cost and row count based on the constraints
370 767 : if ((idxnum & 6) == 6) {
371 : // both DbVrsn and SiteId constraints are present
372 : // query is expected to be highly selective, returning only one row, with a very low execution cost
373 1 : idxinfo->estimatedCost = 1.0;
374 1 : idxinfo->estimatedRows = 1;
375 767 : } else if ((idxnum & 2) == 2) {
376 : // only DbVrsn constraint is present
377 : // query is expected to return more rows (10) and take more time (cost of 10.0) than in the previous case
378 9 : idxinfo->estimatedCost = 10.0;
379 9 : idxinfo->estimatedRows = 10;
380 766 : } else if ((idxnum & 4) == 4) {
381 : // only SiteId constraint is present
382 : // query is expected to be very inefficient, returning a large number of rows and taking a long time to execute
383 535 : idxinfo->estimatedCost = (double)INT32_MAX;
384 535 : idxinfo->estimatedRows = (sqlite3_int64)INT32_MAX;
385 535 : } else {
386 : // no constraints are present
387 : // worst-case scenario, where the query returns all rows from the virtual table
388 222 : idxinfo->estimatedCost = (double)INT64_MAX;
389 222 : idxinfo->estimatedRows = (sqlite3_int64)INT64_MAX;
390 : }
391 :
392 767 : return SQLITE_OK;
393 767 : }
394 :
395 :
396 766 : int cloudsync_changesvtab_filter (sqlite3_vtab_cursor *cursor, int idxn, const char *idxs, int argc, sqlite3_value **argv) {
397 : DEBUG_VTAB("cloudsync_changesvtab_filter");
398 :
399 766 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
400 766 : cloudsync_context *data = c->vtab->data;
401 766 : sqlite3 *db = c->vtab->db;
402 766 : char *sql = vtab_build_changes_sql(data, idxs);
403 766 : if (sql == NULL) {
404 : // vtab_build_changes_sql returns NULL when no *_cloudsync meta-tables
405 : // exist (cloudsync_init was never called, or the last configured table
406 : // was cleaned up): the inner GROUP_CONCAT produces a NULL row and the
407 : // outer SELECT yields a NULL final string. Distinguish this from a
408 : // genuine OOM by checking whether cloudsync is configured, so the user
409 : // gets an actionable message instead of "out of memory".
410 1 : if (!cloudsync_config_exists(data) || dbutils_table_settings_count_tables(data) == 0) {
411 1 : return vtab_set_error((sqlite3_vtab *)c->vtab,
412 : "cloudsync has no tables configured for sync. Call "
413 : "SELECT cloudsync_init('<table_name>') to enable sync on a "
414 : "table before querying cloudsync_changes.");
415 : }
416 0 : return SQLITE_NOMEM;
417 : }
418 :
419 : // the xFilter method may be called multiple times on the same sqlite3_vtab_cursor*
420 765 : if (c->vm) sqlite3_finalize(c->vm);
421 765 : c->vm = NULL;
422 :
423 765 : int rc = sqlite3_prepare_v2(db, sql, -1, &c->vm, NULL);
424 765 : cloudsync_memory_free(sql);
425 765 : if (rc != SQLITE_OK) goto abort_filter;
426 :
427 1314 : for (int i=0; i<argc; ++i) {
428 549 : rc = sqlite3_bind_value(c->vm, i+1, argv[i]);
429 549 : if (rc != SQLITE_OK) goto abort_filter;
430 549 : }
431 :
432 765 : rc = sqlite3_step(c->vm);
433 765 : CHECK_VFILTERTEST_ABORT();
434 :
435 765 : if (rc == SQLITE_DONE) {
436 11 : sqlite3_finalize(c->vm);
437 11 : c->vm = NULL;
438 765 : } else if (rc != SQLITE_ROW) {
439 1 : goto abort_filter;
440 : }
441 :
442 764 : return SQLITE_OK;
443 :
444 : abort_filter:
445 : // error condition
446 : DEBUG_VTAB("cloudsync_changesvtab_filter: %s\n", sqlite3_errmsg(db));
447 1 : if (c->vm) {
448 1 : sqlite3_finalize(c->vm);
449 1 : c->vm = NULL;
450 1 : }
451 1 : return rc;
452 766 : }
453 :
454 49721 : int cloudsync_changesvtab_next (sqlite3_vtab_cursor *cursor) {
455 : DEBUG_VTAB("cloudsync_changesvtab_next");
456 :
457 49721 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
458 49721 : int rc = sqlite3_step(c->vm);
459 :
460 49721 : if (rc == SQLITE_DONE) {
461 751 : sqlite3_finalize(c->vm);
462 751 : c->vm = NULL;
463 751 : rc = SQLITE_OK;
464 49721 : } else if (rc == SQLITE_ROW) {
465 48970 : rc = SQLITE_OK;
466 48970 : }
467 :
468 49721 : if (rc != SQLITE_OK) DEBUG_VTAB("cloudsync_changesvtab_next: %s\n", sqlite3_errmsg(c->vtab->db));
469 49721 : return rc;
470 : }
471 :
472 50485 : int cloudsync_changesvtab_eof (sqlite3_vtab_cursor *cursor) {
473 : DEBUG_VTAB("cloudsync_changesvtab_eof");
474 :
475 : // we must return false (zero) if the specified cursor currently points to a valid row of data, or true (non-zero) otherwise
476 50485 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
477 50485 : return (c->vm) ? 0 : 1;
478 : }
479 :
480 445104 : int cloudsync_changesvtab_column (sqlite3_vtab_cursor *cursor, sqlite3_context *ctx, int col) {
481 : DEBUG_VTAB("cloudsync_changesvtab_column %d\n", col);
482 :
483 445104 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
484 445104 : sqlite3_value *value = sqlite3_column_value(c->vm, col);
485 445104 : sqlite3_result_value(ctx, value);
486 :
487 445104 : return SQLITE_OK;
488 : }
489 :
490 88 : int cloudsync_changesvtab_rowid (sqlite3_vtab_cursor *cursor, sqlite3_int64 *rowid) {
491 : DEBUG_VTAB("cloudsync_changesvtab_rowid");
492 :
493 88 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
494 88 : sqlite3_int64 seq = sqlite3_column_int64(c->vm, COL_SEQ_INDEX);
495 88 : sqlite3_int64 db_version = sqlite3_column_int64(c->vm, COL_DBVERSION_INDEX);
496 :
497 : // for an explanation see https://github.com/sqliteai/sqlite-sync/blob/main/docs/RowID.md
498 88 : *rowid = (db_version << 30) | seq;
499 88 : return SQLITE_OK;
500 : }
501 :
502 3150 : int cloudsync_changesvtab_insert_gos (sqlite3_vtab *vtab, cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, const char *insert_name, sqlite3_value *insert_value, sqlite3_int64 insert_col_version, sqlite3_int64 insert_db_version, const char *insert_site_id, int insert_site_id_len, sqlite3_int64 insert_seq, int64_t *rowid) {
503 : DEBUG_VTAB("cloudsync_changesvtab_insert_gos");
504 :
505 : // Grow-Only Set (GOS) Algorithm: Only insertions are allowed, deletions and updates are prevented from a trigger.
506 3150 : int rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, (int64_t)insert_col_version, (int64_t)insert_db_version, insert_site_id, insert_site_id_len, (int64_t)insert_seq, rowid);
507 :
508 3150 : if (rc != SQLITE_OK) {
509 0 : vtab_set_error(vtab, "%s", cloudsync_errmsg(data));
510 0 : }
511 :
512 3150 : return rc;
513 : }
514 :
515 49240 : int cloudsync_changesvtab_insert (sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid) {
516 : DEBUG_VTAB("cloudsync_changesvtab_insert");
517 :
518 : // this function performs the merging logic for an insert in a cloud-synchronized table. It handles
519 : // different scenarios including conflicts, causal lengths, delete operations, and resurrecting rows
520 : // based on the incoming data (from remote nodes or clients) and the local database state
521 :
522 : // this function handles different CRDT algorithms (GOS, DWS, AWS, and CLS).
523 : // the merging strategy is determined based on the table->algo value.
524 :
525 : // meta table declaration:
526 : // tbl TEXT NOT NULL, pk BLOB NOT NULL, col_name TEXT NOT NULL,"
527 : // "col_value ANY, col_version INTEGER NOT NULL, db_version INTEGER NOT NULL,"
528 : // "site_id BLOB NOT NULL, cl INTEGER NOT NULL, seq INTEGER NOT NULL
529 :
530 : // meta information to retrieve from arguments:
531 : // argv[0] -> table name (TEXT)
532 : // argv[1] -> primary key (BLOB)
533 : // argv[2] -> column name (TEXT or NULL if sentinel)
534 : // argv[3] -> column value (ANY)
535 : // argv[4] -> column version (INTEGER)
536 : // argv[5] -> database version (INTEGER)
537 : // argv[6] -> site ID (BLOB, identifies the origin of the update)
538 : // argv[7] -> causal length (INTEGER, tracks the order of operations)
539 : // argv[8] -> sequence number (INTEGER, unique per operation)
540 :
541 : // extract table name
542 49240 : const char *insert_tbl = (const char *)sqlite3_value_text(argv[0]);
543 :
544 : // lookup table
545 49240 : cloudsync_context *data = (cloudsync_context *)(((cloudsync_changes_vtab *)vtab)->data);
546 49240 : cloudsync_table_context *table = table_lookup(data, insert_tbl);
547 49240 : if (!table) return vtab_set_error(vtab, "Unable to find table %s,", insert_tbl);
548 :
549 : // extract the remaining fields from the input values
550 49240 : const char *insert_pk = (const char *)sqlite3_value_blob(argv[1]);
551 49240 : int insert_pk_len = sqlite3_value_bytes(argv[1]);
552 49240 : const char *insert_name = (sqlite3_value_type(argv[2]) == SQLITE_NULL) ? CLOUDSYNC_TOMBSTONE_VALUE : (const char *)sqlite3_value_text(argv[2]);
553 49240 : sqlite3_value *insert_value = argv[3];
554 49240 : int64_t insert_col_version = (int64_t)sqlite3_value_int(argv[4]);
555 49240 : int64_t insert_db_version = (int64_t)sqlite3_value_int(argv[5]);
556 49240 : const char *insert_site_id = (const char *)sqlite3_value_blob(argv[6]);
557 49240 : int insert_site_id_len = sqlite3_value_bytes(argv[6]);
558 49240 : int64_t insert_cl = (int64_t)sqlite3_value_int(argv[7]);
559 49240 : int64_t insert_seq = (int64_t)sqlite3_value_int(argv[8]);
560 :
561 : // perform different logic for each different table algorithm
562 49240 : if (table_algo_isgos(table)) return cloudsync_changesvtab_insert_gos(vtab, data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, (int64_t *)rowid);
563 :
564 46090 : int rc = merge_insert (data, table, insert_pk, insert_pk_len, insert_cl, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, (int64_t *)rowid);
565 46090 : if (rc != SQLITE_OK) {
566 0 : return vtab_set_error(vtab, "%s", cloudsync_errmsg(data));
567 : }
568 :
569 46090 : return SQLITE_OK;
570 49240 : }
571 :
572 49241 : int cloudsync_changesvtab_update (sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid) {
573 : DEBUG_VTAB("cloudsync_changesvtab_update");
574 :
575 : // only INSERT statements are allowed
576 49241 : bool is_insert = (argc > 1 && sqlite3_value_type(argv[0]) == SQLITE_NULL);
577 49241 : if (!is_insert) {
578 1 : vtab_set_error(vtab, "Only INSERT and SELECT statements are allowed against the cloudsync_changes table");
579 1 : return SQLITE_MISUSE;
580 : }
581 :
582 : // argv[0] is set only in case of DELETE statement (it contains the rowid of a row in the virtual table to be deleted)
583 : // argv[1] is the rowid of a new row to be inserted into the virtual table (always NULL in our case)
584 : // so reduce the number of meaningful arguments by 2
585 49240 : return cloudsync_changesvtab_insert(vtab, argc-2, &argv[2], rowid);
586 49241 : }
587 :
588 : // MARK: -
589 :
590 233 : int cloudsync_vtab_register_changes (sqlite3 *db, cloudsync_context *xdata) {
591 : static sqlite3_module cloudsync_changes_module = {
592 : /* iVersion */ 0,
593 : /* xCreate */ 0, // Eponymous only virtual table
594 : /* xConnect */ cloudsync_changesvtab_connect,
595 : /* xBestIndex */ cloudsync_changesvtab_best_index,
596 : /* xDisconnect */ cloudsync_changesvtab_disconnect,
597 : /* xDestroy */ 0,
598 : /* xOpen */ cloudsync_changesvtab_open,
599 : /* xClose */ cloudsync_changesvtab_close,
600 : /* xFilter */ cloudsync_changesvtab_filter,
601 : /* xNext */ cloudsync_changesvtab_next,
602 : /* xEof */ cloudsync_changesvtab_eof,
603 : /* xColumn */ cloudsync_changesvtab_column,
604 : /* xRowid */ cloudsync_changesvtab_rowid,
605 : /* xUpdate */ cloudsync_changesvtab_update,
606 : /* xBegin */ 0,
607 : /* xSync */ 0,
608 : /* xCommit */ 0,
609 : /* xRollback */ 0,
610 : /* xFindMethod */ 0,
611 : /* xRename */ 0,
612 : /* xSavepoint */ 0,
613 : /* xRelease */ 0,
614 : /* xRollbackTo */ 0,
615 : /* xShadowName */ 0,
616 : /* xIntegrity */ 0
617 : };
618 :
619 233 : return sqlite3_create_module(db, "cloudsync_changes", &cloudsync_changes_module, (void *)xdata);
620 : }
|