Line data Source code
1 : //
2 : // cloudsync_changes_sqlite.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 23/09/24.
6 : //
7 :
8 : #include <stdio.h>
9 : #include <string.h>
10 :
11 : #include "cloudsync_changes_sqlite.h"
12 : #include "../utils.h"
13 : #include "../dbutils.h"
14 :
15 : #ifndef SQLITE_CORE
16 : SQLITE_EXTENSION_INIT3
17 : #endif
18 :
19 : typedef struct cloudsync_changes_vtab {
20 : sqlite3_vtab base; // base class, must be first
21 : sqlite3 *db;
22 : cloudsync_context *data;
23 : } cloudsync_changes_vtab;
24 :
25 : typedef struct cloudsync_changes_cursor {
26 : sqlite3_vtab_cursor base; // base class, must be first
27 : cloudsync_changes_vtab *vtab;
28 : sqlite3_stmt *vm; // prepared statement
29 : } cloudsync_changes_cursor;
30 :
31 : char *cloudsync_changes_columns[] = {"tbl", "pk", "col_name", "col_value", "col_version", "db_version", "site_id", "cl", "seq"};
32 : #define COLNAME_FROM_INDEX(i) cloudsync_changes_columns[i]
33 : #define COL_TBL_INDEX 0
34 : #define COL_PK_INDEX 1
35 : #define COL_NAME_INDEX 2
36 : #define COL_VALUE_INDEX 3
37 : #define COL_VERSION_INDEX 4
38 : #define COL_DBVERSION_INDEX 5
39 : #define COL_SITEID_INDEX 6
40 : #define COL_CL_INDEX 7
41 : #define COL_SEQ_INDEX 8
42 :
43 : #if CLOUDSYNC_UNITTEST
44 : bool force_vtab_filter_abort = false;
45 : #define CHECK_VFILTERTEST_ABORT() if (force_vtab_filter_abort) rc = SQLITE_ERROR
46 : #else
47 : #define CHECK_VFILTERTEST_ABORT()
48 : #endif
49 :
50 : // MARK: -
51 :
52 1 : int vtab_set_error (sqlite3_vtab *vtab, const char *format, ...) {
53 : va_list arg;
54 1 : va_start (arg, format);
55 1 : char *err = sqlite3_vmprintf(format, arg);
56 1 : va_end (arg);
57 :
58 1 : if (vtab->zErrMsg) sqlite3_free(vtab->zErrMsg);
59 1 : vtab->zErrMsg = err;
60 1 : return SQLITE_ERROR;
61 : }
62 :
63 527 : const char *vtab_opname_from_value (int value) {
64 527 : switch (value) {
65 511 : case SQLITE_INDEX_CONSTRAINT_EQ: return "=";
66 5 : case SQLITE_INDEX_CONSTRAINT_GT: return ">";
67 1 : case SQLITE_INDEX_CONSTRAINT_LE: return "<=";
68 1 : case SQLITE_INDEX_CONSTRAINT_LT: return "<";
69 1 : case SQLITE_INDEX_CONSTRAINT_GE: return ">=";
70 1 : case SQLITE_INDEX_CONSTRAINT_LIKE: return "LIKE";
71 1 : case SQLITE_INDEX_CONSTRAINT_GLOB: return "GLOB";
72 1 : case SQLITE_INDEX_CONSTRAINT_NE: return "!=";
73 1 : case SQLITE_INDEX_CONSTRAINT_ISNOT: return "IS NOT";
74 1 : case SQLITE_INDEX_CONSTRAINT_ISNOTNULL: return "IS NOT NULL";
75 1 : case SQLITE_INDEX_CONSTRAINT_ISNULL: return "IS NULL";
76 1 : case SQLITE_INDEX_CONSTRAINT_IS: return "IS";
77 :
78 : // The REGEXP operator is a special syntax for the regexp() user function.
79 : // No regexp() user function is defined by default and so use of the REGEXP operator will normally result in an error message.
80 : // If an application-defined SQL function named "regexp" is added at run-time, then the "X REGEXP Y" operator will be implemented as
81 : // a call to "regexp(Y,X)".
82 : // case SQLITE_INDEX_CONSTRAINT_REGEXP: return "REGEX";
83 :
84 : // MATCH is only valid for virtual FTS tables
85 : // The MATCH operator is a special syntax for the match() application-defined function.
86 : // The default match() function implementation raises an exception and is not really useful for anything.
87 : // But extensions can override the match() function with more helpful logic.
88 : // case SQLITE_INDEX_CONSTRAINT_MATCH: return "MATCH";
89 : }
90 1 : return NULL;
91 527 : }
92 :
93 10 : int vtab_colname_is_legal (const char *name) {
94 10 : int count = sizeof(cloudsync_changes_columns) / sizeof (char *);
95 :
96 72 : for (int i=0; i<count; ++i) {
97 71 : if (strcasecmp(cloudsync_changes_columns[i], name) == 0) return 1;
98 62 : }
99 1 : return 0;
100 10 : }
101 :
102 727 : char *vtab_build_changes_sql (cloudsync_context *data, const char *idxs) {
103 : DEBUG_VTAB("build_changes_sql");
104 :
105 : /*
106 : * This SQLite query dynamically generates and executes a consolidated query
107 : * to fetch changes from all tables related to cloud synchronization.
108 : *
109 : * It works in the following steps:
110 : *
111 : * 1. `table_names` CTE: Retrieves all table names in the database that end
112 : * with '_cloudsync' and extracts the base table name (without the '_cloudsync' suffix).
113 : *
114 : * 2. `changes_query` CTE: Constructs individual SELECT statements for each
115 : * cloud sync table, fetching data about changes in columns:
116 : * - `pk`: Primary key of the table.
117 : * - `col_name`: Name of the changed column.
118 : * - `col_version`: Version of the changed column.
119 : * - `db_version`: Database version when the change was recorded.
120 : * - `site_id`: Site identifier associated with the change.
121 : * - `cl`: Coalesced version (either `t2.col_version` or 1 if `t2.col_version` is NULL).
122 : * - `seq`: Sequence number of the change.
123 : * Each query is constructed by joining the table with `cloudsync_site_id`
124 : * for resolving the site ID and performing a LEFT JOIN with itself to
125 : * identify columns with NULL values in `t2.col_name`.
126 : *
127 : * 3. `union_query` CTE: Combines all the constructed SELECT statements
128 : * from `changes_query` into a single query using `UNION ALL`.
129 : *
130 : * 4. `final_query` CTE: Wraps the combined UNION ALL query into another
131 : * SELECT statement, preparing for additional filtering.
132 : *
133 : * 5. Final SELECT: Adds a WHERE clause to filter records with `db_version`
134 : * greater than a specified value (using a placeholder `?`), and orders
135 : * the results by `db_version` and `seq`.
136 : *
137 : * The overall result is a consolidated view of changes across all
138 : * cloud sync tables, filtered and ordered based on the `db_version` and
139 : * `seq` fields.
140 : */
141 :
142 727 : const char *query =
143 : "WITH table_names AS ( "
144 : " SELECT format('%q',SUBSTR(tbl_name, 1, LENGTH(tbl_name) - 10)) AS table_name_literal, format('%w',SUBSTR(tbl_name, 1, LENGTH(tbl_name) - 10)) AS table_name_identifier, format('%w',tbl_name) AS table_meta "
145 : " FROM sqlite_master "
146 : " WHERE type = 'table' AND tbl_name LIKE '%_cloudsync' "
147 : "), "
148 : "changes_query AS ( "
149 : " SELECT "
150 : " 'SELECT "
151 : " ''' || \"table_name_literal\" || ''' AS tbl, "
152 : " t1.pk AS pk, "
153 : " t1.col_name AS col_name, "
154 : " cloudsync_col_value(''' || \"table_name_literal\" || ''', t1.col_name, t1.pk) AS col_value, "
155 : " t1.col_version AS col_version, "
156 : " t1.db_version AS db_version, "
157 : " site_tbl.site_id AS site_id, "
158 : " t1.seq AS seq, "
159 : " COALESCE(t2.col_version, 1) AS cl "
160 : " FROM \"' || \"table_meta\" || '\" AS t1 "
161 : " LEFT JOIN cloudsync_site_id AS site_tbl ON t1.site_id = site_tbl.rowid "
162 : " LEFT JOIN \"' || \"table_meta\" || '\" AS t2 ON t1.pk = t2.pk AND t2.col_name = ''" CLOUDSYNC_TOMBSTONE_VALUE "'' "
163 : " WHERE col_value IS NOT ''" CLOUDSYNC_RLS_RESTRICTED_VALUE "''' "
164 : " AS query_string FROM table_names "
165 : "), "
166 : "union_query AS ( "
167 : " SELECT GROUP_CONCAT(query_string, ' UNION ALL ') AS union_query FROM changes_query "
168 : "), "
169 : "final_query AS ( "
170 : " SELECT "
171 : " 'SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq FROM (' || union_query || ')' "
172 : " AS final_string FROM union_query "
173 : ") "
174 : "SELECT final_string || ' ";
175 727 : const char *final_query = ";' FROM final_query;";
176 :
177 : static size_t query_len = 0;
178 : static size_t final_query_len = 0;
179 727 : if (query_len == 0) query_len = strlen(query);
180 727 : if (final_query_len == 0) final_query_len = strlen(final_query);
181 :
182 727 : size_t idx_len = strlen(idxs);
183 727 : size_t blen = query_len + idx_len + final_query_len + 128;
184 727 : char *sql = (char *)cloudsync_memory_alloc((sqlite3_uint64)blen);
185 727 : if (!sql) return NULL;
186 :
187 : // build final sql statement taking in account the dynamic idxs string provided by the user
188 727 : memcpy(sql, query, query_len);
189 727 : memcpy(sql + (query_len), idxs, idx_len);
190 727 : memcpy(sql + (query_len + idx_len), final_query, final_query_len+1);
191 :
192 727 : char *value = NULL;
193 727 : int rc = database_select_text(data, sql, &value);
194 727 : cloudsync_memory_free(sql);
195 :
196 727 : return (rc == DBRES_OK) ? value : NULL;
197 727 : }
198 :
199 : // MARK: -
200 :
201 169 : int cloudsync_changesvtab_connect (sqlite3 *db, void *aux, int argc, const char *const *argv, sqlite3_vtab **vtab, char **err) {
202 : DEBUG_VTAB("cloudsync_changesvtab_connect");
203 :
204 169 : int rc = sqlite3_declare_vtab(db, "CREATE TABLE x (tbl TEXT NOT NULL, pk BLOB NOT NULL, col_name TEXT NOT NULL,"
205 : "col_value TEXT, col_version INTEGER NOT NULL, db_version INTEGER NOT NULL,"
206 : "site_id BLOB NOT NULL, cl INTEGER NOT NULL, seq INTEGER NOT NULL);");
207 169 : if (rc == SQLITE_OK) {
208 : // memory internally managed by SQLite, so I cannot use memory_alloc here
209 169 : cloudsync_changes_vtab *vnew = sqlite3_malloc64(sizeof(cloudsync_changes_vtab));
210 169 : if (vnew == NULL) return SQLITE_NOMEM;
211 :
212 169 : memset(vnew, 0, sizeof(cloudsync_changes_vtab));
213 169 : vnew->db = db;
214 169 : vnew->data = aux;
215 :
216 169 : *vtab = (sqlite3_vtab *)vnew;
217 169 : }
218 :
219 169 : return rc;
220 169 : }
221 :
222 169 : int cloudsync_changesvtab_disconnect (sqlite3_vtab *vtab) {
223 : DEBUG_VTAB("cloudsync_changesvtab_disconnect");
224 :
225 169 : cloudsync_changes_vtab *p = (cloudsync_changes_vtab *)vtab;
226 169 : sqlite3_free(p);
227 169 : return SQLITE_OK;
228 : }
229 :
230 727 : int cloudsync_changesvtab_open (sqlite3_vtab *vtab, sqlite3_vtab_cursor **pcursor) {
231 : DEBUG_VTAB("cloudsync_changesvtab_open");
232 :
233 727 : cloudsync_changes_cursor *cursor = cloudsync_memory_alloc(sizeof(cloudsync_changes_cursor));
234 727 : if (cursor == NULL) return SQLITE_NOMEM;
235 :
236 727 : memset(cursor, 0, sizeof(cloudsync_changes_cursor));
237 727 : cursor->vtab = (cloudsync_changes_vtab *)vtab;
238 :
239 727 : *pcursor = (sqlite3_vtab_cursor *)cursor;
240 727 : return SQLITE_OK;
241 727 : }
242 :
243 727 : int cloudsync_changesvtab_close (sqlite3_vtab_cursor *cursor) {
244 : DEBUG_VTAB("cloudsync_changesvtab_close");
245 :
246 727 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
247 727 : if (c->vm) {
248 2 : sqlite3_finalize(c->vm);
249 2 : c->vm = NULL;
250 2 : }
251 :
252 727 : cloudsync_memory_free(cursor);
253 727 : return SQLITE_OK;
254 : }
255 :
256 728 : int cloudsync_changesvtab_best_index (sqlite3_vtab *vtab, sqlite3_index_info *idxinfo) {
257 : DEBUG_VTAB("cloudsync_changesvtab_best_index");
258 :
259 : // the goal here is to build a WHERE clause and gives an estimate
260 : // of the cost of that clause
261 :
262 : // we'll incrementally build the clause and we avoid the realloc
263 : // of memory, so perform a quick loop to estimate memory usage
264 :
265 : // count the number of contrainst and order by clauses
266 728 : int count1 = idxinfo->nConstraint;
267 728 : int count2 = idxinfo->nOrderBy;
268 :
269 : // col_version is the longest column name (11)
270 : // +1 for the space
271 : // IS NOT NULL is the longest constraint value (11)
272 : // +1 for the space
273 : // +1 for the ? character
274 : // +5 for space AND space
275 : // +512 for the extra space and for the WHERE and ORDER BY literals
276 :
277 : // memory internally manager by SQLite, so I cannot use memory_alloc here
278 728 : size_t slen = (count1 * (11 + 1 + 11 + 1 + 5)) + (count2 * 11 + 1 + 5) + 512;
279 728 : char *s = (char *)sqlite3_malloc64((sqlite3_uint64)slen);
280 728 : if (!s) return SQLITE_NOMEM;
281 728 : size_t sindex= 0;
282 :
283 728 : int idxnum = 0;
284 728 : int arg_index = 1;
285 728 : int orderconsumed = 1;
286 :
287 : // is there a WHERE clause ?
288 728 : if (count1 > 0) sindex += snprintf(s+sindex, slen-sindex, "WHERE ");
289 :
290 : // check constraints
291 1254 : for (int i=0; i < count1; ++i) {
292 : // analyze only usable constraints
293 526 : struct sqlite3_index_constraint *constraint = &idxinfo->aConstraint[i];
294 526 : if (constraint->usable == false) continue;
295 :
296 526 : int idx = constraint->iColumn;
297 526 : uint8_t op = constraint->op;
298 :
299 526 : const char *colname = (idx >= 0 && idx < CLOUDSYNC_CHANGES_NCOLS) ? COLNAME_FROM_INDEX(idx) : "rowid";
300 526 : const char *opname = vtab_opname_from_value(op);
301 526 : if (!opname) continue;
302 :
303 : // build next constraint
304 526 : if (i > 0) sindex += snprintf(s+sindex, slen-sindex, " AND ");
305 :
306 : // handle special case where value is not needed
307 526 : if ((op == SQLITE_INDEX_CONSTRAINT_ISNULL) || (op == SQLITE_INDEX_CONSTRAINT_ISNOTNULL)) {
308 2 : sindex += snprintf(s+sindex, slen-sindex, "%s %s", colname, opname);
309 2 : idxinfo->aConstraintUsage[i].argvIndex = 0;
310 2 : } else {
311 524 : sindex += snprintf(s+sindex, slen-sindex, "%s %s ?", colname, opname);
312 524 : idxinfo->aConstraintUsage[i].argvIndex = arg_index++;
313 : }
314 526 : idxinfo->aConstraintUsage[i].omit = 1;
315 :
316 : //a bitmask (idxnum) is built up based on which constraints are applied
317 526 : if (idx == COL_DBVERSION_INDEX) idxnum |= 2; // set bit 1
318 513 : else if (idx == COL_SITEID_INDEX) idxnum |= 4; // set bit 2
319 526 : }
320 :
321 : // is there an ORDER BY clause ?
322 : // if not use a default one
323 728 : if (count2 > 0) sindex += snprintf(s+sindex, slen-sindex, " ORDER BY ");
324 723 : else sindex += snprintf(s+sindex, slen-sindex, " ORDER BY db_version, seq ASC");
325 :
326 736 : for (int i=0; i < count2; ++i) {
327 8 : struct sqlite3_index_orderby *orderby = &idxinfo->aOrderBy[i];
328 :
329 : // build next constraint
330 8 : if (i > 0) sindex += snprintf(s+sindex, slen-sindex, ", ");
331 :
332 8 : int idx = orderby->iColumn;
333 8 : const char *colname = (idx >= 0 && idx < CLOUDSYNC_CHANGES_NCOLS) ? COLNAME_FROM_INDEX(idx) : "rowid";
334 8 : if (!vtab_colname_is_legal(colname)) orderconsumed = 0;
335 :
336 8 : sindex += snprintf(s+sindex, slen-sindex, "%s %s", colname, orderby->desc ? " DESC" : " ASC");
337 8 : }
338 :
339 728 : idxinfo->idxNum = idxnum;
340 728 : idxinfo->idxStr = s;
341 728 : idxinfo->needToFreeIdxStr = 1;
342 728 : idxinfo->orderByConsumed = orderconsumed;
343 :
344 : // the goal of the xBestIndex function is to help SQLite's query planner decide on the most efficient way
345 : // to execute a query on the virtual table. It does so by evaluating which constraints (filters) can be applied
346 : // and providing an estimate of the cost and number of rows that the query will return.
347 :
348 :
349 : /*
350 :
351 : By commenting the following code we assume that the developer is using an SQLite library
352 : more recent than 3.8.2 released on 2013-12-06
353 :
354 : int version = sqlite3_libversion_number();
355 : if (version >= 3008002) {
356 : // field sqlite3_int64 estimatedRows is available (estimated number of rows returned)
357 : }
358 :
359 : if (version >= 3009000) {
360 : // field int idxFlags is available (mask of SQLITE_INDEX_SCAN)
361 : }
362 :
363 : if (version >= 3010000) {
364 : // field sqlite3_uint64 colUsed is available (input: mask of columns used by statement)
365 : }
366 :
367 : */
368 :
369 : // perform estimated cost and row count based on the constraints
370 728 : if ((idxnum & 6) == 6) {
371 : // both DbVrsn and SiteId constraints are present
372 : // query is expected to be highly selective, returning only one row, with a very low execution cost
373 1 : idxinfo->estimatedCost = 1.0;
374 1 : idxinfo->estimatedRows = 1;
375 728 : } else if ((idxnum & 2) == 2) {
376 : // only DbVrsn constraint is present
377 : // query is expected to return more rows (10) and take more time (cost of 10.0) than in the previous case
378 9 : idxinfo->estimatedCost = 10.0;
379 9 : idxinfo->estimatedRows = 10;
380 727 : } else if ((idxnum & 4) == 4) {
381 : // only SiteId constraint is present
382 : // query is expected to be very inefficient, returning a large number of rows and taking a long time to execute
383 510 : idxinfo->estimatedCost = (double)INT32_MAX;
384 510 : idxinfo->estimatedRows = (sqlite3_int64)INT32_MAX;
385 510 : } else {
386 : // no constraints are present
387 : // worst-case scenario, where the query returns all rows from the virtual table
388 208 : idxinfo->estimatedCost = (double)INT64_MAX;
389 208 : idxinfo->estimatedRows = (sqlite3_int64)INT64_MAX;
390 : }
391 :
392 728 : return SQLITE_OK;
393 728 : }
394 :
395 :
396 727 : int cloudsync_changesvtab_filter (sqlite3_vtab_cursor *cursor, int idxn, const char *idxs, int argc, sqlite3_value **argv) {
397 : DEBUG_VTAB("cloudsync_changesvtab_filter");
398 :
399 727 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
400 727 : cloudsync_context *data = c->vtab->data;
401 727 : sqlite3 *db = c->vtab->db;
402 727 : char *sql = vtab_build_changes_sql(data, idxs);
403 727 : if (sql == NULL) return SQLITE_NOMEM;
404 :
405 : // the xFilter method may be called multiple times on the same sqlite3_vtab_cursor*
406 727 : if (c->vm) sqlite3_finalize(c->vm);
407 727 : c->vm = NULL;
408 :
409 727 : int rc = sqlite3_prepare_v2(db, sql, -1, &c->vm, NULL);
410 727 : cloudsync_memory_free(sql);
411 727 : if (rc != SQLITE_OK) goto abort_filter;
412 :
413 1251 : for (int i=0; i<argc; ++i) {
414 524 : rc = sqlite3_bind_value(c->vm, i+1, argv[i]);
415 524 : if (rc != SQLITE_OK) goto abort_filter;
416 524 : }
417 :
418 727 : rc = sqlite3_step(c->vm);
419 727 : CHECK_VFILTERTEST_ABORT();
420 :
421 727 : if (rc == SQLITE_DONE) {
422 11 : sqlite3_finalize(c->vm);
423 11 : c->vm = NULL;
424 727 : } else if (rc != SQLITE_ROW) {
425 1 : goto abort_filter;
426 : }
427 :
428 726 : return SQLITE_OK;
429 :
430 : abort_filter:
431 : // error condition
432 : DEBUG_VTAB("cloudsync_changesvtab_filter: %s\n", sqlite3_errmsg(db));
433 1 : if (c->vm) {
434 1 : sqlite3_finalize(c->vm);
435 1 : c->vm = NULL;
436 1 : }
437 1 : return rc;
438 727 : }
439 :
440 49633 : int cloudsync_changesvtab_next (sqlite3_vtab_cursor *cursor) {
441 : DEBUG_VTAB("cloudsync_changesvtab_next");
442 :
443 49633 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
444 49633 : int rc = sqlite3_step(c->vm);
445 :
446 49633 : if (rc == SQLITE_DONE) {
447 713 : sqlite3_finalize(c->vm);
448 713 : c->vm = NULL;
449 713 : rc = SQLITE_OK;
450 49633 : } else if (rc == SQLITE_ROW) {
451 48920 : rc = SQLITE_OK;
452 48920 : }
453 :
454 49633 : if (rc != SQLITE_OK) DEBUG_VTAB("cloudsync_changesvtab_next: %s\n", sqlite3_errmsg(c->vtab->db));
455 49633 : return rc;
456 : }
457 :
458 50359 : int cloudsync_changesvtab_eof (sqlite3_vtab_cursor *cursor) {
459 : DEBUG_VTAB("cloudsync_changesvtab_eof");
460 :
461 : // we must return false (zero) if the specified cursor currently points to a valid row of data, or true (non-zero) otherwise
462 50359 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
463 50359 : return (c->vm) ? 0 : 1;
464 : }
465 :
466 444312 : int cloudsync_changesvtab_column (sqlite3_vtab_cursor *cursor, sqlite3_context *ctx, int col) {
467 : DEBUG_VTAB("cloudsync_changesvtab_column %d\n", col);
468 :
469 444312 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
470 444312 : sqlite3_value *value = sqlite3_column_value(c->vm, col);
471 444312 : sqlite3_result_value(ctx, value);
472 :
473 444312 : return SQLITE_OK;
474 : }
475 :
476 88 : int cloudsync_changesvtab_rowid (sqlite3_vtab_cursor *cursor, sqlite3_int64 *rowid) {
477 : DEBUG_VTAB("cloudsync_changesvtab_rowid");
478 :
479 88 : cloudsync_changes_cursor *c = (cloudsync_changes_cursor *)cursor;
480 88 : sqlite3_int64 seq = sqlite3_column_int64(c->vm, COL_SEQ_INDEX);
481 88 : sqlite3_int64 db_version = sqlite3_column_int64(c->vm, COL_DBVERSION_INDEX);
482 :
483 : // for an explanation see https://github.com/sqliteai/sqlite-sync/blob/main/docs/RowID.md
484 88 : *rowid = (db_version << 30) | seq;
485 88 : return SQLITE_OK;
486 : }
487 :
488 3150 : int cloudsync_changesvtab_insert_gos (sqlite3_vtab *vtab, cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, const char *insert_name, sqlite3_value *insert_value, sqlite3_int64 insert_col_version, sqlite3_int64 insert_db_version, const char *insert_site_id, int insert_site_id_len, sqlite3_int64 insert_seq, int64_t *rowid) {
489 : DEBUG_VTAB("cloudsync_changesvtab_insert_gos");
490 :
491 : // Grow-Only Set (GOS) Algorithm: Only insertions are allowed, deletions and updates are prevented from a trigger.
492 3150 : int rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, (int64_t)insert_col_version, (int64_t)insert_db_version, insert_site_id, insert_site_id_len, (int64_t)insert_seq, rowid);
493 :
494 3150 : if (rc != SQLITE_OK) {
495 0 : vtab_set_error(vtab, "%s", cloudsync_errmsg(data));
496 0 : }
497 :
498 3150 : return rc;
499 : }
500 :
501 49154 : int cloudsync_changesvtab_insert (sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid) {
502 : DEBUG_VTAB("cloudsync_changesvtab_insert");
503 :
504 : // this function performs the merging logic for an insert in a cloud-synchronized table. It handles
505 : // different scenarios including conflicts, causal lengths, delete operations, and resurrecting rows
506 : // based on the incoming data (from remote nodes or clients) and the local database state
507 :
508 : // this function handles different CRDT algorithms (GOS, DWS, AWS, and CLS).
509 : // the merging strategy is determined based on the table->algo value.
510 :
511 : // meta table declaration:
512 : // tbl TEXT NOT NULL, pk BLOB NOT NULL, col_name TEXT NOT NULL,"
513 : // "col_value ANY, col_version INTEGER NOT NULL, db_version INTEGER NOT NULL,"
514 : // "site_id BLOB NOT NULL, cl INTEGER NOT NULL, seq INTEGER NOT NULL
515 :
516 : // meta information to retrieve from arguments:
517 : // argv[0] -> table name (TEXT)
518 : // argv[1] -> primary key (BLOB)
519 : // argv[2] -> column name (TEXT or NULL if sentinel)
520 : // argv[3] -> column value (ANY)
521 : // argv[4] -> column version (INTEGER)
522 : // argv[5] -> database version (INTEGER)
523 : // argv[6] -> site ID (BLOB, identifies the origin of the update)
524 : // argv[7] -> causal length (INTEGER, tracks the order of operations)
525 : // argv[8] -> sequence number (INTEGER, unique per operation)
526 :
527 : // extract table name
528 49154 : const char *insert_tbl = (const char *)sqlite3_value_text(argv[0]);
529 :
530 : // lookup table
531 49154 : cloudsync_context *data = (cloudsync_context *)(((cloudsync_changes_vtab *)vtab)->data);
532 49154 : cloudsync_table_context *table = table_lookup(data, insert_tbl);
533 49154 : if (!table) return vtab_set_error(vtab, "Unable to find table %s,", insert_tbl);
534 :
535 : // extract the remaining fields from the input values
536 49154 : const char *insert_pk = (const char *)sqlite3_value_blob(argv[1]);
537 49154 : int insert_pk_len = sqlite3_value_bytes(argv[1]);
538 49154 : const char *insert_name = (sqlite3_value_type(argv[2]) == SQLITE_NULL) ? CLOUDSYNC_TOMBSTONE_VALUE : (const char *)sqlite3_value_text(argv[2]);
539 49154 : sqlite3_value *insert_value = argv[3];
540 49154 : int64_t insert_col_version = (int64_t)sqlite3_value_int(argv[4]);
541 49154 : int64_t insert_db_version = (int64_t)sqlite3_value_int(argv[5]);
542 49154 : const char *insert_site_id = (const char *)sqlite3_value_blob(argv[6]);
543 49154 : int insert_site_id_len = sqlite3_value_bytes(argv[6]);
544 49154 : int64_t insert_cl = (int64_t)sqlite3_value_int(argv[7]);
545 49154 : int64_t insert_seq = (int64_t)sqlite3_value_int(argv[8]);
546 :
547 : // perform different logic for each different table algorithm
548 49154 : if (table_algo_isgos(table)) return cloudsync_changesvtab_insert_gos(vtab, data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, (int64_t *)rowid);
549 :
550 46004 : int rc = merge_insert (data, table, insert_pk, insert_pk_len, insert_cl, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, (int64_t *)rowid);
551 46004 : if (rc != SQLITE_OK) {
552 0 : return vtab_set_error(vtab, "%s", cloudsync_errmsg(data));
553 : }
554 :
555 46004 : return SQLITE_OK;
556 49154 : }
557 :
558 49155 : int cloudsync_changesvtab_update (sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid) {
559 : DEBUG_VTAB("cloudsync_changesvtab_update");
560 :
561 : // only INSERT statements are allowed
562 49155 : bool is_insert = (argc > 1 && sqlite3_value_type(argv[0]) == SQLITE_NULL);
563 49155 : if (!is_insert) {
564 1 : vtab_set_error(vtab, "Only INSERT and SELECT statements are allowed against the cloudsync_changes table");
565 1 : return SQLITE_MISUSE;
566 : }
567 :
568 : // argv[0] is set only in case of DELETE statement (it contains the rowid of a row in the virtual table to be deleted)
569 : // argv[1] is the rowid of a new row to be inserted into the virtual table (always NULL in our case)
570 : // so reduce the number of meaningful arguments by 2
571 49154 : return cloudsync_changesvtab_insert(vtab, argc-2, &argv[2], rowid);
572 49155 : }
573 :
574 : // MARK: -
575 :
576 193 : int cloudsync_vtab_register_changes (sqlite3 *db, cloudsync_context *xdata) {
577 : static sqlite3_module cloudsync_changes_module = {
578 : /* iVersion */ 0,
579 : /* xCreate */ 0, // Eponymous only virtual table
580 : /* xConnect */ cloudsync_changesvtab_connect,
581 : /* xBestIndex */ cloudsync_changesvtab_best_index,
582 : /* xDisconnect */ cloudsync_changesvtab_disconnect,
583 : /* xDestroy */ 0,
584 : /* xOpen */ cloudsync_changesvtab_open,
585 : /* xClose */ cloudsync_changesvtab_close,
586 : /* xFilter */ cloudsync_changesvtab_filter,
587 : /* xNext */ cloudsync_changesvtab_next,
588 : /* xEof */ cloudsync_changesvtab_eof,
589 : /* xColumn */ cloudsync_changesvtab_column,
590 : /* xRowid */ cloudsync_changesvtab_rowid,
591 : /* xUpdate */ cloudsync_changesvtab_update,
592 : /* xBegin */ 0,
593 : /* xSync */ 0,
594 : /* xCommit */ 0,
595 : /* xRollback */ 0,
596 : /* xFindMethod */ 0,
597 : /* xRename */ 0,
598 : /* xSavepoint */ 0,
599 : /* xRelease */ 0,
600 : /* xRollbackTo */ 0,
601 : /* xShadowName */ 0,
602 : /* xIntegrity */ 0
603 : };
604 :
605 193 : return sqlite3_create_module(db, "cloudsync_changes", &cloudsync_changes_module, (void *)xdata);
606 : }
|