Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "cloudsync_private.h"
21 : #include "lz4.h"
22 : #include "pk.h"
23 : #include "vtab.h"
24 : #include "utils.h"
25 : #include "dbutils.h"
26 :
27 : #ifndef CLOUDSYNC_OMIT_NETWORK
28 : #include "network.h"
29 : #endif
30 :
31 : #ifdef _WIN32
32 : #include <winsock2.h>
33 : #include <ws2tcpip.h>
34 : #else
35 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
36 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
37 : #endif
38 :
39 : #ifndef htonll
40 : #if __BIG_ENDIAN__
41 : #define htonll(x) (x)
42 : #define ntohll(x) (x)
43 : #else
44 : #ifndef htobe64
45 : #define htonll(x) ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
46 : #define ntohll(x) ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
47 : #else
48 : #define htonll(x) htobe64(x)
49 : #define ntohll(x) be64toh(x)
50 : #endif
51 : #endif
52 : #endif
53 :
54 : #ifndef SQLITE_CORE
55 : SQLITE_EXTENSION_INIT1
56 : #endif
57 :
58 : #ifndef UNUSED_PARAMETER
59 : #define UNUSED_PARAMETER(X) (void)(X)
60 : #endif
61 :
62 : #ifdef _WIN32
63 : #define APIEXPORT __declspec(dllexport)
64 : #else
65 : #define APIEXPORT
66 : #endif
67 :
68 : #define CLOUDSYNC_DEFAULT_ALGO "cls"
69 : #define CLOUDSYNC_INIT_NTABLES 128
70 : #define CLOUDSYNC_VALUE_NOTSET -1
71 : #define CLOUDSYNC_MIN_DB_VERSION 0
72 :
73 : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE 512*1024
74 : #define CLOUDSYNC_PAYLOAD_VERSION 1
75 : #define CLOUDSYNC_PAYLOAD_SIGNATURE 'CLSY'
76 : #define CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY "cloudsync_payload_apply_callback"
77 :
78 : #ifndef MAX
79 : #define MAX(a, b) (((a)>(b))?(a):(b))
80 : #endif
81 :
82 : #define DEBUG_SQLITE_ERROR(_rc, _fn, _db) do {if (_rc != SQLITE_OK) printf("Error in %s: %s\n", _fn, sqlite3_errmsg(_db));} while (0)
83 :
84 : typedef enum {
85 : CLOUDSYNC_PK_INDEX_TBL = 0,
86 : CLOUDSYNC_PK_INDEX_PK = 1,
87 : CLOUDSYNC_PK_INDEX_COLNAME = 2,
88 : CLOUDSYNC_PK_INDEX_COLVALUE = 3,
89 : CLOUDSYNC_PK_INDEX_COLVERSION = 4,
90 : CLOUDSYNC_PK_INDEX_DBVERSION = 5,
91 : CLOUDSYNC_PK_INDEX_SITEID = 6,
92 : CLOUDSYNC_PK_INDEX_CL = 7,
93 : CLOUDSYNC_PK_INDEX_SEQ = 8
94 : } CLOUDSYNC_PK_INDEX;
95 :
96 : typedef enum {
97 : CLOUDSYNC_STMT_VALUE_ERROR = -1,
98 : CLOUDSYNC_STMT_VALUE_UNCHANGED = 0,
99 : CLOUDSYNC_STMT_VALUE_CHANGED = 1,
100 : } CLOUDSYNC_STMT_VALUE;
101 :
102 : typedef struct {
103 : sqlite3_context *context;
104 : int index;
105 : } cloudsync_pk_decode_context;
106 :
107 : #define SYNCBIT_SET(_data) _data->insync = 1
108 : #define SYNCBIT_RESET(_data) _data->insync = 0
109 : #define BUMP_SEQ(_data) ((_data)->seq += 1, (_data)->seq - 1)
110 :
111 : // MARK: -
112 :
113 : typedef struct {
114 : table_algo algo; // CRDT algoritm associated to the table
115 : char *name; // table name
116 : char **col_name; // array of column names
117 : sqlite3_stmt **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
118 : sqlite3_stmt **col_value_stmt; // array of column value stmt (indexed by col_name)
119 : int *col_id; // array of column id
120 : int ncols; // number of non primary key cols
121 : int npks; // number of primary key cols
122 : bool enabled; // flag to check if a table is enabled or disabled
123 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
124 : bool rowid_only; // a table with no primary keys other than the implicit rowid
125 : #endif
126 :
127 : char **pk_name; // array of primary key names
128 :
129 : // precompiled statements
130 : sqlite3_stmt *meta_pkexists_stmt; // check if a primary key already exist in the augmented table
131 : sqlite3_stmt *meta_sentinel_update_stmt; // update a local sentinel row
132 : sqlite3_stmt *meta_sentinel_insert_stmt; // insert a local sentinel row
133 : sqlite3_stmt *meta_row_insert_update_stmt; // insert/update a local row
134 : sqlite3_stmt *meta_row_drop_stmt; // delete rows from meta
135 : sqlite3_stmt *meta_update_move_stmt; // update rows in meta when pk changes
136 : sqlite3_stmt *meta_local_cl_stmt; // compute local cl value
137 : sqlite3_stmt *meta_winner_clock_stmt; // get the rowid of the last inserted/updated row in the meta table
138 : sqlite3_stmt *meta_merge_delete_drop;
139 : sqlite3_stmt *meta_zero_clock_stmt;
140 : sqlite3_stmt *meta_col_version_stmt;
141 : sqlite3_stmt *meta_site_id_stmt;
142 :
143 : sqlite3_stmt *real_col_values_stmt; // retrieve all column values based on pk
144 : sqlite3_stmt *real_merge_delete_stmt;
145 : sqlite3_stmt *real_merge_sentinel_stmt;
146 :
147 : } cloudsync_table_context;
148 :
149 : struct cloudsync_pk_decode_bind_context {
150 : sqlite3_stmt *vm;
151 : char *tbl;
152 : int64_t tbl_len;
153 : const void *pk;
154 : int64_t pk_len;
155 : char *col_name;
156 : int64_t col_name_len;
157 : int64_t col_version;
158 : int64_t db_version;
159 : const void *site_id;
160 : int64_t site_id_len;
161 : int64_t cl;
162 : int64_t seq;
163 : };
164 :
165 : struct cloudsync_context {
166 : sqlite3_context *sqlite_ctx;
167 :
168 : char *libversion;
169 : uint8_t site_id[UUID_LEN];
170 : int insync;
171 : int debug;
172 : bool merge_equal_values;
173 : bool temp_bool; // temporary value used in callback
174 : void *aux_data;
175 :
176 : // stmts and context values
177 : bool pragma_checked; // we need to check PRAGMAs only once per transaction
178 : sqlite3_stmt *schema_version_stmt;
179 : sqlite3_stmt *data_version_stmt;
180 : sqlite3_stmt *db_version_stmt;
181 : sqlite3_stmt *getset_siteid_stmt;
182 : int data_version;
183 : int schema_version;
184 : uint64_t schema_hash;
185 :
186 : // set at the start of each transaction on the first invocation and
187 : // re-set on transaction commit or rollback
188 : sqlite3_int64 db_version;
189 : // the version that the db will be set to at the end of the transaction
190 : // if that transaction were to commit at the time this value is checked
191 : sqlite3_int64 pending_db_version;
192 : // used to set an order inside each transaction
193 : int seq;
194 :
195 : // augmented tables are stored in-memory so we do not need to retrieve information about col names and cid
196 : // from the disk each time a write statement is performed
197 : // we do also not need to use an hash map here because for few tables the direct in-memory comparison with table name is faster
198 : cloudsync_table_context **tables;
199 : int tables_count;
200 : int tables_alloc;
201 : };
202 :
203 : typedef struct {
204 : char *buffer;
205 : size_t balloc;
206 : size_t bused;
207 : uint64_t nrows;
208 : uint16_t ncols;
209 : } cloudsync_data_payload;
210 :
211 : #ifdef _MSC_VER
212 : #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
213 : #define PACKED
214 : #else
215 : #define PACKED __attribute__((__packed__))
216 : #endif
217 :
218 : typedef struct PACKED {
219 : uint32_t signature; // 'CLSY'
220 : uint8_t version; // protocol version
221 : uint8_t libversion[3]; // major.minor.patch
222 : uint32_t expanded_size;
223 : uint16_t ncols;
224 : uint32_t nrows;
225 : uint64_t schema_hash;
226 : uint8_t unused[6]; // padding to ensure the struct is exactly 32 bytes
227 : } cloudsync_payload_header;
228 :
229 : typedef struct {
230 : sqlite3_value *table_name;
231 : sqlite3_value **new_values;
232 : sqlite3_value **old_values;
233 : int count;
234 : int capacity;
235 : } cloudsync_update_payload;
236 :
237 : #ifdef _MSC_VER
238 : #pragma pack(pop)
239 : #endif
240 :
241 : #if CLOUDSYNC_UNITTEST
242 : bool force_uncompressed_blob = false;
243 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER() if (force_uncompressed_blob) use_uncompressed_buffer = true
244 : #else
245 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
246 : #endif
247 :
248 : int db_version_rebuild_stmt (sqlite3 *db, cloudsync_context *data);
249 : int cloudsync_load_siteid (sqlite3 *db, cloudsync_context *data);
250 : int local_mark_insert_or_update_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, sqlite3_int64 db_version, int seq);
251 :
252 : // MARK: - STMT Utils -
253 :
254 25861 : CLOUDSYNC_STMT_VALUE stmt_execute (sqlite3_stmt *stmt, cloudsync_context *data) {
255 25861 : int rc = sqlite3_step(stmt);
256 25861 : if (rc != SQLITE_ROW && rc != SQLITE_DONE) {
257 2 : if (data) DEBUG_SQLITE_ERROR(rc, "stmt_execute", sqlite3_db_handle(stmt));
258 2 : sqlite3_reset(stmt);
259 2 : return CLOUDSYNC_STMT_VALUE_ERROR;
260 : }
261 :
262 25859 : CLOUDSYNC_STMT_VALUE result = CLOUDSYNC_STMT_VALUE_CHANGED;
263 25859 : if (stmt == data->data_version_stmt) {
264 25315 : int version = sqlite3_column_int(stmt, 0);
265 25315 : if (version != data->data_version) {
266 85 : data->data_version = version;
267 85 : } else {
268 25230 : result = CLOUDSYNC_STMT_VALUE_UNCHANGED;
269 : }
270 25859 : } else if (stmt == data->schema_version_stmt) {
271 272 : int version = sqlite3_column_int(stmt, 0);
272 272 : if (version > data->schema_version) {
273 143 : data->schema_version = version;
274 143 : } else {
275 129 : result = CLOUDSYNC_STMT_VALUE_UNCHANGED;
276 : }
277 :
278 544 : } else if (stmt == data->db_version_stmt) {
279 272 : data->db_version = (rc == SQLITE_DONE) ? CLOUDSYNC_MIN_DB_VERSION : sqlite3_column_int64(stmt, 0);
280 272 : }
281 :
282 25859 : sqlite3_reset(stmt);
283 25859 : return result;
284 25861 : }
285 :
286 3367 : int stmt_count (sqlite3_stmt *stmt, const char *value, size_t len, int type) {
287 3367 : int result = -1;
288 3367 : int rc = SQLITE_OK;
289 :
290 3367 : if (value) {
291 3366 : rc = (type == SQLITE_TEXT) ? sqlite3_bind_text(stmt, 1, value, (int)len, SQLITE_STATIC) : sqlite3_bind_blob(stmt, 1, value, (int)len, SQLITE_STATIC);
292 3366 : if (rc != SQLITE_OK) goto cleanup;
293 3366 : }
294 :
295 3367 : rc = sqlite3_step(stmt);
296 6734 : if (rc == SQLITE_DONE) {
297 1 : result = 0;
298 1 : rc = SQLITE_OK;
299 3367 : } else if (rc == SQLITE_ROW) {
300 3366 : result = sqlite3_column_int(stmt, 0);
301 3366 : rc = SQLITE_OK;
302 3366 : }
303 :
304 : cleanup:
305 3367 : DEBUG_SQLITE_ERROR(rc, "stmt_count", sqlite3_db_handle(stmt));
306 3367 : sqlite3_reset(stmt);
307 3367 : return result;
308 : }
309 :
310 200312 : sqlite3_stmt *stmt_reset (sqlite3_stmt *stmt) {
311 200312 : sqlite3_clear_bindings(stmt);
312 200312 : sqlite3_reset(stmt);
313 200312 : return NULL;
314 : }
315 :
316 234 : int stmts_add_tocontext (sqlite3 *db, cloudsync_context *data) {
317 : DEBUG_DBFUNCTION("cloudsync_add_stmts");
318 :
319 234 : if (data->data_version_stmt == NULL) {
320 87 : const char *sql = "PRAGMA data_version;";
321 87 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->data_version_stmt, NULL);
322 : DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
323 87 : if (rc != SQLITE_OK) return rc;
324 : DEBUG_SQL("data_version_stmt: %s", sql);
325 87 : }
326 :
327 234 : if (data->schema_version_stmt == NULL) {
328 87 : const char *sql = "PRAGMA schema_version;";
329 87 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->schema_version_stmt, NULL);
330 : DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
331 87 : if (rc != SQLITE_OK) return rc;
332 : DEBUG_SQL("schema_version_stmt: %s", sql);
333 87 : }
334 :
335 234 : if (data->getset_siteid_stmt == NULL) {
336 : // get and set index of the site_id
337 : // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
338 : // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
339 87 : const char *sql = "INSERT INTO cloudsync_site_id (site_id) VALUES (?) ON CONFLICT(site_id) DO UPDATE SET site_id = site_id RETURNING rowid;";
340 87 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->getset_siteid_stmt, NULL);
341 : DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
342 87 : if (rc != SQLITE_OK) return rc;
343 : DEBUG_SQL("getset_siteid_stmt: %s", sql);
344 87 : }
345 :
346 234 : return db_version_rebuild_stmt(db, data);
347 234 : }
348 :
349 : // MARK: - Database Version -
350 :
351 290 : char *db_version_build_query (sqlite3 *db) {
352 : // this function must be manually called each time tables changes
353 : // because the query plan changes too and it must be re-prepared
354 : // unfortunately there is no other way
355 :
356 : // we need to execute a query like:
357 : /*
358 : SELECT max(version) as version FROM (
359 : SELECT max(db_version) as version FROM "table1_cloudsync"
360 : UNION ALL
361 : SELECT max(db_version) as version FROM "table2_cloudsync"
362 : UNION ALL
363 : SELECT max(db_version) as version FROM "table3_cloudsync"
364 : UNION
365 : SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
366 : )
367 : */
368 :
369 : // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
370 290 : const char *sql = "WITH table_names AS ("
371 : "SELECT format('%w', name) as tbl_name "
372 : "FROM sqlite_master "
373 : "WHERE type='table' "
374 : "AND name LIKE '%_cloudsync'"
375 : "), "
376 : "query_parts AS ("
377 : "SELECT 'SELECT max(db_version) as version FROM \"' || tbl_name || '\"' as part FROM table_names"
378 : "), "
379 : "combined_query AS ("
380 : "SELECT GROUP_CONCAT(part, ' UNION ALL ') || ' UNION SELECT value as version FROM cloudsync_settings WHERE key = ''pre_alter_dbversion''' as full_query FROM query_parts"
381 : ") "
382 : "SELECT 'SELECT max(version) as version FROM (' || full_query || ');' FROM combined_query;";
383 290 : return dbutils_text_select(db, sql);
384 : }
385 :
386 377 : int db_version_rebuild_stmt (sqlite3 *db, cloudsync_context *data) {
387 377 : if (data->db_version_stmt) {
388 203 : sqlite3_finalize(data->db_version_stmt);
389 203 : data->db_version_stmt = NULL;
390 203 : }
391 :
392 377 : sqlite3_int64 count = dbutils_table_settings_count_tables(db);
393 377 : if (count == 0) return SQLITE_OK;
394 290 : else if (count == -1) {
395 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
396 0 : return SQLITE_ERROR;
397 : }
398 :
399 290 : char *sql = db_version_build_query(db);
400 290 : if (!sql) return SQLITE_NOMEM;
401 : DEBUG_SQL("db_version_stmt: %s", sql);
402 :
403 290 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->db_version_stmt, NULL);
404 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
405 290 : cloudsync_memory_free(sql);
406 290 : return rc;
407 377 : }
408 :
409 272 : int db_version_rerun (sqlite3 *db, cloudsync_context *data) {
410 272 : CLOUDSYNC_STMT_VALUE schema_changed = stmt_execute(data->schema_version_stmt, data);
411 272 : if (schema_changed == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
412 :
413 272 : if (schema_changed == CLOUDSYNC_STMT_VALUE_CHANGED) {
414 143 : int rc = db_version_rebuild_stmt(db, data);
415 143 : if (rc != SQLITE_OK) return -1;
416 143 : }
417 :
418 272 : CLOUDSYNC_STMT_VALUE rc = stmt_execute(data->db_version_stmt, data);
419 272 : if (rc == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
420 272 : return 0;
421 272 : }
422 :
423 25315 : int db_version_check_uptodate (sqlite3 *db, cloudsync_context *data) {
424 : // perform a PRAGMA data_version to check if some other process write any data
425 25315 : CLOUDSYNC_STMT_VALUE rc = stmt_execute(data->data_version_stmt, data);
426 25315 : if (rc == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
427 :
428 : // db_version is already set and there is no need to update it
429 25315 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == CLOUDSYNC_STMT_VALUE_UNCHANGED) return 0;
430 :
431 272 : return db_version_rerun(db, data);
432 25315 : }
433 :
434 25289 : sqlite3_int64 db_version_next (sqlite3 *db, cloudsync_context *data, sqlite3_int64 merging_version) {
435 25289 : int rc = db_version_check_uptodate(db, data);
436 25289 : if (rc != SQLITE_OK) return -1;
437 :
438 25289 : sqlite3_int64 result = data->db_version + 1;
439 25289 : if (result < data->pending_db_version) result = data->pending_db_version;
440 25289 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
441 25289 : data->pending_db_version = result;
442 :
443 25289 : return result;
444 25289 : }
445 :
446 : // MARK: -
447 :
448 0 : void *cloudsync_get_auxdata (sqlite3_context *context) {
449 0 : cloudsync_context *data = (context) ? (cloudsync_context *)sqlite3_user_data(context) : NULL;
450 0 : return (data) ? data->aux_data : NULL;
451 : }
452 :
453 0 : void cloudsync_set_auxdata (sqlite3_context *context, void *xdata) {
454 0 : cloudsync_context *data = (context) ? (cloudsync_context *)sqlite3_user_data(context) : NULL;
455 0 : if (data) data->aux_data = xdata;
456 0 : }
457 :
458 : // MARK: - PK Context -
459 :
460 81485 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
461 81485 : *tbl_len = ctx->tbl_len;
462 81485 : return ctx->tbl;
463 : }
464 :
465 81485 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
466 81485 : *pk_len = ctx->pk_len;
467 81485 : return (void *)ctx->pk;
468 : }
469 :
470 81485 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
471 81485 : *colname_len = ctx->col_name_len;
472 81485 : return ctx->col_name;
473 : }
474 :
475 81485 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
476 81485 : return ctx->cl;
477 : }
478 :
479 81485 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
480 81485 : return ctx->db_version;
481 : }
482 :
483 : // MARK: - Table Utils -
484 :
485 106 : char *table_build_values_sql (sqlite3 *db, cloudsync_table_context *table) {
486 106 : char *sql = NULL;
487 :
488 : /*
489 : This SQL statement dynamically generates a SELECT query for a specified table.
490 : It uses Common Table Expressions (CTEs) to construct the column names and
491 : primary key conditions based on the table schema, which is obtained through
492 : the `pragma_table_info` function.
493 :
494 : 1. `col_names` CTE:
495 : - Retrieves a comma-separated list of non-primary key column names from
496 : the specified table's schema.
497 :
498 : 2. `pk_where` CTE:
499 : - Retrieves a condition string representing the primary key columns in the
500 : format: "column1=? AND column2=? AND ...", used to create the WHERE clause
501 : for selecting rows based on primary key values.
502 :
503 : 3. Final SELECT:
504 : - Constructs the complete SELECT statement as a string, combining:
505 : - Column names from `col_names`.
506 : - The target table name.
507 : - The WHERE clause conditions from `pk_where`.
508 :
509 : The resulting query can be used to select rows from the table based on primary
510 : key values, and can be executed within the application to retrieve data dynamically.
511 : */
512 :
513 : // Unfortunately in SQLite column names (or table names) cannot be bound parameters in a SELECT statement
514 : // otherwise we should have used something like SELECT 'SELECT ? FROM %w WHERE rowid=?';
515 :
516 106 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
517 :
518 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
519 : if (table->rowid_only) {
520 : sql = memory_mprintf("WITH col_names AS (SELECT group_concat('\"' || format('%%w', name) || '\"', ',') AS cols FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid) SELECT 'SELECT ' || (SELECT cols FROM col_names) || ' FROM \"%w\" WHERE rowid=?;'", table->name, table->name);
521 : goto process_process;
522 : }
523 : #endif
524 :
525 106 : sql = cloudsync_memory_mprintf("WITH col_names AS (SELECT group_concat('\"' || format('%%w', name) || '\"', ',') AS cols FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid), pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'SELECT ' || (SELECT cols FROM col_names) || ' FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, table->name, singlequote_escaped_table_name);
526 :
527 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
528 : process_process:
529 : #endif
530 106 : cloudsync_memory_free(singlequote_escaped_table_name);
531 106 : if (!sql) return NULL;
532 106 : char *query = dbutils_text_select(db, sql);
533 106 : cloudsync_memory_free(sql);
534 :
535 106 : return query;
536 106 : }
537 :
538 141 : char *table_build_mergedelete_sql (sqlite3 *db, cloudsync_table_context *table) {
539 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
540 : if (table->rowid_only) {
541 : char *sql = memory_mprintf("DELETE FROM \"%w\" WHERE rowid=?;", table->name);
542 : return sql;
543 : }
544 : #endif
545 :
546 141 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
547 141 : char *sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'DELETE FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, singlequote_escaped_table_name);
548 141 : cloudsync_memory_free(singlequote_escaped_table_name);
549 141 : if (!sql) return NULL;
550 :
551 141 : char *query = dbutils_text_select(db, sql);
552 141 : cloudsync_memory_free(sql);
553 :
554 141 : return query;
555 141 : }
556 :
557 1035 : char *table_build_mergeinsert_sql (sqlite3 *db, cloudsync_table_context *table, const char *colname) {
558 1035 : char *sql = NULL;
559 :
560 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
561 : if (table->rowid_only) {
562 : if (colname == NULL) {
563 : // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
564 : sql = memory_mprintf("INSERT OR IGNORE INTO \"%w\" (rowid) VALUES (?);", table->name);
565 : } else {
566 : // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
567 : sql = memory_mprintf("INSERT INTO \"%w\" (rowid, \"%w\") VALUES (?, ?) ON CONFLICT DO UPDATE SET \"%w\"=?;", table->name, colname, colname);
568 : }
569 : return sql;
570 : }
571 : #endif
572 :
573 1035 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
574 :
575 1035 : if (colname == NULL) {
576 : // is sentinel insert
577 141 : sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"') AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk), pk_bind AS (SELECT group_concat('?') AS pk_binding FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'INSERT OR IGNORE INTO \"%w\" (' || (SELECT pk_clause FROM pk_where) || ') VALUES (' || (SELECT pk_binding FROM pk_bind) || ');'", table->name, table->name, singlequote_escaped_table_name);
578 141 : } else {
579 894 : char *singlequote_escaped_col_name = cloudsync_memory_mprintf("%q", colname);
580 894 : sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"') AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk), pk_bind AS (SELECT group_concat('?') AS pk_binding FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'INSERT INTO \"%w\" (' || (SELECT pk_clause FROM pk_where) || ',\"%w\") VALUES (' || (SELECT pk_binding FROM pk_bind) || ',?) ON CONFLICT DO UPDATE SET \"%w\"=?;'", table->name, table->name, singlequote_escaped_table_name, singlequote_escaped_col_name, singlequote_escaped_col_name);
581 894 : cloudsync_memory_free(singlequote_escaped_col_name);
582 :
583 : }
584 1035 : cloudsync_memory_free(singlequote_escaped_table_name);
585 1035 : if (!sql) return NULL;
586 :
587 1035 : char *query = dbutils_text_select(db, sql);
588 1035 : cloudsync_memory_free(sql);
589 :
590 1035 : return query;
591 1035 : }
592 :
593 1015 : char *table_build_value_sql (sqlite3 *db, cloudsync_table_context *table, const char *colname) {
594 1015 : char *colnamequote = dbutils_is_star_table(colname) ? "" : "\"";
595 :
596 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
597 : if (table->rowid_only) {
598 : char *sql = memory_mprintf("SELECT %s%w%s FROM \"%w\" WHERE rowid=?;", colnamequote, colname, colnamequote, table->name);
599 : return sql;
600 : }
601 : #endif
602 :
603 : // SELECT age FROM customers WHERE first_name=? AND last_name=?;
604 1015 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
605 1015 : char *singlequote_escaped_col_name = cloudsync_memory_mprintf("%q", colname);
606 1015 : char *sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'SELECT %s%w%s FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, colnamequote, singlequote_escaped_col_name, colnamequote, singlequote_escaped_table_name);
607 1015 : cloudsync_memory_free(singlequote_escaped_col_name);
608 1015 : cloudsync_memory_free(singlequote_escaped_table_name);
609 1015 : if (!sql) return NULL;
610 :
611 1015 : char *query = dbutils_text_select(db, sql);
612 1015 : cloudsync_memory_free(sql);
613 :
614 1015 : return query;
615 1015 : }
616 :
617 141 : cloudsync_table_context *table_create (const char *name, table_algo algo) {
618 : DEBUG_DBFUNCTION("table_create %s", name);
619 :
620 141 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
621 141 : if (!table) return NULL;
622 :
623 141 : table->algo = algo;
624 141 : table->name = cloudsync_string_dup(name, true);
625 141 : if (!table->name) {
626 0 : cloudsync_memory_free(table);
627 0 : return NULL;
628 : }
629 141 : table->enabled = true;
630 :
631 141 : return table;
632 141 : }
633 :
634 141 : void table_free (cloudsync_table_context *table) {
635 : DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
636 141 : if (!table) return;
637 :
638 141 : if (table->ncols > 0) {
639 106 : if (table->col_name) {
640 1000 : for (int i=0; i<table->ncols; ++i) {
641 894 : cloudsync_memory_free(table->col_name[i]);
642 894 : }
643 106 : cloudsync_memory_free(table->col_name);
644 106 : }
645 106 : if (table->col_merge_stmt) {
646 1000 : for (int i=0; i<table->ncols; ++i) {
647 894 : sqlite3_finalize(table->col_merge_stmt[i]);
648 894 : }
649 106 : cloudsync_memory_free(table->col_merge_stmt);
650 106 : }
651 106 : if (table->col_value_stmt) {
652 1000 : for (int i=0; i<table->ncols; ++i) {
653 894 : sqlite3_finalize(table->col_value_stmt[i]);
654 894 : }
655 106 : cloudsync_memory_free(table->col_value_stmt);
656 106 : }
657 106 : if (table->col_id) {
658 106 : cloudsync_memory_free(table->col_id);
659 106 : }
660 106 : }
661 :
662 141 : if (table->pk_name) sqlite3_free_table(table->pk_name);
663 141 : if (table->name) cloudsync_memory_free(table->name);
664 141 : if (table->meta_pkexists_stmt) sqlite3_finalize(table->meta_pkexists_stmt);
665 141 : if (table->meta_sentinel_update_stmt) sqlite3_finalize(table->meta_sentinel_update_stmt);
666 141 : if (table->meta_sentinel_insert_stmt) sqlite3_finalize(table->meta_sentinel_insert_stmt);
667 141 : if (table->meta_row_insert_update_stmt) sqlite3_finalize(table->meta_row_insert_update_stmt);
668 141 : if (table->meta_row_drop_stmt) sqlite3_finalize(table->meta_row_drop_stmt);
669 141 : if (table->meta_update_move_stmt) sqlite3_finalize(table->meta_update_move_stmt);
670 141 : if (table->meta_local_cl_stmt) sqlite3_finalize(table->meta_local_cl_stmt);
671 141 : if (table->meta_winner_clock_stmt) sqlite3_finalize(table->meta_winner_clock_stmt);
672 141 : if (table->meta_merge_delete_drop) sqlite3_finalize(table->meta_merge_delete_drop);
673 141 : if (table->meta_zero_clock_stmt) sqlite3_finalize(table->meta_zero_clock_stmt);
674 141 : if (table->meta_col_version_stmt) sqlite3_finalize(table->meta_col_version_stmt);
675 141 : if (table->meta_site_id_stmt) sqlite3_finalize(table->meta_site_id_stmt);
676 :
677 141 : if (table->real_col_values_stmt) sqlite3_finalize(table->real_col_values_stmt);
678 141 : if (table->real_merge_delete_stmt) sqlite3_finalize(table->real_merge_delete_stmt);
679 141 : if (table->real_merge_sentinel_stmt) sqlite3_finalize(table->real_merge_sentinel_stmt);
680 :
681 141 : cloudsync_memory_free(table);
682 141 : }
683 :
684 141 : int table_add_stmts (sqlite3 *db, cloudsync_table_context *table, int ncols) {
685 141 : int rc = SQLITE_OK;
686 141 : char *sql = NULL;
687 :
688 : // META TABLE statements
689 :
690 : // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
691 :
692 : // precompile the pk exists statement
693 : // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
694 : // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
695 141 : sql = cloudsync_memory_mprintf("SELECT EXISTS(SELECT 1 FROM \"%w_cloudsync\" WHERE pk = ? LIMIT 1);", table->name);
696 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
697 : DEBUG_SQL("meta_pkexists_stmt: %s", sql);
698 :
699 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_pkexists_stmt, NULL);
700 :
701 141 : cloudsync_memory_free(sql);
702 141 : if (rc != SQLITE_OK) goto cleanup;
703 :
704 : // precompile the update local sentinel statement
705 141 : sql = cloudsync_memory_mprintf("UPDATE \"%w_cloudsync\" SET col_version = CASE col_version %% 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = ?, seq = ?, site_id = 0 WHERE pk = ? AND col_name = '%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
706 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
707 : DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
708 :
709 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_sentinel_update_stmt, NULL);
710 141 : cloudsync_memory_free(sql);
711 141 : if (rc != SQLITE_OK) goto cleanup;
712 :
713 : // precompile the insert local sentinel statement
714 141 : sql = cloudsync_memory_mprintf("INSERT INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id) SELECT ?, '%s', 1, ?, ?, 0 WHERE 1 ON CONFLICT DO UPDATE SET col_version = CASE col_version %% 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = ?, seq = ?, site_id = 0;", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
715 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
716 : DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
717 :
718 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_sentinel_insert_stmt, NULL);
719 141 : cloudsync_memory_free(sql);
720 141 : if (rc != SQLITE_OK) goto cleanup;
721 :
722 : // precompile the insert/update local row statement
723 141 : sql = cloudsync_memory_mprintf("INSERT INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id ) SELECT ?, ?, ?, ?, ?, 0 WHERE 1 ON CONFLICT DO UPDATE SET col_version = col_version + 1, db_version = ?, seq = ?, site_id = 0;", table->name);
724 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
725 : DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
726 :
727 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_row_insert_update_stmt, NULL);
728 141 : cloudsync_memory_free(sql);
729 141 : if (rc != SQLITE_OK) goto cleanup;
730 :
731 : // precompile the delete rows from meta
732 141 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
733 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
734 : DEBUG_SQL("meta_row_drop_stmt: %s", sql);
735 :
736 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_row_drop_stmt, NULL);
737 141 : cloudsync_memory_free(sql);
738 141 : if (rc != SQLITE_OK) goto cleanup;
739 :
740 : // precompile the update rows from meta when pk changes
741 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
742 141 : sql = cloudsync_memory_mprintf("UPDATE OR REPLACE \"%w_cloudsync\" SET pk=?, db_version=?, col_version=1, seq=cloudsync_seq(), site_id=0 WHERE (pk=? AND col_name!='%s');", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
743 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
744 : DEBUG_SQL("meta_update_move_stmt: %s", sql);
745 :
746 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_update_move_stmt, NULL);
747 141 : cloudsync_memory_free(sql);
748 141 : if (rc != SQLITE_OK) goto cleanup;
749 :
750 : // local cl
751 141 : sql = cloudsync_memory_mprintf("SELECT COALESCE((SELECT col_version FROM \"%w_cloudsync\" WHERE pk=? AND col_name='%s'), (SELECT 1 FROM \"%w_cloudsync\" WHERE pk=?));", table->name, CLOUDSYNC_TOMBSTONE_VALUE, table->name);
752 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
753 : DEBUG_SQL("meta_local_cl_stmt: %s", sql);
754 :
755 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_local_cl_stmt, NULL);
756 141 : cloudsync_memory_free(sql);
757 141 : if (rc != SQLITE_OK) goto cleanup;
758 :
759 : // rowid of the last inserted/updated row in the meta table
760 141 : sql = cloudsync_memory_mprintf("INSERT OR REPLACE INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id) VALUES (?, ?, ?, cloudsync_db_version_next(?), ?, ?) RETURNING ((db_version << 30) | seq);", table->name);
761 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
762 : DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
763 :
764 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_winner_clock_stmt, NULL);
765 141 : cloudsync_memory_free(sql);
766 141 : if (rc != SQLITE_OK) goto cleanup;
767 :
768 141 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
769 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
770 : DEBUG_SQL("meta_merge_delete_drop: %s", sql);
771 :
772 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_merge_delete_drop, NULL);
773 141 : cloudsync_memory_free(sql);
774 141 : if (rc != SQLITE_OK) goto cleanup;
775 :
776 : // zero clock
777 141 : sql = cloudsync_memory_mprintf("UPDATE \"%w_cloudsync\" SET col_version = 0, db_version = cloudsync_db_version_next(?) WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
778 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
779 : DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
780 :
781 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_zero_clock_stmt, NULL);
782 141 : cloudsync_memory_free(sql);
783 141 : if (rc != SQLITE_OK) goto cleanup;
784 :
785 : // col_version
786 141 : sql = cloudsync_memory_mprintf("SELECT col_version FROM \"%w_cloudsync\" WHERE pk=? AND col_name=?;", table->name);
787 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
788 : DEBUG_SQL("meta_col_version_stmt: %s", sql);
789 :
790 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_col_version_stmt, NULL);
791 141 : cloudsync_memory_free(sql);
792 141 : if (rc != SQLITE_OK) goto cleanup;
793 :
794 : // site_id
795 141 : sql = cloudsync_memory_mprintf("SELECT site_id FROM \"%w_cloudsync\" WHERE pk=? AND col_name=?;", table->name);
796 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
797 : DEBUG_SQL("meta_site_id_stmt: %s", sql);
798 :
799 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_site_id_stmt, NULL);
800 141 : cloudsync_memory_free(sql);
801 141 : if (rc != SQLITE_OK) goto cleanup;
802 :
803 : // REAL TABLE statements
804 :
805 : // precompile the get column value statement
806 141 : if (ncols > 0) {
807 106 : sql = table_build_values_sql(db, table);
808 106 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
809 : DEBUG_SQL("real_col_values_stmt: %s", sql);
810 :
811 106 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_col_values_stmt, NULL);
812 106 : cloudsync_memory_free(sql);
813 106 : if (rc != SQLITE_OK) goto cleanup;
814 106 : }
815 :
816 141 : sql = table_build_mergedelete_sql(db, table);
817 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
818 : DEBUG_SQL("real_merge_delete: %s", sql);
819 :
820 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_merge_delete_stmt, NULL);
821 141 : cloudsync_memory_free(sql);
822 141 : if (rc != SQLITE_OK) goto cleanup;
823 :
824 141 : sql = table_build_mergeinsert_sql(db, table, NULL);
825 141 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
826 : DEBUG_SQL("real_merge_sentinel: %s", sql);
827 :
828 141 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_merge_sentinel_stmt, NULL);
829 141 : cloudsync_memory_free(sql);
830 141 : if (rc != SQLITE_OK) goto cleanup;
831 :
832 : cleanup:
833 141 : if (rc != SQLITE_OK) printf("table_add_stmts error: %s\n", sqlite3_errmsg(db));
834 141 : return rc;
835 : }
836 :
837 149016 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
838 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
839 :
840 150026 : for (int i=0; i<data->tables_count; ++i) {
841 149865 : const char *name = (data->tables[i]) ? data->tables[i]->name : NULL;
842 149865 : if ((name) && (strcasecmp(name, table_name) == 0)) {
843 148855 : return data->tables[i];
844 : }
845 1010 : }
846 :
847 161 : return NULL;
848 149016 : }
849 :
850 137545 : sqlite3_stmt *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
851 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
852 :
853 260313 : for (int i=0; i<table->ncols; ++i) {
854 260313 : if (strcasecmp(table->col_name[i], col_name) == 0) {
855 137545 : if (index) *index = i;
856 137545 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
857 : }
858 122768 : }
859 :
860 0 : if (index) *index = -1;
861 0 : return NULL;
862 137545 : }
863 :
864 29 : int table_remove (cloudsync_context *data, const char *table_name) {
865 : DEBUG_DBFUNCTION("table_remove %s", table_name);
866 :
867 52 : for (int i=0; i<data->tables_count; ++i) {
868 52 : const char *name = (data->tables[i]) ? data->tables[i]->name : NULL;
869 52 : if ((name) && (strcasecmp(name, table_name) == 0)) {
870 29 : data->tables[i] = NULL;
871 29 : return i;
872 : }
873 23 : }
874 0 : return -1;
875 29 : }
876 :
877 894 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
878 894 : cloudsync_table_context *table = (cloudsync_table_context *)xdata;
879 :
880 894 : sqlite3 *db = sqlite3_db_handle(table->meta_pkexists_stmt);
881 894 : if (!db) return SQLITE_ERROR;
882 :
883 894 : int index = table->ncols;
884 1788 : for (int i=0; i<ncols; i+=2) {
885 894 : const char *name = values[i];
886 894 : int cid = (int)strtol(values[i+1], NULL, 0);
887 :
888 894 : table->col_id[index] = cid;
889 894 : table->col_name[index] = cloudsync_string_dup(name, true);
890 894 : if (!table->col_name[index]) return 1;
891 :
892 894 : char *sql = table_build_mergeinsert_sql(db, table, name);
893 894 : if (!sql) return SQLITE_NOMEM;
894 : DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
895 :
896 894 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->col_merge_stmt[index], NULL);
897 894 : cloudsync_memory_free(sql);
898 894 : if (rc != SQLITE_OK) return rc;
899 894 : if (!table->col_merge_stmt[index]) return SQLITE_MISUSE;
900 :
901 894 : sql = table_build_value_sql(db, table, name);
902 894 : if (!sql) return SQLITE_NOMEM;
903 : DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
904 :
905 894 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->col_value_stmt[index], NULL);
906 894 : cloudsync_memory_free(sql);
907 894 : if (rc != SQLITE_OK) return rc;
908 894 : if (!table->col_value_stmt[index]) return SQLITE_MISUSE;
909 894 : }
910 894 : table->ncols += 1;
911 :
912 894 : return 0;
913 894 : }
914 :
915 147 : bool table_add_to_context (sqlite3 *db, cloudsync_context *data, table_algo algo, const char *table_name) {
916 : DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
917 :
918 : // check if table is already in the global context and in that case just return
919 147 : cloudsync_table_context *table = table_lookup(data, table_name);
920 147 : if (table) return true;
921 :
922 : // is there any space available?
923 141 : if (data->tables_alloc <= data->tables_count + 1) {
924 : // realloc tables
925 0 : cloudsync_table_context **clone = (cloudsync_table_context **)cloudsync_memory_realloc(data->tables, sizeof(cloudsync_table_context) * data->tables_alloc + CLOUDSYNC_INIT_NTABLES);
926 0 : if (!clone) goto abort_add_table;
927 :
928 : // reset new entries
929 0 : for (int i=data->tables_alloc; i<data->tables_alloc + CLOUDSYNC_INIT_NTABLES; ++i) {
930 0 : clone[i] = NULL;
931 0 : }
932 :
933 : // replace old ptr
934 0 : data->tables = clone;
935 0 : data->tables_alloc += CLOUDSYNC_INIT_NTABLES;
936 0 : }
937 :
938 : // setup a new table context
939 141 : table = table_create(table_name, algo);
940 141 : if (!table) return false;
941 :
942 : // fill remaining metadata in the table
943 141 : char *sql = cloudsync_memory_mprintf("SELECT count(*) FROM pragma_table_info('%q') WHERE pk>0;", table_name);
944 141 : if (!sql) goto abort_add_table;
945 141 : table->npks = (int)dbutils_int_select(db, sql);
946 141 : cloudsync_memory_free(sql);
947 141 : if (table->npks == -1) {
948 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
949 0 : goto abort_add_table;
950 : }
951 :
952 141 : if (table->npks == 0) {
953 : #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
954 0 : return false;
955 : #else
956 : table->rowid_only = true;
957 : table->npks = 1; // rowid
958 : #endif
959 : }
960 :
961 141 : sql = cloudsync_memory_mprintf("SELECT count(*) FROM pragma_table_info('%q') WHERE pk=0;", table_name);
962 141 : if (!sql) goto abort_add_table;
963 141 : int64_t ncols = (int64_t)dbutils_int_select(db, sql);
964 141 : cloudsync_memory_free(sql);
965 141 : if (ncols == -1) {
966 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
967 0 : goto abort_add_table;
968 : }
969 :
970 141 : int rc = table_add_stmts(db, table, (int)ncols);
971 141 : if (rc != SQLITE_OK) goto abort_add_table;
972 :
973 : // a table with only pk(s) is totally legal
974 141 : if (ncols > 0) {
975 106 : table->col_name = (char **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(char *) * ncols));
976 106 : if (!table->col_name) goto abort_add_table;
977 :
978 106 : table->col_id = (int *)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(int) * ncols));
979 106 : if (!table->col_id) goto abort_add_table;
980 :
981 106 : table->col_merge_stmt = (sqlite3_stmt **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(sqlite3_stmt *) * ncols));
982 106 : if (!table->col_merge_stmt) goto abort_add_table;
983 :
984 106 : table->col_value_stmt = (sqlite3_stmt **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(sqlite3_stmt *) * ncols));
985 106 : if (!table->col_value_stmt) goto abort_add_table;
986 :
987 106 : sql = cloudsync_memory_mprintf("SELECT name, cid FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid;", table_name);
988 106 : if (!sql) goto abort_add_table;
989 106 : int rc = sqlite3_exec(db, sql, table_add_to_context_cb, (void *)table, NULL);
990 106 : cloudsync_memory_free(sql);
991 106 : if (rc == SQLITE_ABORT) goto abort_add_table;
992 106 : }
993 :
994 : // lookup the first free slot
995 190 : for (int i=0; i<data->tables_alloc; ++i) {
996 190 : if (data->tables[i] == NULL) {
997 141 : data->tables[i] = table;
998 141 : if (i > data->tables_count - 1) ++data->tables_count;
999 141 : break;
1000 : }
1001 49 : }
1002 :
1003 141 : return true;
1004 :
1005 : abort_add_table:
1006 0 : table_free(table);
1007 0 : return false;
1008 147 : }
1009 :
1010 6 : bool table_remove_from_context (cloudsync_context *data, cloudsync_table_context *table) {
1011 6 : return (table_remove(data, table->name) != -1);
1012 : }
1013 :
1014 15209 : sqlite3_stmt *cloudsync_colvalue_stmt (sqlite3 *db, cloudsync_context *data, const char *tbl_name, bool *persistent) {
1015 15209 : sqlite3_stmt *vm = NULL;
1016 :
1017 15209 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1018 15209 : if (table) {
1019 15209 : char *col_name = NULL;
1020 15209 : if (table->ncols > 0) {
1021 15088 : col_name = table->col_name[0];
1022 : // retrieve col_value precompiled statement
1023 15088 : vm = table_column_lookup(table, col_name, false, NULL);
1024 15088 : *persistent = true;
1025 15088 : } else {
1026 121 : char *sql = table_build_value_sql(db, table, "*");
1027 121 : sqlite3_prepare_v2(db, sql, -1, &vm, NULL);
1028 121 : cloudsync_memory_free(sql);
1029 121 : *persistent = false;
1030 : }
1031 15209 : }
1032 :
1033 15209 : return vm;
1034 : }
1035 :
1036 : // MARK: - Merge Insert -
1037 :
1038 37743 : sqlite3_int64 merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen, const char **err) {
1039 37743 : sqlite3_stmt *vm = table->meta_local_cl_stmt;
1040 37743 : sqlite3_int64 result = -1;
1041 :
1042 37743 : int rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1043 37743 : if (rc != SQLITE_OK) goto cleanup;
1044 :
1045 37743 : rc = sqlite3_bind_blob(vm, 2, (const void *)pk, pklen, SQLITE_STATIC);
1046 37743 : if (rc != SQLITE_OK) goto cleanup;
1047 :
1048 37743 : rc = sqlite3_step(vm);
1049 37743 : if (rc == SQLITE_ROW) result = sqlite3_column_int64(vm, 0);
1050 0 : else if (rc == SQLITE_DONE) result = 0;
1051 :
1052 : cleanup:
1053 37743 : if (result == -1) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1054 37743 : stmt_reset(vm);
1055 37743 : return result;
1056 : }
1057 :
1058 37429 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, sqlite3_int64 *version, const char **err) {
1059 37429 : sqlite3_stmt *vm = table->meta_col_version_stmt;
1060 :
1061 37429 : int rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1062 37429 : if (rc != SQLITE_OK) goto cleanup;
1063 :
1064 37429 : rc = sqlite3_bind_text(vm, 2, col_name, -1, SQLITE_STATIC);
1065 37429 : if (rc != SQLITE_OK) goto cleanup;
1066 :
1067 37429 : rc = sqlite3_step(vm);
1068 56418 : if (rc == SQLITE_ROW) {
1069 18989 : *version = sqlite3_column_int64(vm, 0);
1070 18989 : rc = SQLITE_OK;
1071 18989 : }
1072 :
1073 : cleanup:
1074 37429 : if ((rc != SQLITE_OK) && (rc != SQLITE_DONE)) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1075 37429 : stmt_reset(vm);
1076 37429 : return rc;
1077 : }
1078 :
1079 21702 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, sqlite3_int64 col_version, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1080 :
1081 : // get/set site_id
1082 21702 : sqlite3_stmt *vm = data->getset_siteid_stmt;
1083 21702 : int rc = sqlite3_bind_blob(vm, 1, (const void *)site_id, site_len, SQLITE_STATIC);
1084 21702 : if (rc != SQLITE_OK) goto cleanup_merge;
1085 :
1086 21702 : rc = sqlite3_step(vm);
1087 21702 : if (rc != SQLITE_ROW) goto cleanup_merge;
1088 :
1089 21702 : int64_t ord = sqlite3_column_int64(vm, 0);
1090 21702 : stmt_reset(vm);
1091 :
1092 21702 : vm = table->meta_winner_clock_stmt;
1093 21702 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pk_len, SQLITE_STATIC);
1094 21702 : if (rc != SQLITE_OK) goto cleanup_merge;
1095 :
1096 21702 : rc = sqlite3_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1, SQLITE_STATIC);
1097 21702 : if (rc != SQLITE_OK) goto cleanup_merge;
1098 :
1099 21702 : rc = sqlite3_bind_int64(vm, 3, col_version);
1100 21702 : if (rc != SQLITE_OK) goto cleanup_merge;
1101 :
1102 21702 : rc = sqlite3_bind_int64(vm, 4, db_version);
1103 21702 : if (rc != SQLITE_OK) goto cleanup_merge;
1104 :
1105 21702 : rc = sqlite3_bind_int64(vm, 5, seq);
1106 21702 : if (rc != SQLITE_OK) goto cleanup_merge;
1107 :
1108 21702 : rc = sqlite3_bind_int64(vm, 6, ord);
1109 21702 : if (rc != SQLITE_OK) goto cleanup_merge;
1110 :
1111 21702 : rc = sqlite3_step(vm);
1112 43404 : if (rc == SQLITE_ROW) {
1113 21702 : *rowid = sqlite3_column_int64(vm, 0);
1114 21702 : rc = SQLITE_OK;
1115 21702 : }
1116 :
1117 : cleanup_merge:
1118 21702 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1119 21702 : stmt_reset(vm);
1120 21702 : return rc;
1121 : }
1122 :
1123 21639 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, sqlite3_value *col_value, sqlite3_int64 col_version, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1124 : int index;
1125 21639 : sqlite3_stmt *vm = table_column_lookup(table, col_name, true, &index);
1126 21639 : if (vm == NULL) {
1127 0 : *err = "Unable to retrieve column merge precompiled statement in merge_insert_col.";
1128 0 : return SQLITE_MISUSE;
1129 : }
1130 :
1131 : // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1132 :
1133 : // bind primary key(s)
1134 21639 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1135 21639 : if (rc < 0) {
1136 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1137 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1138 0 : stmt_reset(vm);
1139 0 : return rc;
1140 : }
1141 :
1142 : // bind value
1143 21639 : if (col_value) {
1144 21639 : rc = sqlite3_bind_value(vm, table->npks+1, col_value);
1145 21639 : if (rc == SQLITE_OK) rc = sqlite3_bind_value(vm, table->npks+2, col_value);
1146 21639 : if (rc != SQLITE_OK) {
1147 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1148 0 : stmt_reset(vm);
1149 0 : return rc;
1150 : }
1151 :
1152 21639 : }
1153 :
1154 : // perform real operation and disable triggers
1155 :
1156 : // in case of GOS we reused the table->col_merge_stmt statement
1157 : // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1158 : // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
1159 : // the trick is to disable that trigger before executing the statement
1160 21639 : if (table->algo == table_algo_crdt_gos) table->enabled = 0;
1161 21639 : SYNCBIT_SET(data);
1162 21639 : rc = sqlite3_step(vm);
1163 : DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], sqlite3_expanded_sql(vm), rc);
1164 21639 : stmt_reset(vm);
1165 21639 : SYNCBIT_RESET(data);
1166 21639 : if (table->algo == table_algo_crdt_gos) table->enabled = 1;
1167 :
1168 21639 : if (rc != SQLITE_DONE) {
1169 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1170 0 : return rc;
1171 : }
1172 :
1173 21639 : return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid, err);
1174 21639 : }
1175 :
1176 35 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, sqlite3_int64 cl, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1177 35 : int rc = SQLITE_OK;
1178 :
1179 : // reset return value
1180 35 : *rowid = 0;
1181 :
1182 : // bind pk
1183 35 : sqlite3_stmt *vm = table->real_merge_delete_stmt;
1184 35 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1185 35 : if (rc < 0) {
1186 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1187 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1188 0 : stmt_reset(vm);
1189 0 : return rc;
1190 : }
1191 :
1192 : // perform real operation and disable triggers
1193 35 : SYNCBIT_SET(data);
1194 35 : rc = sqlite3_step(vm);
1195 : DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], sqlite3_expanded_sql(vm), rc);
1196 35 : stmt_reset(vm);
1197 35 : SYNCBIT_RESET(data);
1198 35 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1199 35 : if (rc != SQLITE_OK) {
1200 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1201 0 : return rc;
1202 : }
1203 :
1204 35 : rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid, err);
1205 35 : if (rc != SQLITE_OK) return rc;
1206 :
1207 : // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
1208 : // this must never come before `set_winner_clock`
1209 35 : vm = table->meta_merge_delete_drop;
1210 35 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1211 35 : if (rc == SQLITE_OK) rc = sqlite3_step(vm);
1212 35 : stmt_reset(vm);
1213 35 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1214 35 : if (rc != SQLITE_OK) {
1215 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1216 0 : }
1217 :
1218 35 : return rc;
1219 35 : }
1220 :
1221 28 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, sqlite3_int64 db_version, const char *pk, int pklen, const char **err) {
1222 28 : sqlite3_stmt *vm = table->meta_zero_clock_stmt;
1223 :
1224 28 : int rc = sqlite3_bind_int64(vm, 1, db_version);
1225 28 : if (rc != SQLITE_OK) goto cleanup;
1226 :
1227 28 : rc = sqlite3_bind_blob(vm, 2, (const void *)pk, pklen, SQLITE_STATIC);
1228 28 : if (rc != SQLITE_OK) goto cleanup;
1229 :
1230 28 : rc = sqlite3_step(vm);
1231 28 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1232 :
1233 : cleanup:
1234 28 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1235 28 : stmt_reset(vm);
1236 28 : return rc;
1237 : }
1238 :
1239 : // executed only if insert_cl == local_cl
1240 37429 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, sqlite3_value *insert_value, const char *site_id, int site_len, const char *col_name, sqlite3_int64 col_version, bool *didwin_flag, const char **err) {
1241 :
1242 37429 : if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
1243 :
1244 : sqlite3_int64 local_version;
1245 37429 : int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version, err);
1246 37429 : if (rc == SQLITE_DONE) {
1247 : // no rows returned, the incoming change wins if there's nothing there locally
1248 18440 : *didwin_flag = true;
1249 18440 : return SQLITE_OK;
1250 : }
1251 18989 : if (rc != SQLITE_OK) return rc;
1252 :
1253 : // rc == SQLITE_OK, means that a row with a version exists
1254 18989 : if (local_version != col_version) {
1255 40 : if (col_version > local_version) {*didwin_flag = true; return SQLITE_OK;}
1256 8 : if (col_version < local_version) {*didwin_flag = false; return SQLITE_OK;}
1257 0 : }
1258 :
1259 : // rc == SQLITE_ROW and col_version == local_version, need to compare values
1260 :
1261 : // retrieve col_value precompiled statement
1262 18949 : sqlite3_stmt *vm = table_column_lookup(table, col_name, false, NULL);
1263 18949 : if (!vm) {
1264 0 : *err = "Unable to retrieve column value precompiled statement in merge_did_cid_win.";
1265 0 : return SQLITE_ERROR;
1266 : }
1267 :
1268 : // bind primary key values
1269 18949 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
1270 18949 : if (rc < 0) {
1271 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1272 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1273 0 : stmt_reset(vm);
1274 0 : return rc;
1275 : }
1276 :
1277 : // execute vm
1278 : sqlite3_value *local_value;
1279 18949 : rc = sqlite3_step(vm);
1280 18949 : if (rc == SQLITE_DONE) {
1281 : // meta entry exists but the actual value is missing
1282 : // we should allow the value_compare function to make a decision
1283 : // value_compare has been modified to handle the case where lvalue is NULL
1284 0 : local_value = NULL;
1285 0 : rc = SQLITE_OK;
1286 18949 : } else if (rc == SQLITE_ROW) {
1287 18949 : local_value = sqlite3_column_value(vm, 0);
1288 18949 : rc = SQLITE_OK;
1289 18949 : } else {
1290 0 : goto cleanup;
1291 : }
1292 :
1293 : // compare values
1294 18949 : int ret = dbutils_value_compare(insert_value, local_value);
1295 : // reset after compare, otherwise local value would be deallocated
1296 18949 : vm = stmt_reset(vm);
1297 :
1298 18949 : bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
1299 18949 : if (!compare_site_id) {
1300 18949 : *didwin_flag = (ret > 0);
1301 18949 : goto cleanup;
1302 : }
1303 :
1304 : // values are the same and merge_equal_values is true
1305 0 : vm = table->meta_site_id_stmt;
1306 0 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1307 0 : if (rc != SQLITE_OK) goto cleanup;
1308 :
1309 0 : rc = sqlite3_bind_text(vm, 2, col_name, -1, SQLITE_STATIC);
1310 0 : if (rc != SQLITE_OK) goto cleanup;
1311 :
1312 0 : rc = sqlite3_step(vm);
1313 0 : if (rc == SQLITE_ROW) {
1314 0 : const void *local_site_id = sqlite3_column_blob(vm, 0);
1315 0 : ret = memcmp(site_id, local_site_id, site_len);
1316 0 : *didwin_flag = (ret > 0);
1317 0 : stmt_reset(vm);
1318 0 : return SQLITE_OK;
1319 : }
1320 :
1321 : // handle error condition here
1322 0 : stmt_reset(vm);
1323 0 : *err = "Unable to find site_id for previous change. The cloudsync table is probably corrupted.";
1324 0 : return SQLITE_ERROR;
1325 :
1326 : cleanup:
1327 18949 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1328 18949 : if (vm) stmt_reset(vm);
1329 18949 : return rc;
1330 37429 : }
1331 :
1332 28 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, sqlite3_int64 cl, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1333 :
1334 : // reset return value
1335 28 : *rowid = 0;
1336 :
1337 : // bind pk
1338 28 : sqlite3_stmt *vm = table->real_merge_sentinel_stmt;
1339 28 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1340 28 : if (rc < 0) {
1341 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1342 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1343 0 : stmt_reset(vm);
1344 0 : return rc;
1345 : }
1346 :
1347 : // perform real operation and disable triggers
1348 28 : SYNCBIT_SET(data);
1349 28 : rc = sqlite3_step(vm);
1350 28 : stmt_reset(vm);
1351 28 : SYNCBIT_RESET(data);
1352 28 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1353 28 : if (rc != SQLITE_OK) {
1354 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1355 0 : return rc;
1356 : }
1357 :
1358 28 : rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen, err);
1359 28 : if (rc != SQLITE_OK) return rc;
1360 :
1361 28 : return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid, err);
1362 28 : }
1363 :
1364 3150 : int cloudsync_merge_insert_gos (sqlite3_vtab *vtab, cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, const char *insert_name, sqlite3_value *insert_value, sqlite3_int64 insert_col_version, sqlite3_int64 insert_db_version, const char *insert_site_id, int insert_site_id_len, sqlite3_int64 insert_seq, sqlite3_int64 *rowid) {
1365 : // Grow-Only Set (GOS) Algorithm: Only insertions are allowed, deletions and updates are prevented from a trigger.
1366 :
1367 3150 : const char *err = NULL;
1368 6300 : int rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version,
1369 3150 : insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1370 3150 : if (rc != SQLITE_OK) {
1371 0 : cloudsync_vtab_set_error(vtab, "Unable to perform GOS merge_insert_col: %s", err);
1372 0 : }
1373 :
1374 3150 : return rc;
1375 : }
1376 :
1377 40893 : int cloudsync_merge_insert (sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid) {
1378 : // this function performs the merging logic for an insert in a cloud-synchronized table. It handles
1379 : // different scenarios including conflicts, causal lengths, delete operations, and resurrecting rows
1380 : // based on the incoming data (from remote nodes or clients) and the local database state
1381 :
1382 : // this function handles different CRDT algorithms (GOS, DWS, AWS, and CLS).
1383 : // the merging strategy is determined based on the table->algo value.
1384 :
1385 : // meta table declaration:
1386 : // tbl TEXT NOT NULL, pk BLOB NOT NULL, col_name TEXT NOT NULL,"
1387 : // "col_value ANY, col_version INTEGER NOT NULL, db_version INTEGER NOT NULL,"
1388 : // "site_id BLOB NOT NULL, cl INTEGER NOT NULL, seq INTEGER NOT NULL
1389 :
1390 : // meta information to retrieve from arguments:
1391 : // argv[0] -> table name (TEXT)
1392 : // argv[1] -> primary key (BLOB)
1393 : // argv[2] -> column name (TEXT or NULL if sentinel)
1394 : // argv[3] -> column value (ANY)
1395 : // argv[4] -> column version (INTEGER)
1396 : // argv[5] -> database version (INTEGER)
1397 : // argv[6] -> site ID (BLOB, identifies the origin of the update)
1398 : // argv[7] -> causal length (INTEGER, tracks the order of operations)
1399 : // argv[8] -> sequence number (INTEGER, unique per operation)
1400 :
1401 : // extract table name
1402 40893 : const char *insert_tbl = (const char *)sqlite3_value_text(argv[0]);
1403 :
1404 : // lookup table
1405 40893 : cloudsync_context *data = cloudsync_vtab_get_context(vtab);
1406 40893 : cloudsync_table_context *table = table_lookup(data, insert_tbl);
1407 40893 : if (!table) return cloudsync_vtab_set_error(vtab, "Unable to find table %s,", insert_tbl);
1408 :
1409 : // extract the remaining fields from the input values
1410 40893 : const char *insert_pk = (const char *)sqlite3_value_blob(argv[1]);
1411 40893 : int insert_pk_len = sqlite3_value_bytes(argv[1]);
1412 40893 : const char *insert_name = (sqlite3_value_type(argv[2]) == SQLITE_NULL) ? CLOUDSYNC_TOMBSTONE_VALUE : (const char *)sqlite3_value_text(argv[2]);
1413 40893 : sqlite3_value *insert_value = argv[3];
1414 40893 : sqlite3_int64 insert_col_version = sqlite3_value_int64(argv[4]);
1415 40893 : sqlite3_int64 insert_db_version = sqlite3_value_int64(argv[5]);
1416 40893 : const char *insert_site_id = (const char *)sqlite3_value_blob(argv[6]);
1417 40893 : int insert_site_id_len = sqlite3_value_bytes(argv[6]);
1418 40893 : sqlite3_int64 insert_cl = sqlite3_value_int64(argv[7]);
1419 40893 : sqlite3_int64 insert_seq = sqlite3_value_int64(argv[8]);
1420 40893 : const char *err = NULL;
1421 :
1422 : // perform different logic for each different table algorithm
1423 40893 : if (table->algo == table_algo_crdt_gos) return cloudsync_merge_insert_gos(vtab, data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
1424 :
1425 : // Handle DWS and AWS algorithms here
1426 : // Delete-Wins Set (DWS): table_algo_crdt_dws
1427 : // Add-Wins Set (AWS): table_algo_crdt_aws
1428 :
1429 : // Causal-Length Set (CLS) Algorithm (default)
1430 :
1431 : // compute the local causal length for the row based on the primary key
1432 : // the causal length is used to determine the order of operations and resolve conflicts.
1433 37743 : sqlite3_int64 local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len, &err);
1434 37743 : if (local_cl < 0) {
1435 0 : return cloudsync_vtab_set_error(vtab, "Unable to compute local causal length: %s", err);
1436 : }
1437 :
1438 : // if the incoming causal length is older than the local causal length, we can safely ignore it
1439 : // because the local changes are more recent
1440 37743 : if (insert_cl < local_cl) return SQLITE_OK;
1441 :
1442 : // check if the operation is a delete by examining the causal length
1443 : // even causal lengths typically signify delete operations
1444 37714 : bool is_delete = (insert_cl % 2 == 0);
1445 37714 : if (is_delete) {
1446 : // if it's a delete, check if the local state is at the same causal length
1447 : // if it is, no further action is needed
1448 127 : if (local_cl == insert_cl) return SQLITE_OK;
1449 :
1450 : // perform a delete merge if the causal length is newer than the local one
1451 70 : int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
1452 35 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1453 35 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_delete: %s", err);
1454 35 : return rc;
1455 : }
1456 :
1457 : // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
1458 37587 : bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
1459 37587 : if (is_sentinel_only) {
1460 158 : if (local_cl == insert_cl) return SQLITE_OK;
1461 :
1462 : // perform a sentinel-only insert to track the existence of the row
1463 56 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
1464 28 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1465 28 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_sentinel_only_insert: %s", err);
1466 28 : return rc;
1467 : }
1468 :
1469 : // from this point I can be sure that insert_name is not sentinel
1470 :
1471 : // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
1472 : // odd causal lengths can "resurrect" rows
1473 37429 : bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
1474 37429 : bool row_exists_locally = local_cl != 0;
1475 :
1476 : // if a resurrection is needed, insert a sentinel to mark the row as alive
1477 : // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
1478 37429 : if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
1479 0 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
1480 0 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1481 0 : if (rc != SQLITE_OK) {
1482 0 : cloudsync_vtab_set_error(vtab, "Unable to perform merge_sentinel_only_insert: %s", err);
1483 0 : return rc;
1484 : }
1485 0 : }
1486 :
1487 : // at this point, we determine whether the incoming change wins based on causal length
1488 : // this can be due to a resurrection, a non-existent local row, or a conflict resolution
1489 37429 : bool flag = false;
1490 37429 : int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag, &err);
1491 37429 : if (rc != SQLITE_OK) {
1492 0 : cloudsync_vtab_set_error(vtab, "Unable to perform merge_did_cid_win: %s", err);
1493 0 : return rc;
1494 : }
1495 :
1496 : // check if the incoming change wins and should be applied
1497 37429 : bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
1498 37429 : if (!does_cid_win) return SQLITE_OK;
1499 :
1500 : // perform the final column insert or update if the incoming change wins
1501 18489 : rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1502 18489 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_insert_col: %s", err);
1503 18489 : return rc;
1504 40893 : }
1505 :
1506 : // MARK: - Private -
1507 :
1508 86 : bool cloudsync_config_exists (sqlite3 *db) {
1509 86 : return dbutils_table_exists(db, CLOUDSYNC_SITEID_NAME) == true;
1510 : }
1511 :
1512 86 : void *cloudsync_context_create (void) {
1513 86 : cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
1514 : DEBUG_SETTINGS("cloudsync_context_create %p", data);
1515 :
1516 86 : data->libversion = CLOUDSYNC_VERSION;
1517 86 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1518 : #if CLOUDSYNC_DEBUG
1519 : data->debug = 1;
1520 : #endif
1521 :
1522 : // allocate space for 128 tables (it can grow if needed)
1523 86 : data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc((uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *)));
1524 86 : if (!data->tables) {
1525 0 : cloudsync_memory_free(data);
1526 0 : return NULL;
1527 : }
1528 86 : data->tables_alloc = CLOUDSYNC_INIT_NTABLES;
1529 86 : data->tables_count = 0;
1530 :
1531 86 : return data;
1532 86 : }
1533 :
1534 86 : void cloudsync_context_free (void *ptr) {
1535 : DEBUG_SETTINGS("cloudsync_context_free %p", ptr);
1536 86 : if (!ptr) return;
1537 :
1538 86 : cloudsync_context *data = (cloudsync_context*)ptr;
1539 86 : cloudsync_memory_free(data->tables);
1540 86 : cloudsync_memory_free(data);
1541 86 : }
1542 :
1543 201 : const char *cloudsync_context_init (sqlite3 *db, cloudsync_context *data, sqlite3_context *context) {
1544 201 : if (!data && context) data = (cloudsync_context *)sqlite3_user_data(context);
1545 :
1546 : // perform init just the first time, if the site_id field is not set.
1547 : // The data->site_id value could exists while settings tables don't exists if the
1548 : // cloudsync_context_init was previously called in init transaction that was rolled back
1549 : // because of an error during the init process.
1550 201 : if (data->site_id[0] == 0 || !dbutils_table_exists(db, CLOUDSYNC_SITEID_NAME)) {
1551 89 : if (dbutils_settings_init(db, data, context) != SQLITE_OK) return NULL;
1552 89 : if (stmts_add_tocontext(db, data) != SQLITE_OK) return NULL;
1553 89 : if (cloudsync_load_siteid(db, data) != SQLITE_OK) return NULL;
1554 :
1555 89 : data->sqlite_ctx = context;
1556 89 : data->schema_hash = dbutils_schema_hash(db);
1557 89 : }
1558 :
1559 201 : return (const char *)data->site_id;
1560 201 : }
1561 :
1562 404 : void cloudsync_sync_key(cloudsync_context *data, const char *key, const char *value) {
1563 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
1564 :
1565 : // sync data
1566 404 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
1567 89 : data->schema_version = (int)strtol(value, NULL, 0);
1568 89 : return;
1569 : }
1570 :
1571 315 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
1572 0 : data->debug = 0;
1573 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
1574 0 : return;
1575 : }
1576 404 : }
1577 :
1578 : #if 0
1579 : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
1580 : DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
1581 : // Unused in this version
1582 : return;
1583 : }
1584 : #endif
1585 :
1586 1460 : int cloudsync_commit_hook (void *ctx) {
1587 1460 : cloudsync_context *data = (cloudsync_context *)ctx;
1588 :
1589 1460 : data->db_version = data->pending_db_version;
1590 1460 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1591 1460 : data->seq = 0;
1592 :
1593 1460 : return SQLITE_OK;
1594 : }
1595 :
1596 2 : void cloudsync_rollback_hook (void *ctx) {
1597 2 : cloudsync_context *data = (cloudsync_context *)ctx;
1598 :
1599 2 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1600 2 : data->seq = 0;
1601 2 : }
1602 :
1603 23 : int cloudsync_finalize_alter (sqlite3_context *context, cloudsync_context *data, cloudsync_table_context *table) {
1604 23 : int rc = SQLITE_OK;
1605 23 : sqlite3 *db = sqlite3_context_db_handle(context);
1606 :
1607 23 : db_version_check_uptodate(db, data);
1608 :
1609 : // If primary key columns change (in the schema)
1610 : // We need to drop, re-create and backfill
1611 : // the clock table.
1612 : // A change in pk columns means a change in all identities
1613 : // of all rows.
1614 : // We can determine this by comparing unique index on lookaside table vs
1615 : // pks on source table
1616 23 : char *errmsg = NULL;
1617 23 : char **result = NULL;
1618 : int nrows, ncols;
1619 23 : char *sql = cloudsync_memory_mprintf("SELECT name FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table->name);
1620 23 : rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, NULL);
1621 23 : cloudsync_memory_free(sql);
1622 23 : if (rc != SQLITE_OK) {
1623 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1624 0 : goto finalize;
1625 23 : } else if (errmsg || ncols != 1) {
1626 0 : rc = SQLITE_MISUSE;
1627 0 : goto finalize;
1628 : }
1629 :
1630 23 : bool pk_diff = false;
1631 23 : if (nrows != table->npks) {
1632 6 : pk_diff = true;
1633 6 : } else {
1634 51 : for (int i=0; i<nrows; ++i) {
1635 34 : if (strcmp(table->pk_name[i], result[i]) != 0) {
1636 0 : pk_diff = true;
1637 0 : break;
1638 : }
1639 34 : }
1640 : }
1641 :
1642 23 : if (pk_diff) {
1643 : // drop meta-table, it will be recreated
1644 6 : char *sql = cloudsync_memory_mprintf("DROP TABLE IF EXISTS \"%w_cloudsync\";", table->name);
1645 6 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1646 6 : cloudsync_memory_free(sql);
1647 6 : if (rc != SQLITE_OK) {
1648 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1649 0 : goto finalize;
1650 : }
1651 6 : } else {
1652 : // compact meta-table
1653 : // delete entries for removed columns
1654 17 : char *sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE \"col_name\" NOT IN ("
1655 : "SELECT name FROM pragma_table_info('%q') UNION SELECT '%s'"
1656 17 : ")", table->name, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
1657 17 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1658 17 : cloudsync_memory_free(sql);
1659 17 : if (rc != SQLITE_OK) {
1660 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1661 0 : goto finalize;
1662 : }
1663 :
1664 17 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
1665 17 : sql = cloudsync_memory_mprintf("SELECT group_concat('\"%w\".\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%s') WHERE pk>0 ORDER BY pk;", singlequote_escaped_table_name, singlequote_escaped_table_name);
1666 17 : cloudsync_memory_free(singlequote_escaped_table_name);
1667 17 : if (!sql) {
1668 0 : rc = SQLITE_NOMEM;
1669 0 : goto finalize;
1670 : }
1671 17 : char *pkclause = dbutils_text_select(db, sql);
1672 17 : char *pkvalues = (pkclause) ? pkclause : "rowid";
1673 17 : cloudsync_memory_free(sql);
1674 :
1675 : // delete entries related to rows that no longer exist in the original table, but preserve tombstone
1676 17 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE (\"col_name\" != '%s' OR (\"col_name\" = '%s' AND col_version %% 2 != 0)) AND NOT EXISTS (SELECT 1 FROM \"%w\" WHERE \"%w_cloudsync\".pk = cloudsync_pk_encode(%s) LIMIT 1);", table->name, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->name, table->name, pkvalues);
1677 17 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1678 17 : if (pkclause) cloudsync_memory_free(pkclause);
1679 17 : cloudsync_memory_free(sql);
1680 17 : if (rc != SQLITE_OK) {
1681 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1682 0 : goto finalize;
1683 : }
1684 :
1685 : }
1686 :
1687 : char buf[256];
1688 23 : snprintf(buf, sizeof(buf), "%lld", data->db_version);
1689 23 : dbutils_settings_set_key_value(db, context, "pre_alter_dbversion", buf);
1690 :
1691 : finalize:
1692 23 : sqlite3_free_table(result);
1693 23 : sqlite3_free(errmsg);
1694 :
1695 23 : return rc;
1696 : }
1697 :
1698 145 : int cloudsync_refill_metatable (sqlite3 *db, cloudsync_context *data, const char *table_name) {
1699 145 : cloudsync_table_context *table = table_lookup(data, table_name);
1700 145 : if (!table) return SQLITE_INTERNAL;
1701 :
1702 145 : sqlite3_stmt *vm = NULL;
1703 145 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
1704 :
1705 145 : char *sql = cloudsync_memory_mprintf("SELECT group_concat('\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1706 145 : char *pkclause_identifiers = dbutils_text_select(db, sql);
1707 145 : char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
1708 145 : cloudsync_memory_free(sql);
1709 :
1710 145 : sql = cloudsync_memory_mprintf("SELECT group_concat('cloudsync_pk_decode(pk, ' || pk || ') AS ' || '\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1711 145 : char *pkdecode = dbutils_text_select(db, sql);
1712 145 : char *pkdecodeval = (pkdecode) ? pkdecode : "cloudsync_pk_decode(pk, 1) AS rowid";
1713 145 : cloudsync_memory_free(sql);
1714 :
1715 145 : sql = cloudsync_memory_mprintf("SELECT group_concat('\"' || format('%%w', name) || '\"' || ' = cloudsync_pk_decode(pk, ' || pk || ')', ' AND ') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1716 145 : char *pkonclause = dbutils_text_select(db, sql);
1717 145 : char *pkonclauseval = (pkonclause) ? pkonclause : "rowid = cloudsync_pk_decode(pk, 1) AS rowid";
1718 145 : cloudsync_memory_free(sql);
1719 :
1720 145 : sql = cloudsync_memory_mprintf("SELECT cloudsync_insert('%q', %s) FROM (SELECT %s FROM \"%w\" EXCEPT SELECT %s FROM \"%w_cloudsync\");", table_name, pkvalues_identifiers, pkvalues_identifiers, table_name, pkdecodeval, table_name);
1721 145 : int rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1722 145 : cloudsync_memory_free(sql);
1723 145 : if (rc != SQLITE_OK) goto finalize;
1724 :
1725 : // fill missing colums
1726 : // for each non-pk column:
1727 :
1728 145 : sql = cloudsync_memory_mprintf("SELECT cloudsync_pk_encode(%s) FROM \"%w\" LEFT JOIN \"%w_cloudsync\" ON %s AND \"%w_cloudsync\".col_name = ? WHERE \"%w_cloudsync\".db_version IS NULL", pkvalues_identifiers, table_name, table_name, pkonclauseval, table_name, table_name);
1729 145 : rc = sqlite3_prepare(db, sql, -1, &vm, NULL);
1730 145 : cloudsync_memory_free(sql);
1731 145 : if (rc != SQLITE_OK) goto finalize;
1732 :
1733 1048 : for (int i=0; i<table->ncols; ++i) {
1734 903 : char *col_name = table->col_name[i];
1735 :
1736 903 : rc = sqlite3_bind_text(vm, 1, col_name, -1, SQLITE_STATIC);
1737 903 : if (rc != SQLITE_OK) goto finalize;
1738 :
1739 941 : while (1) {
1740 941 : rc = sqlite3_step(vm);
1741 941 : if (rc == SQLITE_ROW) {
1742 38 : const char *pk = (const char *)sqlite3_column_text(vm, 0);
1743 38 : size_t pklen = strlen(pk);
1744 38 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, col_name, db_version, BUMP_SEQ(data));
1745 941 : } else if (rc == SQLITE_DONE) {
1746 903 : rc = SQLITE_OK;
1747 903 : break;
1748 : } else {
1749 0 : break;
1750 : }
1751 : }
1752 903 : if (rc != SQLITE_OK) goto finalize;
1753 :
1754 903 : sqlite3_reset(vm);
1755 1048 : }
1756 :
1757 : finalize:
1758 145 : if (rc != SQLITE_OK) DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", sqlite3_errmsg(db));
1759 145 : if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
1760 145 : if (pkdecode) cloudsync_memory_free(pkdecode);
1761 145 : if (pkonclause) cloudsync_memory_free(pkonclause);
1762 145 : if (vm) sqlite3_finalize(vm);
1763 145 : return rc;
1764 145 : }
1765 :
1766 : // MARK: - Local -
1767 :
1768 1 : int local_update_sentinel (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1769 1 : sqlite3_stmt *vm = table->meta_sentinel_update_stmt;
1770 1 : if (!vm) return -1;
1771 :
1772 1 : int rc = sqlite3_bind_int64(vm, 1, db_version);
1773 1 : if (rc != SQLITE_OK) goto cleanup;
1774 :
1775 1 : rc = sqlite3_bind_int(vm, 2, seq);
1776 1 : if (rc != SQLITE_OK) goto cleanup;
1777 :
1778 1 : rc = sqlite3_bind_blob(vm, 3, pk, (int)pklen, SQLITE_STATIC);
1779 1 : if (rc != SQLITE_OK) goto cleanup;
1780 :
1781 1 : rc = sqlite3_step(vm);
1782 1 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1783 :
1784 : cleanup:
1785 1 : DEBUG_SQLITE_ERROR(rc, "local_update_sentinel", db);
1786 1 : sqlite3_reset(vm);
1787 1 : return rc;
1788 1 : }
1789 :
1790 122 : int local_mark_insert_sentinel_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1791 122 : sqlite3_stmt *vm = table->meta_sentinel_insert_stmt;
1792 122 : if (!vm) return -1;
1793 :
1794 122 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1795 122 : if (rc != SQLITE_OK) goto cleanup;
1796 :
1797 122 : rc = sqlite3_bind_int64(vm, 2, db_version);
1798 122 : if (rc != SQLITE_OK) goto cleanup;
1799 :
1800 122 : rc = sqlite3_bind_int(vm, 3, seq);
1801 122 : if (rc != SQLITE_OK) goto cleanup;
1802 :
1803 122 : rc = sqlite3_bind_int64(vm, 4, db_version);
1804 122 : if (rc != SQLITE_OK) goto cleanup;
1805 :
1806 122 : rc = sqlite3_bind_int(vm, 5, seq);
1807 122 : if (rc != SQLITE_OK) goto cleanup;
1808 :
1809 122 : rc = sqlite3_step(vm);
1810 122 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1811 :
1812 : cleanup:
1813 122 : DEBUG_SQLITE_ERROR(rc, "local_insert_sentinel", db);
1814 122 : sqlite3_reset(vm);
1815 122 : return rc;
1816 122 : }
1817 :
1818 10516 : int local_mark_insert_or_update_meta_impl (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, int col_version, sqlite3_int64 db_version, int seq) {
1819 :
1820 10516 : sqlite3_stmt *vm = table->meta_row_insert_update_stmt;
1821 10516 : if (!vm) return -1;
1822 :
1823 10516 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1824 10516 : if (rc != SQLITE_OK) goto cleanup;
1825 :
1826 10516 : rc = sqlite3_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1, SQLITE_STATIC);
1827 10516 : if (rc != SQLITE_OK) goto cleanup;
1828 :
1829 10516 : rc = sqlite3_bind_int(vm, 3, col_version);
1830 10516 : if (rc != SQLITE_OK) goto cleanup;
1831 :
1832 10516 : rc = sqlite3_bind_int64(vm, 4, db_version);
1833 10516 : if (rc != SQLITE_OK) goto cleanup;
1834 :
1835 10516 : rc = sqlite3_bind_int(vm, 5, seq);
1836 10516 : if (rc != SQLITE_OK) goto cleanup;
1837 :
1838 10516 : rc = sqlite3_bind_int64(vm, 6, db_version);
1839 10516 : if (rc != SQLITE_OK) goto cleanup;
1840 :
1841 10516 : rc = sqlite3_bind_int(vm, 7, seq);
1842 10516 : if (rc != SQLITE_OK) goto cleanup;
1843 :
1844 10516 : rc = sqlite3_step(vm);
1845 10516 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1846 :
1847 : cleanup:
1848 10516 : DEBUG_SQLITE_ERROR(rc, "local_insert_or_update", db);
1849 10516 : sqlite3_reset(vm);
1850 10516 : return rc;
1851 10516 : }
1852 :
1853 10472 : int local_mark_insert_or_update_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, sqlite3_int64 db_version, int seq) {
1854 10472 : return local_mark_insert_or_update_meta_impl(db, table, pk, pklen, col_name, 1, db_version, seq);
1855 : }
1856 :
1857 44 : int local_mark_delete_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1858 44 : return local_mark_insert_or_update_meta_impl(db, table, pk, pklen, NULL, 2, db_version, seq);
1859 : }
1860 :
1861 13 : int local_drop_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen) {
1862 13 : sqlite3_stmt *vm = table->meta_row_drop_stmt;
1863 13 : if (!vm) return -1;
1864 :
1865 13 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1866 13 : if (rc != SQLITE_OK) goto cleanup;
1867 :
1868 13 : rc = sqlite3_step(vm);
1869 13 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1870 :
1871 : cleanup:
1872 13 : DEBUG_SQLITE_ERROR(rc, "local_drop_meta", db);
1873 13 : sqlite3_reset(vm);
1874 13 : return rc;
1875 13 : }
1876 :
1877 31 : int local_update_move_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *pk2, size_t pklen2, sqlite3_int64 db_version) {
1878 : /*
1879 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
1880 : * to a new primary key (NEW.pk) when a primary key change occurs.
1881 : *
1882 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
1883 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
1884 : *
1885 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
1886 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
1887 : * may be applied incorrectly, leading to data inconsistency.
1888 : *
1889 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
1890 : * by either incrementing the maximum sequence value in the table or using a function (e.g., `bump_seq(data)`)
1891 : * that generates a unique sequence for each row. The update query should ensure that each row moved
1892 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
1893 : */
1894 :
1895 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
1896 : // pk2 is the old pk
1897 :
1898 31 : sqlite3_stmt *vm = table->meta_update_move_stmt;
1899 31 : if (!vm) return -1;
1900 :
1901 : // new primary key
1902 31 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1903 31 : if (rc != SQLITE_OK) goto cleanup;
1904 :
1905 : // new db_version
1906 31 : rc = sqlite3_bind_int64(vm, 2, db_version);
1907 31 : if (rc != SQLITE_OK) goto cleanup;
1908 :
1909 : // old primary key
1910 31 : rc = sqlite3_bind_blob(vm, 3, pk2, (int)pklen2, SQLITE_STATIC);
1911 31 : if (rc != SQLITE_OK) goto cleanup;
1912 :
1913 31 : rc = sqlite3_step(vm);
1914 31 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1915 :
1916 : cleanup:
1917 31 : DEBUG_SQLITE_ERROR(rc, "local_update_move_meta", db);
1918 31 : sqlite3_reset(vm);
1919 31 : return rc;
1920 31 : }
1921 :
1922 : // MARK: - Payload Encode / Decode -
1923 :
1924 135 : bool cloudsync_buffer_free (cloudsync_data_payload *payload) {
1925 135 : if (payload) {
1926 135 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
1927 135 : memset(payload, 0, sizeof(cloudsync_data_payload));
1928 135 : }
1929 :
1930 135 : return false;
1931 : }
1932 :
1933 40748 : bool cloudsync_buffer_check (cloudsync_data_payload *payload, size_t needed) {
1934 : // alloc/resize buffer
1935 40748 : if (payload->bused + needed > payload->balloc) {
1936 141 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
1937 141 : size_t balloc = payload->balloc + needed;
1938 :
1939 141 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
1940 141 : if (!buffer) return cloudsync_buffer_free(payload);
1941 :
1942 141 : payload->buffer = buffer;
1943 141 : payload->balloc = balloc;
1944 141 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
1945 141 : }
1946 :
1947 40748 : return true;
1948 40748 : }
1949 :
1950 135 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
1951 135 : memset(header, 0, sizeof(cloudsync_payload_header));
1952 : assert(sizeof(cloudsync_payload_header)==32);
1953 :
1954 : int major, minor, patch;
1955 135 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
1956 :
1957 135 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
1958 135 : header->version = CLOUDSYNC_PAYLOAD_VERSION;
1959 135 : header->libversion[0] = major;
1960 135 : header->libversion[1] = minor;
1961 135 : header->libversion[2] = patch;
1962 135 : header->expanded_size = htonl(expanded_size);
1963 135 : header->ncols = htons(ncols);
1964 135 : header->nrows = htonl(nrows);
1965 135 : header->schema_hash = htonll(hash);
1966 135 : }
1967 :
1968 40748 : void cloudsync_payload_encode_step (sqlite3_context *context, int argc, sqlite3_value **argv) {
1969 : DEBUG_FUNCTION("cloudsync_payload_encode_step");
1970 : // debug_values(argc, argv);
1971 :
1972 : // allocate/get the session context
1973 40748 : cloudsync_data_payload *payload = (cloudsync_data_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_data_payload));
1974 40748 : if (!payload) return;
1975 :
1976 : // check if the step function is called for the first time
1977 40748 : if (payload->nrows == 0) payload->ncols = argc;
1978 :
1979 40748 : size_t breq = pk_encode_size(argv, argc, 0);
1980 40748 : if (cloudsync_buffer_check(payload, breq) == false) return;
1981 :
1982 40748 : char *buffer = payload->buffer + payload->bused;
1983 40748 : char *ptr = pk_encode(argv, argc, buffer, false, NULL);
1984 40748 : assert(buffer == ptr);
1985 :
1986 : // update buffer
1987 40748 : payload->bused += breq;
1988 :
1989 : // increment row counter
1990 40748 : ++payload->nrows;
1991 40748 : }
1992 :
1993 143 : void cloudsync_payload_encode_final (sqlite3_context *context) {
1994 : DEBUG_FUNCTION("cloudsync_payload_encode_final");
1995 :
1996 : // get the session context
1997 143 : cloudsync_data_payload *payload = (cloudsync_data_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_data_payload));
1998 143 : if (!payload) return;
1999 :
2000 143 : if (payload->nrows == 0) {
2001 8 : sqlite3_result_null(context);
2002 8 : return;
2003 : }
2004 :
2005 : // encode payload
2006 135 : int header_size = (int)sizeof(cloudsync_payload_header);
2007 135 : int real_buffer_size = (int)(payload->bused - header_size);
2008 135 : int zbound = LZ4_compressBound(real_buffer_size);
2009 135 : char *buffer = cloudsync_memory_alloc(zbound + header_size);
2010 135 : if (!buffer) {
2011 0 : cloudsync_buffer_free(payload);
2012 0 : sqlite3_result_error_code(context, SQLITE_NOMEM);
2013 0 : return;
2014 : }
2015 :
2016 : // adjust buffer to compress to skip the reserved header
2017 135 : char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
2018 135 : int zused = LZ4_compress_default(src_buffer, buffer+header_size, real_buffer_size, zbound);
2019 135 : bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
2020 135 : CHECK_FORCE_UNCOMPRESSED_BUFFER();
2021 :
2022 : // setup payload header
2023 135 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2024 : cloudsync_payload_header header;
2025 135 : cloudsync_payload_header_init(&header, (use_uncompressed_buffer) ? 0 : real_buffer_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);
2026 :
2027 : // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
2028 135 : if (use_uncompressed_buffer) {
2029 2 : cloudsync_memory_free(buffer);
2030 2 : buffer = payload->buffer;
2031 2 : zused = real_buffer_size;
2032 2 : }
2033 :
2034 : // copy header and data to SQLite BLOB
2035 135 : memcpy(buffer, &header, sizeof(cloudsync_payload_header));
2036 135 : int blob_size = zused+sizeof(cloudsync_payload_header);
2037 135 : sqlite3_result_blob(context, buffer, blob_size, SQLITE_TRANSIENT);
2038 :
2039 : // cleanup memory
2040 135 : cloudsync_buffer_free(payload);
2041 135 : if (!use_uncompressed_buffer) cloudsync_memory_free(buffer);
2042 143 : }
2043 :
2044 133 : cloudsync_payload_apply_callback_t cloudsync_get_payload_apply_callback(sqlite3 *db) {
2045 133 : return (sqlite3_libversion_number() >= 3044000) ? sqlite3_get_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY) : NULL;
2046 : }
2047 :
2048 86 : void cloudsync_set_payload_apply_callback(sqlite3 *db, cloudsync_payload_apply_callback_t callback) {
2049 86 : if (sqlite3_libversion_number() >= 3044000) {
2050 86 : sqlite3_set_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY, (void*)callback, NULL);
2051 86 : }
2052 86 : }
2053 :
2054 366084 : int cloudsync_pk_decode_bind_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
2055 366084 : cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
2056 366084 : int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
2057 :
2058 366084 : if (rc == SQLITE_OK) {
2059 : // the dbversion index is smaller than seq index, so it is processed first
2060 : // when processing the dbversion column: save the value to the tmp_dbversion field
2061 : // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
2062 366084 : switch (index) {
2063 : case CLOUDSYNC_PK_INDEX_TBL:
2064 40676 : if (type == SQLITE_TEXT) {
2065 40676 : decode_context->tbl = pval;
2066 40676 : decode_context->tbl_len = ival;
2067 40676 : }
2068 40676 : break;
2069 : case CLOUDSYNC_PK_INDEX_PK:
2070 40676 : if (type == SQLITE_BLOB) {
2071 40676 : decode_context->pk = pval;
2072 40676 : decode_context->pk_len = ival;
2073 40676 : }
2074 40676 : break;
2075 : case CLOUDSYNC_PK_INDEX_COLNAME:
2076 40676 : if (type == SQLITE_TEXT) {
2077 40676 : decode_context->col_name = pval;
2078 40676 : decode_context->col_name_len = ival;
2079 40676 : }
2080 40676 : break;
2081 : case CLOUDSYNC_PK_INDEX_COLVERSION:
2082 40676 : if (type == SQLITE_INTEGER) decode_context->col_version = ival;
2083 40676 : break;
2084 : case CLOUDSYNC_PK_INDEX_DBVERSION:
2085 40676 : if (type == SQLITE_INTEGER) decode_context->db_version = ival;
2086 40676 : break;
2087 : case CLOUDSYNC_PK_INDEX_SITEID:
2088 40676 : if (type == SQLITE_BLOB) {
2089 40676 : decode_context->site_id = pval;
2090 40676 : decode_context->site_id_len = ival;
2091 40676 : }
2092 40676 : break;
2093 : case CLOUDSYNC_PK_INDEX_CL:
2094 40676 : if (type == SQLITE_INTEGER) decode_context->cl = ival;
2095 40676 : break;
2096 : case CLOUDSYNC_PK_INDEX_SEQ:
2097 40676 : if (type == SQLITE_INTEGER) decode_context->seq = ival;
2098 40676 : break;
2099 : }
2100 366084 : }
2101 :
2102 366084 : return rc;
2103 : }
2104 :
2105 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
2106 :
2107 135 : int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int blen) {
2108 : // decode header
2109 : cloudsync_payload_header header;
2110 135 : memcpy(&header, payload, sizeof(cloudsync_payload_header));
2111 :
2112 135 : header.signature = ntohl(header.signature);
2113 135 : header.expanded_size = ntohl(header.expanded_size);
2114 135 : header.ncols = ntohs(header.ncols);
2115 135 : header.nrows = ntohl(header.nrows);
2116 135 : header.schema_hash = ntohll(header.schema_hash);
2117 :
2118 135 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2119 135 : if (!data || header.schema_hash != data->schema_hash) {
2120 3 : sqlite3 *db = sqlite3_context_db_handle(context);
2121 3 : if (!dbutils_check_schema_hash(db, header.schema_hash)) {
2122 2 : dbutils_context_result_error(context, "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
2123 2 : sqlite3_result_error_code(context, SQLITE_MISMATCH);
2124 2 : return -1;
2125 : }
2126 1 : }
2127 :
2128 : // sanity check header
2129 133 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
2130 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: invalid signature or column size.");
2131 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2132 0 : return -1;
2133 : }
2134 :
2135 133 : const char *buffer = payload + sizeof(cloudsync_payload_header);
2136 133 : blen -= sizeof(cloudsync_payload_header);
2137 :
2138 : // check if payload is compressed
2139 133 : char *clone = NULL;
2140 133 : if (header.expanded_size != 0) {
2141 131 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
2142 131 : if (!clone) {sqlite3_result_error_code(context, SQLITE_NOMEM); return -1;}
2143 :
2144 131 : uint32_t rc = LZ4_decompress_safe(buffer, clone, blen, header.expanded_size);
2145 131 : if (rc <= 0 || rc != header.expanded_size) {
2146 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to decompress BLOB (%d).", rc);
2147 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2148 0 : return -1;
2149 : }
2150 :
2151 131 : buffer = (const char *)clone;
2152 131 : }
2153 :
2154 : // apply payload inside a transaction
2155 133 : sqlite3 *db = sqlite3_context_db_handle(context);
2156 133 : int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_payload_apply;", NULL, NULL, NULL);
2157 133 : if (rc != SQLITE_OK) {
2158 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to start a transaction (%s).", sqlite3_errmsg(db));
2159 0 : if (clone) cloudsync_memory_free(clone);
2160 0 : return -1;
2161 : }
2162 :
2163 : // precompile the insert statement
2164 133 : sqlite3_stmt *vm = NULL;
2165 133 : const char *sql = "INSERT INTO cloudsync_changes(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) VALUES (?,?,?,?,?,?,?,?,?);";
2166 133 : rc = sqlite3_prepare(db, sql, -1, &vm, NULL);
2167 133 : if (rc != SQLITE_OK) {
2168 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: error while compiling SQL statement (%s).", sqlite3_errmsg(db));
2169 0 : if (clone) cloudsync_memory_free(clone);
2170 0 : return -1;
2171 : }
2172 :
2173 : // process buffer, one row at a time
2174 133 : uint16_t ncols = header.ncols;
2175 133 : uint32_t nrows = header.nrows;
2176 133 : int dbversion = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_DBVERSION);
2177 133 : int seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_SEQ);
2178 133 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
2179 133 : void *payload_apply_xdata = NULL;
2180 133 : cloudsync_payload_apply_callback_t payload_apply_callback = cloudsync_get_payload_apply_callback(db);
2181 :
2182 40809 : for (uint32_t i=0; i<nrows; ++i) {
2183 40676 : size_t seek = 0;
2184 40676 : pk_decode((char *)buffer, blen, ncols, &seek, cloudsync_pk_decode_bind_callback, &decoded_context);
2185 : // n is the pk_decode return value, I don't think I should assert here because in any case the next sqlite3_step would fail
2186 : // assert(n == ncols);
2187 :
2188 40676 : bool approved = true;
2189 40676 : if (payload_apply_callback) approved = payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY, SQLITE_OK);
2190 :
2191 40676 : if (approved) {
2192 40676 : rc = sqlite3_step(vm);
2193 40676 : if (rc != SQLITE_DONE) {
2194 : // don't "break;", the error can be due to a RLS policy.
2195 : // in case of error we try to apply the following changes
2196 0 : printf("cloudsync_payload_apply error on db_version %lld/%lld: (%d) %s\n", decoded_context.db_version, decoded_context.seq, rc, sqlite3_errmsg(db));
2197 0 : }
2198 40676 : }
2199 :
2200 40676 : if (payload_apply_callback) payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_DID_APPLY, rc);
2201 :
2202 40676 : buffer += seek;
2203 40676 : blen -= seek;
2204 40676 : stmt_reset(vm);
2205 40676 : }
2206 :
2207 133 : char *lasterr = (rc != SQLITE_OK && rc != SQLITE_DONE) ? cloudsync_string_dup(sqlite3_errmsg(db), false) : NULL;
2208 133 : sql = (lasterr) ? "ROLLBACK TO cloudsync_payload_apply;" : "RELEASE cloudsync_payload_apply;";
2209 133 : sqlite3_exec(db, sql, NULL, NULL, NULL);
2210 :
2211 133 : if (payload_apply_callback) {
2212 133 : payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_CLEANUP, rc);
2213 133 : }
2214 :
2215 133 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
2216 133 : if (rc == SQLITE_OK) {
2217 : char buf[256];
2218 133 : if (decoded_context.db_version >= dbversion) {
2219 129 : snprintf(buf, sizeof(buf), "%lld", decoded_context.db_version);
2220 129 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
2221 :
2222 129 : if (decoded_context.seq != seq) {
2223 73 : snprintf(buf, sizeof(buf), "%lld", decoded_context.seq);
2224 73 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_SEQ, buf);
2225 73 : }
2226 129 : }
2227 133 : }
2228 :
2229 : // cleanup vm
2230 133 : if (vm) sqlite3_finalize(vm);
2231 :
2232 : // cleanup memory
2233 133 : if (clone) cloudsync_memory_free(clone);
2234 :
2235 133 : if (rc != SQLITE_OK) {
2236 0 : sqlite3_result_error(context, lasterr, -1);
2237 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2238 0 : cloudsync_memory_free(lasterr);
2239 0 : return -1;
2240 : }
2241 :
2242 : // return the number of processed rows
2243 133 : sqlite3_result_int(context, nrows);
2244 133 : return nrows;
2245 135 : }
2246 :
2247 135 : void cloudsync_payload_decode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2248 : DEBUG_FUNCTION("cloudsync_payload_decode");
2249 : //debug_values(argc, argv);
2250 :
2251 : // sanity check payload type
2252 135 : if (sqlite3_value_type(argv[0]) != SQLITE_BLOB) {
2253 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_decode: value must be a BLOB.");
2254 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2255 0 : return;
2256 : }
2257 :
2258 : // sanity check payload size
2259 135 : int blen = sqlite3_value_bytes(argv[0]);
2260 135 : if (blen < (int)sizeof(cloudsync_payload_header)) {
2261 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_decode: invalid input size.");
2262 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2263 0 : return;
2264 : }
2265 :
2266 : // obtain payload
2267 135 : const char *payload = (const char *)sqlite3_value_blob(argv[0]);
2268 :
2269 : // apply changes
2270 135 : cloudsync_payload_apply(context, payload, blen);
2271 135 : }
2272 :
2273 : // MARK: - Payload load/store -
2274 :
2275 0 : int cloudsync_payload_get (sqlite3_context *context, char **blob, int *blob_size, int *db_version, int *seq, sqlite3_int64 *new_db_version, sqlite3_int64 *new_seq) {
2276 0 : sqlite3 *db = sqlite3_context_db_handle(context);
2277 :
2278 0 : *db_version = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_SEND_DBVERSION);
2279 0 : if (*db_version < 0) {sqlite3_result_error(context, "Unable to retrieve db_version.", -1); return SQLITE_ERROR;}
2280 :
2281 0 : *seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_SEND_SEQ);
2282 0 : if (*seq < 0) {sqlite3_result_error(context, "Unable to retrieve seq.", -1); return SQLITE_ERROR;}
2283 :
2284 : // retrieve BLOB
2285 : char sql[1024];
2286 0 : snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes) "
2287 : "SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq), max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, NULL)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))", *db_version, *db_version, *seq);
2288 :
2289 0 : int rc = dbutils_blob_int_int_select(db, sql, blob, blob_size, new_db_version, new_seq);
2290 0 : if (rc != SQLITE_OK) {
2291 0 : sqlite3_result_error(context, "cloudsync_network_send_changes unable to get changes", -1);
2292 0 : sqlite3_result_error_code(context, rc);
2293 0 : return rc;
2294 : }
2295 :
2296 : // exit if there is no data to send
2297 0 : if (blob == NULL || blob_size == 0) return SQLITE_OK;
2298 0 : return rc;
2299 0 : }
2300 :
2301 : #ifdef CLOUDSYNC_DESKTOP_OS
2302 :
2303 0 : void cloudsync_payload_save (sqlite3_context *context, int argc, sqlite3_value **argv) {
2304 : DEBUG_FUNCTION("cloudsync_payload_save");
2305 :
2306 : // sanity check argument
2307 0 : if (sqlite3_value_type(argv[0]) != SQLITE_TEXT) {
2308 0 : sqlite3_result_error(context, "Unable to retrieve file path.", -1);
2309 0 : return;
2310 : }
2311 :
2312 : // retrieve full path to file
2313 0 : const char *path = (const char *)sqlite3_value_text(argv[0]);
2314 0 : file_delete(path);
2315 :
2316 : // retrieve payload
2317 0 : char *blob = NULL;
2318 0 : int blob_size = 0, db_version = 0, seq = 0;
2319 0 : sqlite3_int64 new_db_version = 0, new_seq = 0;
2320 0 : int rc = cloudsync_payload_get(context, &blob, &blob_size, &db_version, &seq, &new_db_version, &new_seq);
2321 0 : if (rc != SQLITE_OK) return;
2322 :
2323 : // exit if there is no data to send
2324 0 : if (blob == NULL || blob_size == 0) return;
2325 :
2326 : // write payload to file
2327 0 : bool res = file_write(path, blob, (size_t)blob_size);
2328 0 : sqlite3_free(blob);
2329 :
2330 0 : if (res == false) {
2331 0 : sqlite3_result_error(context, "Unable to write payload to file path.", -1);
2332 0 : return;
2333 : }
2334 :
2335 : // update db_version and seq
2336 : char buf[256];
2337 0 : sqlite3 *db = sqlite3_context_db_handle(context);
2338 0 : if (new_db_version != db_version) {
2339 0 : snprintf(buf, sizeof(buf), "%lld", new_db_version);
2340 0 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_SEND_DBVERSION, buf);
2341 0 : }
2342 0 : if (new_seq != seq) {
2343 0 : snprintf(buf, sizeof(buf), "%lld", new_seq);
2344 0 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_SEND_SEQ, buf);
2345 0 : }
2346 :
2347 : // returns blob size
2348 0 : sqlite3_result_int64(context, (sqlite3_int64)blob_size);
2349 0 : }
2350 :
2351 0 : void cloudsync_payload_load (sqlite3_context *context, int argc, sqlite3_value **argv) {
2352 : DEBUG_FUNCTION("cloudsync_payload_load");
2353 :
2354 : // sanity check argument
2355 0 : if (sqlite3_value_type(argv[0]) != SQLITE_TEXT) {
2356 0 : sqlite3_result_error(context, "Unable to retrieve file path.", -1);
2357 0 : return;
2358 : }
2359 :
2360 : // retrieve full path to file
2361 0 : const char *path = (const char *)sqlite3_value_text(argv[0]);
2362 :
2363 0 : sqlite3_int64 payload_size = 0;
2364 0 : char *payload = file_read(path, &payload_size);
2365 0 : if (!payload) {
2366 0 : if (payload_size == -1) sqlite3_result_error(context, "Unable to read payload from file path.", -1);
2367 0 : if (payload) cloudsync_memory_free(payload);
2368 0 : return;
2369 : }
2370 :
2371 0 : int nrows = (payload_size) ? cloudsync_payload_apply (context, payload, (int)payload_size) : 0;
2372 0 : if (payload) cloudsync_memory_free(payload);
2373 :
2374 : // returns number of applied rows
2375 0 : if (nrows != -1) sqlite3_result_int(context, nrows);
2376 0 : }
2377 :
2378 : #endif
2379 :
2380 : // MARK: - Public -
2381 :
2382 3 : void cloudsync_version (sqlite3_context *context, int argc, sqlite3_value **argv) {
2383 : DEBUG_FUNCTION("cloudsync_version");
2384 3 : UNUSED_PARAMETER(argc);
2385 3 : UNUSED_PARAMETER(argv);
2386 3 : sqlite3_result_text(context, CLOUDSYNC_VERSION, -1, SQLITE_STATIC);
2387 3 : }
2388 :
2389 16 : void cloudsync_siteid (sqlite3_context *context, int argc, sqlite3_value **argv) {
2390 : DEBUG_FUNCTION("cloudsync_siteid");
2391 16 : UNUSED_PARAMETER(argc);
2392 16 : UNUSED_PARAMETER(argv);
2393 :
2394 16 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2395 16 : sqlite3_result_blob(context, data->site_id, UUID_LEN, SQLITE_STATIC);
2396 16 : }
2397 :
2398 3 : void cloudsync_db_version (sqlite3_context *context, int argc, sqlite3_value **argv) {
2399 : DEBUG_FUNCTION("cloudsync_db_version");
2400 3 : UNUSED_PARAMETER(argc);
2401 3 : UNUSED_PARAMETER(argv);
2402 :
2403 : // retrieve context
2404 3 : sqlite3 *db = sqlite3_context_db_handle(context);
2405 3 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2406 :
2407 3 : int rc = db_version_check_uptodate(db, data);
2408 3 : if (rc != SQLITE_OK) {
2409 0 : dbutils_context_result_error(context, "Unable to retrieve db_version (%s).", sqlite3_errmsg(db));
2410 0 : return;
2411 : }
2412 :
2413 3 : sqlite3_result_int64(context, data->db_version);
2414 3 : }
2415 :
2416 21703 : void cloudsync_db_version_next (sqlite3_context *context, int argc, sqlite3_value **argv) {
2417 : DEBUG_FUNCTION("cloudsync_db_version_next");
2418 :
2419 : // retrieve context
2420 21703 : sqlite3 *db = sqlite3_context_db_handle(context);
2421 21703 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2422 :
2423 21703 : sqlite3_int64 merging_version = (argc == 1) ? sqlite3_value_int64(argv[0]) : CLOUDSYNC_VALUE_NOTSET;
2424 21703 : sqlite3_int64 value = db_version_next(db, data, merging_version);
2425 21703 : if (value == -1) {
2426 0 : dbutils_context_result_error(context, "Unable to retrieve next_db_version (%s).", sqlite3_errmsg(db));
2427 0 : return;
2428 : }
2429 :
2430 21703 : sqlite3_result_int64(context, value);
2431 21703 : }
2432 :
2433 23 : void cloudsync_seq (sqlite3_context *context, int argc, sqlite3_value **argv) {
2434 : DEBUG_FUNCTION("cloudsync_seq");
2435 :
2436 : // retrieve context
2437 23 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2438 23 : sqlite3_result_int(context, BUMP_SEQ(data));
2439 23 : }
2440 :
2441 1 : void cloudsync_uuid (sqlite3_context *context, int argc, sqlite3_value **argv) {
2442 : DEBUG_FUNCTION("cloudsync_uuid");
2443 :
2444 : char value[UUID_STR_MAXLEN];
2445 1 : char *uuid = cloudsync_uuid_v7_string(value, true);
2446 1 : sqlite3_result_text(context, uuid, -1, SQLITE_TRANSIENT);
2447 1 : }
2448 :
2449 : // MARK: -
2450 :
2451 1 : void cloudsync_set (sqlite3_context *context, int argc, sqlite3_value **argv) {
2452 : DEBUG_FUNCTION("cloudsync_set");
2453 :
2454 : // sanity check parameters
2455 1 : const char *key = (const char *)sqlite3_value_text(argv[0]);
2456 1 : const char *value = (const char *)sqlite3_value_text(argv[1]);
2457 :
2458 : // silently fails
2459 1 : if (key == NULL) return;
2460 :
2461 1 : sqlite3 *db = sqlite3_context_db_handle(context);
2462 1 : dbutils_settings_set_key_value(db, context, key, value);
2463 1 : }
2464 :
2465 1 : void cloudsync_set_column (sqlite3_context *context, int argc, sqlite3_value **argv) {
2466 : DEBUG_FUNCTION("cloudsync_set_column");
2467 :
2468 1 : const char *tbl = (const char *)sqlite3_value_text(argv[0]);
2469 1 : const char *col = (const char *)sqlite3_value_text(argv[1]);
2470 1 : const char *key = (const char *)sqlite3_value_text(argv[2]);
2471 1 : const char *value = (const char *)sqlite3_value_text(argv[3]);
2472 1 : dbutils_table_settings_set_key_value(NULL, context, tbl, col, key, value);
2473 1 : }
2474 :
2475 1 : void cloudsync_set_table (sqlite3_context *context, int argc, sqlite3_value **argv) {
2476 : DEBUG_FUNCTION("cloudsync_set_table");
2477 :
2478 1 : const char *tbl = (const char *)sqlite3_value_text(argv[0]);
2479 1 : const char *key = (const char *)sqlite3_value_text(argv[1]);
2480 1 : const char *value = (const char *)sqlite3_value_text(argv[2]);
2481 1 : dbutils_table_settings_set_key_value(NULL, context, tbl, "*", key, value);
2482 1 : }
2483 :
2484 22261 : void cloudsync_is_sync (sqlite3_context *context, int argc, sqlite3_value **argv) {
2485 : DEBUG_FUNCTION("cloudsync_is_sync");
2486 :
2487 22261 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2488 22261 : if (data->insync) {
2489 18850 : sqlite3_result_int(context, 1);
2490 18850 : return;
2491 : }
2492 :
2493 3411 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2494 3411 : cloudsync_table_context *table = table_lookup(data, table_name);
2495 3411 : sqlite3_result_int(context, (table) ? (table->enabled == 0) : 0);
2496 22261 : }
2497 :
2498 82832 : void cloudsync_col_value (sqlite3_context *context, int argc, sqlite3_value **argv) {
2499 : // DEBUG_FUNCTION("cloudsync_col_value");
2500 :
2501 : // argv[0] -> table name
2502 : // argv[1] -> column name
2503 : // argv[2] -> encoded pk
2504 :
2505 : // lookup table
2506 82832 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2507 82832 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2508 82832 : cloudsync_table_context *table = table_lookup(data, table_name);
2509 82832 : if (!table) {
2510 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in clousdsync_colvalue.", table_name);
2511 0 : return;
2512 : }
2513 :
2514 : // retrieve column name
2515 82832 : const char *col_name = (const char *)sqlite3_value_text(argv[1]);
2516 :
2517 : // check for special tombstone value
2518 82832 : if (strcmp(col_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0) {
2519 963 : sqlite3_result_null(context);
2520 963 : return;
2521 : }
2522 :
2523 : // extract the right col_value vm associated to the column name
2524 81869 : sqlite3_stmt *vm = table_column_lookup(table, col_name, false, NULL);
2525 81869 : if (!vm) {
2526 0 : sqlite3_result_error(context, "Unable to retrieve column value precompiled statement in clousdsync_colvalue.", -1);
2527 0 : return;
2528 : }
2529 :
2530 : // bind primary key values
2531 81869 : int rc = pk_decode_prikey((char *)sqlite3_value_blob(argv[2]), (size_t)sqlite3_value_bytes(argv[2]), pk_decode_bind_callback, (void *)vm);
2532 81869 : if (rc < 0) goto cleanup;
2533 :
2534 : // execute vm
2535 81869 : rc = sqlite3_step(vm);
2536 163738 : if (rc == SQLITE_DONE) {
2537 0 : rc = SQLITE_OK;
2538 0 : sqlite3_result_text(context, CLOUDSYNC_RLS_RESTRICTED_VALUE, -1, SQLITE_STATIC);
2539 81869 : } else if (rc == SQLITE_ROW) {
2540 : // store value result
2541 81869 : rc = SQLITE_OK;
2542 81869 : sqlite3_result_value(context, sqlite3_column_value(vm, 0));
2543 81869 : }
2544 :
2545 : cleanup:
2546 81869 : if (rc != SQLITE_OK) {
2547 0 : sqlite3 *db = sqlite3_context_db_handle(context);
2548 0 : sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2549 0 : }
2550 81869 : sqlite3_reset(vm);
2551 82832 : }
2552 :
2553 10213 : void cloudsync_pk_encode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2554 10213 : size_t bsize = 0;
2555 10213 : char *buffer = pk_encode_prikey(argv, argc, NULL, &bsize);
2556 10213 : if (!buffer) {
2557 0 : sqlite3_result_null(context);
2558 0 : return;
2559 : }
2560 10213 : sqlite3_result_blob(context, (const void *)buffer, (int)bsize, SQLITE_TRANSIENT);
2561 10213 : cloudsync_memory_free(buffer);
2562 10213 : }
2563 :
2564 6208 : int cloudsync_pk_decode_set_result_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
2565 6208 : cloudsync_pk_decode_context *decode_context = (cloudsync_pk_decode_context *)xdata;
2566 : // decode_context->index is 1 based
2567 : // index is 0 based
2568 6208 : if (decode_context->index != index+1) return SQLITE_OK;
2569 :
2570 3144 : int rc = 0;
2571 3144 : sqlite3_context *context = decode_context->context;
2572 3144 : switch (type) {
2573 : case SQLITE_INTEGER:
2574 0 : sqlite3_result_int64(context, ival);
2575 0 : break;
2576 :
2577 : case SQLITE_FLOAT:
2578 0 : sqlite3_result_double(context, dval);
2579 0 : break;
2580 :
2581 : case SQLITE_NULL:
2582 0 : sqlite3_result_null(context);
2583 0 : break;
2584 :
2585 : case SQLITE_TEXT:
2586 3144 : sqlite3_result_text(context, pval, (int)ival, SQLITE_TRANSIENT);
2587 3144 : break;
2588 :
2589 : case SQLITE_BLOB:
2590 0 : sqlite3_result_blob(context, pval, (int)ival, SQLITE_TRANSIENT);
2591 0 : break;
2592 : }
2593 :
2594 3144 : return rc;
2595 6208 : }
2596 :
2597 :
2598 3144 : void cloudsync_pk_decode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2599 3144 : const char *pk = (const char *)sqlite3_value_text(argv[0]);
2600 3144 : int i = sqlite3_value_int(argv[1]);
2601 :
2602 3144 : cloudsync_pk_decode_context xdata = {.context = context, .index = i};
2603 3144 : pk_decode_prikey((char *)pk, strlen(pk), cloudsync_pk_decode_set_result_callback, &xdata);
2604 3144 : }
2605 :
2606 : // MARK: -
2607 :
2608 3366 : void cloudsync_insert (sqlite3_context *context, int argc, sqlite3_value **argv) {
2609 : DEBUG_FUNCTION("cloudsync_insert %s", sqlite3_value_text(argv[0]));
2610 : // debug_values(argc-1, &argv[1]);
2611 :
2612 : // argv[0] is table name
2613 : // argv[1]..[N] is primary key(s)
2614 :
2615 : // table_cloudsync
2616 : // pk -> encode(argc-1, &argv[1])
2617 : // col_name -> name
2618 : // col_version -> 0/1 +1
2619 : // db_version -> check
2620 : // site_id 0
2621 : // seq -> sqlite_master
2622 :
2623 : // retrieve context
2624 3366 : sqlite3 *db = sqlite3_context_db_handle(context);
2625 3366 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2626 :
2627 : // lookup table
2628 3366 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2629 3366 : cloudsync_table_context *table = table_lookup(data, table_name);
2630 3366 : if (!table) {
2631 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_insert.", table_name);
2632 0 : return;
2633 : }
2634 :
2635 : // encode the primary key values into a buffer
2636 : char buffer[1024];
2637 3366 : size_t pklen = sizeof(buffer);
2638 3366 : char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
2639 3366 : if (!pk) {
2640 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2641 0 : return;
2642 : }
2643 :
2644 : // compute the next database version for tracking changes
2645 3366 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2646 :
2647 : // check if a row with the same primary key already exists
2648 : // if so, this means the row might have been previously deleted (sentinel)
2649 3366 : bool pk_exists = (bool)stmt_count(table->meta_pkexists_stmt, pk, pklen, SQLITE_BLOB);
2650 3366 : int rc = SQLITE_OK;
2651 :
2652 3366 : if (table->ncols == 0) {
2653 : // if there are no columns other than primary keys, insert a sentinel record
2654 91 : rc = local_mark_insert_sentinel_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2655 91 : if (rc != SQLITE_OK) goto cleanup;
2656 3366 : } else if (pk_exists){
2657 : // if a row with the same primary key already exists, update the sentinel record
2658 1 : rc = local_update_sentinel(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2659 1 : if (rc != SQLITE_OK) goto cleanup;
2660 1 : }
2661 :
2662 : // process each non-primary key column for insert or update
2663 13743 : for (int i=0; i<table->ncols; ++i) {
2664 : // mark the column as inserted or updated in the metadata
2665 10377 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data));
2666 10377 : if (rc != SQLITE_OK) goto cleanup;
2667 13743 : }
2668 :
2669 : cleanup:
2670 3366 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2671 : // free memory if the primary key was dynamically allocated
2672 3366 : if (pk != buffer) cloudsync_memory_free(pk);
2673 3366 : }
2674 :
2675 13 : void cloudsync_delete (sqlite3_context *context, int argc, sqlite3_value **argv) {
2676 : DEBUG_FUNCTION("cloudsync_delete %s", sqlite3_value_text(argv[0]));
2677 : // debug_values(argc-1, &argv[1]);
2678 :
2679 : // retrieve context
2680 13 : sqlite3 *db = sqlite3_context_db_handle(context);
2681 13 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2682 :
2683 : // lookup table
2684 13 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2685 13 : cloudsync_table_context *table = table_lookup(data, table_name);
2686 13 : if (!table) {
2687 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_delete.", table_name);
2688 0 : return;
2689 : }
2690 :
2691 : // compute the next database version for tracking changes
2692 13 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2693 13 : int rc = SQLITE_OK;
2694 :
2695 : // encode the primary key values into a buffer
2696 : char buffer[1024];
2697 13 : size_t pklen = sizeof(buffer);
2698 13 : char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
2699 13 : if (!pk) {
2700 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2701 0 : return;
2702 : }
2703 :
2704 : // mark the row as deleted by inserting a delete sentinel into the metadata
2705 13 : rc = local_mark_delete_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2706 13 : if (rc != SQLITE_OK) goto cleanup;
2707 :
2708 : // remove any metadata related to the old rows associated with this primary key
2709 13 : rc = local_drop_meta(db, table, pk, pklen);
2710 13 : if (rc != SQLITE_OK) goto cleanup;
2711 :
2712 : cleanup:
2713 13 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2714 : // free memory if the primary key was dynamically allocated
2715 13 : if (pk != buffer) cloudsync_memory_free(pk);
2716 13 : }
2717 :
2718 : // MARK: -
2719 :
2720 62 : void cloudsync_update_payload_free (cloudsync_update_payload *payload) {
2721 931 : for (int i=0; i<payload->count; i++) {
2722 869 : sqlite3_value_free(payload->new_values[i]);
2723 869 : sqlite3_value_free(payload->old_values[i]);
2724 869 : }
2725 62 : cloudsync_memory_free(payload->new_values);
2726 62 : cloudsync_memory_free(payload->old_values);
2727 62 : sqlite3_value_free(payload->table_name);
2728 62 : payload->new_values = NULL;
2729 62 : payload->old_values = NULL;
2730 62 : payload->table_name = NULL;
2731 62 : payload->count = 0;
2732 62 : payload->capacity = 0;
2733 62 : }
2734 :
2735 869 : int cloudsync_update_payload_append (cloudsync_update_payload *payload, sqlite3_value *v1, sqlite3_value *v2, sqlite3_value *v3) {
2736 869 : if (payload->count >= payload->capacity) {
2737 65 : int newcap = payload->capacity ? payload->capacity * 2 : 128;
2738 :
2739 65 : sqlite3_value **new_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->new_values, newcap * sizeof(*new_values_2));
2740 65 : if (!new_values_2) return SQLITE_NOMEM;
2741 65 : payload->new_values = new_values_2;
2742 :
2743 65 : sqlite3_value **old_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->old_values, newcap * sizeof(*old_values_2));
2744 65 : if (!old_values_2) return SQLITE_NOMEM;
2745 65 : payload->old_values = old_values_2;
2746 :
2747 65 : payload->capacity = newcap;
2748 65 : }
2749 :
2750 869 : int index = payload->count;
2751 869 : if (payload->table_name == NULL) payload->table_name = sqlite3_value_dup(v1);
2752 807 : else if (dbutils_value_compare(payload->table_name, v1) != 0) return SQLITE_NOMEM;
2753 869 : payload->new_values[index] = sqlite3_value_dup(v2);
2754 869 : payload->old_values[index] = sqlite3_value_dup(v3);
2755 869 : payload->count++;
2756 :
2757 : // sanity check memory allocations
2758 869 : bool v1_can_be_null = (sqlite3_value_type(v1) == SQLITE_NULL);
2759 869 : bool v2_can_be_null = (sqlite3_value_type(v2) == SQLITE_NULL);
2760 869 : bool v3_can_be_null = (sqlite3_value_type(v3) == SQLITE_NULL);
2761 :
2762 869 : if ((payload->table_name == NULL) && (!v1_can_be_null)) return SQLITE_NOMEM;
2763 869 : if ((payload->old_values[index] == NULL) && (!v2_can_be_null)) return SQLITE_NOMEM;
2764 869 : if ((payload->new_values[index] == NULL) && (!v3_can_be_null)) return SQLITE_NOMEM;
2765 :
2766 869 : return SQLITE_OK;
2767 869 : }
2768 :
2769 869 : void cloudsync_update_step (sqlite3_context *context, int argc, sqlite3_value **argv) {
2770 : // argv[0] => table_name
2771 : // argv[1] => new_column_value
2772 : // argv[2] => old_column_value
2773 :
2774 : // allocate/get the update payload
2775 869 : cloudsync_update_payload *payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload));
2776 869 : if (!payload) {sqlite3_result_error_nomem(context); return;}
2777 :
2778 869 : if (cloudsync_update_payload_append(payload, argv[0], argv[1], argv[2]) != SQLITE_OK) {
2779 0 : sqlite3_result_error_nomem(context);
2780 0 : }
2781 869 : }
2782 :
2783 62 : void cloudsync_update_final (sqlite3_context *context) {
2784 62 : cloudsync_update_payload *payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload));
2785 62 : if (!payload || payload->count == 0) return;
2786 :
2787 : // retrieve context
2788 62 : sqlite3 *db = sqlite3_context_db_handle(context);
2789 62 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2790 :
2791 : // lookup table
2792 62 : const char *table_name = (const char *)sqlite3_value_text(payload->table_name);
2793 62 : cloudsync_table_context *table = table_lookup(data, table_name);
2794 62 : if (!table) {
2795 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_update.", table_name);
2796 0 : return;
2797 : }
2798 :
2799 : // compute the next database version for tracking changes
2800 62 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2801 62 : int rc = SQLITE_OK;
2802 :
2803 : // Check if the primary key(s) have changed
2804 62 : bool prikey_changed = false;
2805 120 : for (int i=0; i<table->npks; ++i) {
2806 89 : if (dbutils_value_compare(payload->old_values[i], payload->new_values[i]) != 0) {
2807 31 : prikey_changed = true;
2808 31 : break;
2809 : }
2810 58 : }
2811 :
2812 : // encode the NEW primary key values into a buffer (used later for indexing)
2813 : char buffer[1024];
2814 : char buffer2[1024];
2815 62 : size_t pklen = sizeof(buffer);
2816 62 : size_t oldpklen = sizeof(buffer2);
2817 62 : char *oldpk = NULL;
2818 :
2819 62 : char *pk = pk_encode_prikey(payload->new_values, table->npks, buffer, &pklen);
2820 62 : if (!pk) {
2821 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2822 0 : return;
2823 : }
2824 :
2825 62 : if (prikey_changed) {
2826 : // if the primary key has changed, we need to handle the row differently:
2827 : // 1. mark the old row (OLD primary key) as deleted
2828 : // 2. create a new row (NEW primary key)
2829 :
2830 : // encode the OLD primary key into a buffer
2831 31 : oldpk = pk_encode_prikey(payload->old_values, table->npks, buffer2, &oldpklen);
2832 31 : if (!oldpk) {
2833 0 : if (pk != buffer) cloudsync_memory_free(pk);
2834 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2835 0 : return;
2836 : }
2837 :
2838 : // mark the rows with the old primary key as deleted in the metadata (old row handling)
2839 31 : rc = local_mark_delete_meta(db, table, oldpk, oldpklen, db_version, BUMP_SEQ(data));
2840 31 : if (rc != SQLITE_OK) goto cleanup;
2841 :
2842 : // move non-sentinel metadata entries from OLD primary key to NEW primary key
2843 : // handles the case where some metadata is retained across primary key change
2844 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2845 31 : rc = local_update_move_meta(db, table, pk, pklen, oldpk, oldpklen, db_version);
2846 31 : if (rc != SQLITE_OK) goto cleanup;
2847 :
2848 : // mark a new sentinel row with the new primary key in the metadata
2849 31 : rc = local_mark_insert_sentinel_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2850 31 : if (rc != SQLITE_OK) goto cleanup;
2851 :
2852 : // free memory if the OLD primary key was dynamically allocated
2853 31 : if (oldpk != buffer2) cloudsync_memory_free(oldpk);
2854 31 : oldpk = NULL;
2855 31 : }
2856 :
2857 : // compare NEW and OLD values (excluding primary keys) to handle column updates
2858 812 : for (int i=0; i<table->ncols; i++) {
2859 750 : int col_index = table->npks + i; // Regular columns start after primary keys
2860 :
2861 750 : if (dbutils_value_compare(payload->old_values[col_index], payload->new_values[col_index]) != 0) {
2862 : // if a column value has changed, mark it as updated in the metadata
2863 : // columns are in cid order
2864 57 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data));
2865 57 : if (rc != SQLITE_OK) goto cleanup;
2866 57 : }
2867 812 : }
2868 :
2869 : cleanup:
2870 62 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2871 62 : if (pk != buffer) cloudsync_memory_free(pk);
2872 62 : if (oldpk && (oldpk != buffer2)) cloudsync_memory_free(oldpk);
2873 :
2874 62 : cloudsync_update_payload_free(payload);
2875 62 : }
2876 :
2877 : // MARK: -
2878 :
2879 6 : int cloudsync_cleanup_internal (sqlite3_context *context, const char *table_name) {
2880 6 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2881 :
2882 : // get database reference
2883 6 : sqlite3 *db = sqlite3_context_db_handle(context);
2884 :
2885 : // init cloudsync_settings
2886 6 : if (cloudsync_context_init(db, data, context) == NULL) return SQLITE_MISUSE;
2887 :
2888 6 : cloudsync_table_context *table = table_lookup(data, table_name);
2889 6 : if (!table) return SQLITE_OK;
2890 :
2891 6 : table_remove_from_context(data, table);
2892 6 : table_free(table);
2893 :
2894 : // drop meta-table
2895 6 : char *sql = cloudsync_memory_mprintf("DROP TABLE IF EXISTS \"%w_cloudsync\";", table_name);
2896 6 : int rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
2897 6 : cloudsync_memory_free(sql);
2898 6 : if (rc != SQLITE_OK) {
2899 0 : dbutils_context_result_error(context, "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup.", table_name);
2900 0 : sqlite3_result_error_code(context, rc);
2901 0 : return rc;
2902 : }
2903 :
2904 : // drop original triggers
2905 6 : dbutils_delete_triggers(db, table_name);
2906 6 : if (rc != SQLITE_OK) {
2907 0 : dbutils_context_result_error(context, "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup.", table_name);
2908 0 : sqlite3_result_error_code(context, rc);
2909 0 : return rc;
2910 : }
2911 :
2912 : // remove all table related settings
2913 6 : dbutils_table_settings_set_key_value(db, context, table_name, NULL, NULL, NULL);
2914 :
2915 6 : return SQLITE_OK;
2916 6 : }
2917 :
2918 1 : void cloudsync_cleanup_all (sqlite3_context *context) {
2919 1 : char *sql = "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'cloudsync_%' AND name NOT LIKE '%_cloudsync';";
2920 :
2921 1 : sqlite3 *db = sqlite3_context_db_handle(context);
2922 1 : char **result = NULL;
2923 : int nrows, ncols;
2924 : char *errmsg;
2925 1 : int rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, &errmsg);
2926 1 : if (errmsg || ncols != 1) {
2927 0 : printf("cloudsync_cleanup_all error: %s\n", errmsg ? errmsg : "invalid table");
2928 0 : goto cleanup;
2929 : }
2930 :
2931 1 : rc = SQLITE_OK;
2932 6 : for (int i = ncols; i < nrows+ncols; i+=ncols) {
2933 5 : int rc2 = cloudsync_cleanup_internal(context, result[i]);
2934 5 : if (rc2 != SQLITE_OK) rc = rc2;
2935 5 : }
2936 :
2937 2 : if (rc == SQLITE_OK) {
2938 1 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2939 1 : data->site_id[0] = 0;
2940 1 : dbutils_settings_cleanup(db);
2941 1 : }
2942 :
2943 : cleanup:
2944 1 : sqlite3_free_table(result);
2945 1 : sqlite3_free(errmsg);
2946 1 : }
2947 :
2948 2 : void cloudsync_cleanup (sqlite3_context *context, int argc, sqlite3_value **argv) {
2949 : DEBUG_FUNCTION("cloudsync_cleanup");
2950 :
2951 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
2952 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2953 2 : sqlite3 *db = sqlite3_context_db_handle(context);
2954 :
2955 2 : if (dbutils_is_star_table(table)) cloudsync_cleanup_all(context);
2956 1 : else cloudsync_cleanup_internal(context, table);
2957 :
2958 2 : if (dbutils_table_exists(db, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) dbutils_update_schema_hash(db, &data->schema_hash);
2959 2 : }
2960 :
2961 2 : void cloudsync_enable_disable (sqlite3_context *context, const char *table_name, bool value) {
2962 : DEBUG_FUNCTION("cloudsync_enable_disable");
2963 :
2964 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2965 2 : cloudsync_table_context *table = table_lookup(data, table_name);
2966 2 : if (!table) return;
2967 :
2968 2 : table->enabled = value;
2969 2 : }
2970 :
2971 28 : int cloudsync_enable_disable_all_callback (void *xdata, int ncols, char **values, char **names) {
2972 28 : sqlite3_context *context = (sqlite3_context *)xdata;
2973 28 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2974 28 : bool value = data->temp_bool;
2975 :
2976 56 : for (int i=0; i<ncols; i++) {
2977 28 : const char *table_name = values[i];
2978 28 : cloudsync_table_context *table = table_lookup(data, table_name);
2979 28 : if (!table) continue;
2980 10 : table->enabled = value;
2981 10 : }
2982 :
2983 28 : return SQLITE_OK;
2984 : }
2985 :
2986 2 : void cloudsync_enable_disable_all (sqlite3_context *context, bool value) {
2987 : DEBUG_FUNCTION("cloudsync_enable_disable_all");
2988 :
2989 2 : char *sql = "SELECT name FROM sqlite_master WHERE type='table';";
2990 :
2991 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2992 2 : data->temp_bool = value;
2993 2 : sqlite3 *db = sqlite3_context_db_handle(context);
2994 2 : sqlite3_exec(db, sql, cloudsync_enable_disable_all_callback, context, NULL);
2995 2 : }
2996 :
2997 2 : void cloudsync_enable (sqlite3_context *context, int argc, sqlite3_value **argv) {
2998 : DEBUG_FUNCTION("cloudsync_enable");
2999 :
3000 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3001 2 : if (dbutils_is_star_table(table)) cloudsync_enable_disable_all(context, true);
3002 1 : else cloudsync_enable_disable(context, table, true);
3003 2 : }
3004 :
3005 2 : void cloudsync_disable (sqlite3_context *context, int argc, sqlite3_value **argv) {
3006 : DEBUG_FUNCTION("cloudsync_disable");
3007 :
3008 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3009 2 : if (dbutils_is_star_table(table)) cloudsync_enable_disable_all(context, false);
3010 1 : else cloudsync_enable_disable(context, table, false);
3011 2 : }
3012 :
3013 2854 : void cloudsync_is_enabled (sqlite3_context *context, int argc, sqlite3_value **argv) {
3014 : DEBUG_FUNCTION("cloudsync_is_enabled");
3015 :
3016 2854 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3017 2854 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3018 2854 : cloudsync_table_context *table = table_lookup(data, table_name);
3019 :
3020 2854 : int result = (table && table->enabled) ? 1 : 0;
3021 2854 : sqlite3_result_int(context, result);
3022 2854 : }
3023 :
3024 88 : void cloudsync_terminate (sqlite3_context *context, int argc, sqlite3_value **argv) {
3025 : DEBUG_FUNCTION("cloudsync_terminate");
3026 :
3027 88 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3028 :
3029 206 : for (int i=0; i<data->tables_count; ++i) {
3030 118 : if (data->tables[i]) table_free(data->tables[i]);
3031 118 : data->tables[i] = NULL;
3032 118 : }
3033 :
3034 88 : if (data->schema_version_stmt) sqlite3_finalize(data->schema_version_stmt);
3035 88 : if (data->data_version_stmt) sqlite3_finalize(data->data_version_stmt);
3036 88 : if (data->db_version_stmt) sqlite3_finalize(data->db_version_stmt);
3037 88 : if (data->getset_siteid_stmt) sqlite3_finalize(data->getset_siteid_stmt);
3038 :
3039 88 : data->schema_version_stmt = NULL;
3040 88 : data->data_version_stmt = NULL;
3041 88 : data->db_version_stmt = NULL;
3042 88 : data->getset_siteid_stmt = NULL;
3043 :
3044 : // reset the site_id so the cloudsync_context_init will be executed again
3045 : // if any other cloudsync function is called after terminate
3046 88 : data->site_id[0] = 0;
3047 :
3048 88 : sqlite3_result_int(context, 1);
3049 88 : }
3050 :
3051 : // MARK: -
3052 :
3053 89 : int cloudsync_load_siteid (sqlite3 *db, cloudsync_context *data) {
3054 : // check if site_id was already loaded
3055 89 : if (data->site_id[0] != 0) return SQLITE_OK;
3056 :
3057 : // load site_id
3058 : int size, rc;
3059 88 : char *buffer = dbutils_blob_select(db, "SELECT site_id FROM cloudsync_site_id WHERE rowid=0;", &size, data->sqlite_ctx, &rc);
3060 88 : if (!buffer) return rc;
3061 88 : if (size != UUID_LEN) return SQLITE_MISUSE;
3062 :
3063 88 : memcpy(data->site_id, buffer, UUID_LEN);
3064 88 : cloudsync_memory_free(buffer);
3065 :
3066 88 : return SQLITE_OK;
3067 89 : }
3068 :
3069 147 : int cloudsync_init_internal (sqlite3_context *context, const char *table_name, const char *algo_name, bool skip_int_pk_check) {
3070 : DEBUG_FUNCTION("cloudsync_init_internal");
3071 :
3072 : // get database reference
3073 147 : sqlite3 *db = sqlite3_context_db_handle(context);
3074 :
3075 : // retrieve global context
3076 147 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3077 :
3078 : // sanity check table and its primary key(s)
3079 147 : if (dbutils_table_sanity_check(db, context, table_name, skip_int_pk_check) == false) {
3080 1 : return SQLITE_MISUSE;
3081 : }
3082 :
3083 : // init cloudsync_settings
3084 146 : if (cloudsync_context_init(db, data, context) == NULL) return SQLITE_MISUSE;
3085 :
3086 : // sanity check algo name (if exists)
3087 146 : table_algo algo_new = table_algo_none;
3088 146 : if (!algo_name) {
3089 4 : algo_name = CLOUDSYNC_DEFAULT_ALGO;
3090 4 : }
3091 :
3092 146 : algo_new = crdt_algo_from_name(algo_name);
3093 146 : if (algo_new == table_algo_none) {
3094 1 : dbutils_context_result_error(context, "algo name %s does not exist", crdt_algo_name);
3095 1 : return SQLITE_MISUSE;
3096 : }
3097 :
3098 : // check if table name was already augmented
3099 145 : table_algo algo_current = dbutils_table_settings_get_algo(db, table_name);
3100 :
3101 : // sanity check algorithm
3102 145 : if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
3103 : // if table algorithms and the same and not none, do nothing
3104 145 : } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
3105 : // nothing is written into settings because the default table_algo_crdt_cls will be used
3106 0 : algo_new = algo_current = table_algo_crdt_cls;
3107 116 : } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
3108 : // algo is already written into settins so just use it
3109 0 : algo_new = algo_current;
3110 116 : } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
3111 : // write table algo name in settings
3112 116 : dbutils_table_settings_set_key_value(NULL, context, table_name, "*", "algo", algo_name);
3113 116 : } else {
3114 : // error condition
3115 0 : dbutils_context_result_error(context, "%s", "Before changing a table algorithm you must call cloudsync_cleanup(table_name)");
3116 0 : return SQLITE_MISUSE;
3117 : }
3118 :
3119 : // Run the following function even if table was already augmented.
3120 : // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
3121 : // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
3122 :
3123 : // sync algo with table (unused in this version)
3124 : // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
3125 :
3126 : // check triggers
3127 145 : int rc = dbutils_check_triggers(db, table_name, algo_new);
3128 145 : if (rc != SQLITE_OK) {
3129 0 : dbutils_context_result_error(context, "An error occurred while creating triggers: %s (%d)", sqlite3_errmsg(db), rc);
3130 0 : return SQLITE_MISUSE;
3131 : }
3132 :
3133 : // check meta-table
3134 145 : rc = dbutils_check_metatable(db, table_name, algo_new);
3135 145 : if (rc != SQLITE_OK) {
3136 0 : dbutils_context_result_error(context, "An error occurred while creating metatable: %s (%d)", sqlite3_errmsg(db), rc);
3137 0 : return SQLITE_MISUSE;
3138 : }
3139 :
3140 : // add prepared statements
3141 145 : if (stmts_add_tocontext(db, data) != SQLITE_OK) {
3142 0 : dbutils_context_result_error(context, "%s", "An error occurred while trying to compile prepared SQL statements.");
3143 0 : return SQLITE_MISUSE;
3144 : }
3145 :
3146 : // add table to in-memory data context
3147 145 : if (table_add_to_context(db, data, algo_new, table_name) == false) {
3148 0 : dbutils_context_result_error(context, "An error occurred while adding %s table information to global context", table_name);
3149 0 : return SQLITE_MISUSE;
3150 : }
3151 :
3152 145 : if (cloudsync_refill_metatable(db, data, table_name) != SQLITE_OK) {
3153 0 : dbutils_context_result_error(context, "%s", "An error occurred while trying to fill the augmented table.");
3154 0 : return SQLITE_MISUSE;
3155 : }
3156 :
3157 145 : return SQLITE_OK;
3158 147 : }
3159 :
3160 1 : int cloudsync_init_all (sqlite3_context *context, const char *algo_name, bool skip_int_pk_check) {
3161 : char sql[1024];
3162 1 : snprintf(sql, sizeof(sql), "SELECT name, '%s' FROM sqlite_master WHERE type='table' and name NOT LIKE 'sqlite_%%' AND name NOT LIKE 'cloudsync_%%' AND name NOT LIKE '%%_cloudsync';", (algo_name) ? algo_name : CLOUDSYNC_DEFAULT_ALGO);
3163 :
3164 1 : sqlite3 *db = sqlite3_context_db_handle(context);
3165 1 : sqlite3_stmt *vm = NULL;
3166 1 : int rc = sqlite3_prepare_v2(db, sql, -1, &vm, NULL);
3167 1 : if (rc != SQLITE_OK) goto abort_init_all;
3168 :
3169 6 : while (1) {
3170 6 : rc = sqlite3_step(vm);
3171 6 : if (rc == SQLITE_DONE) break;
3172 5 : else if (rc != SQLITE_ROW) goto abort_init_all;
3173 :
3174 5 : const char *table = (const char *)sqlite3_column_text(vm, 0);
3175 5 : const char *algo = (const char *)sqlite3_column_text(vm, 1);
3176 5 : rc = cloudsync_init_internal(context, table, algo, skip_int_pk_check);
3177 5 : if (rc != SQLITE_OK) {cloudsync_cleanup_internal(context, table); goto abort_init_all;}
3178 : }
3179 1 : rc = SQLITE_OK;
3180 :
3181 : abort_init_all:
3182 1 : if (vm) sqlite3_finalize(vm);
3183 1 : return rc;
3184 : }
3185 :
3186 120 : void cloudsync_init (sqlite3_context *context, const char *table, const char *algo, bool skip_int_pk_check) {
3187 120 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3188 120 : data->sqlite_ctx = context;
3189 :
3190 120 : sqlite3 *db = sqlite3_context_db_handle(context);
3191 120 : int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_init;", NULL, NULL, NULL);
3192 120 : if (rc != SQLITE_OK) {
3193 0 : dbutils_context_result_error(context, "Unable to create cloudsync_init savepoint. %s", sqlite3_errmsg(db));
3194 0 : sqlite3_result_error_code(context, rc);
3195 0 : return;
3196 : }
3197 :
3198 120 : if (dbutils_is_star_table(table)) rc = cloudsync_init_all(context, algo, skip_int_pk_check);
3199 119 : else rc = cloudsync_init_internal(context, table, algo, skip_int_pk_check);
3200 :
3201 120 : if (rc == SQLITE_OK) {
3202 118 : rc = sqlite3_exec(db, "RELEASE cloudsync_init", NULL, NULL, NULL);
3203 118 : if (rc != SQLITE_OK) {
3204 0 : dbutils_context_result_error(context, "Unable to release cloudsync_init savepoint. %s", sqlite3_errmsg(db));
3205 0 : sqlite3_result_error_code(context, rc);
3206 0 : }
3207 118 : }
3208 :
3209 : // in case of error, rollback transaction
3210 120 : if (rc != SQLITE_OK) {
3211 2 : sqlite3_exec(db, "ROLLBACK TO cloudsync_init; RELEASE cloudsync_init", NULL, NULL, NULL);
3212 2 : return;
3213 : }
3214 :
3215 118 : dbutils_update_schema_hash(db, &data->schema_hash);
3216 :
3217 : // returns site_id as TEXT
3218 : char buffer[UUID_STR_MAXLEN];
3219 118 : cloudsync_uuid_v7_stringify(data->site_id, buffer, false);
3220 118 : sqlite3_result_text(context, buffer, -1, NULL);
3221 120 : }
3222 :
3223 31 : void cloudsync_init3 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3224 : DEBUG_FUNCTION("cloudsync_init2");
3225 :
3226 31 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3227 31 : const char *algo = (const char *)sqlite3_value_text(argv[1]);
3228 31 : bool skip_int_pk_check = (bool)sqlite3_value_int(argv[2]);
3229 :
3230 31 : cloudsync_init(context, table, algo, skip_int_pk_check);
3231 31 : }
3232 :
3233 83 : void cloudsync_init2 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3234 : DEBUG_FUNCTION("cloudsync_init2");
3235 :
3236 83 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3237 83 : const char *algo = (const char *)sqlite3_value_text(argv[1]);
3238 :
3239 83 : cloudsync_init(context, table, algo, false);
3240 83 : }
3241 :
3242 6 : void cloudsync_init1 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3243 : DEBUG_FUNCTION("cloudsync_init1");
3244 :
3245 6 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3246 :
3247 6 : cloudsync_init(context, table, NULL, false);
3248 6 : }
3249 :
3250 : // MARK: -
3251 :
3252 24 : void cloudsync_begin_alter (sqlite3_context *context, int argc, sqlite3_value **argv) {
3253 : DEBUG_FUNCTION("cloudsync_begin_alter");
3254 24 : char *errmsg = NULL;
3255 24 : char **result = NULL;
3256 :
3257 24 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3258 :
3259 : // get database reference
3260 24 : sqlite3 *db = sqlite3_context_db_handle(context);
3261 :
3262 : // retrieve global context
3263 24 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3264 :
3265 : // init cloudsync_settings
3266 24 : if (cloudsync_context_init(db, data, context) == NULL) {
3267 0 : sqlite3_result_error(context, "Unable to init the cloudsync context.", -1);
3268 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3269 0 : return;
3270 : }
3271 :
3272 : // create a savepoint to manage the alter operations as a transaction
3273 24 : int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_alter", NULL, NULL, NULL);
3274 24 : if (rc != SQLITE_OK) {
3275 0 : sqlite3_result_error(context, "Unable to create cloudsync_alter savepoint.", -1);
3276 0 : sqlite3_result_error_code(context, rc);
3277 0 : goto rollback_begin_alter;
3278 : }
3279 :
3280 24 : cloudsync_table_context *table = table_lookup(data, table_name);
3281 24 : if (!table) {
3282 1 : dbutils_context_result_error(context, "Unable to find table %s", table_name);
3283 1 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3284 1 : goto rollback_begin_alter;
3285 : }
3286 :
3287 : int nrows, ncols;
3288 23 : char *sql = cloudsync_memory_mprintf("SELECT name FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
3289 23 : rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, &errmsg);
3290 23 : cloudsync_memory_free(sql);
3291 23 : if (errmsg || ncols != 1 || nrows != table->npks) {
3292 0 : dbutils_context_result_error(context, "Unable to get primary keys for table %s (%s)", table_name, errmsg);
3293 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3294 0 : goto rollback_begin_alter;
3295 : }
3296 :
3297 : // drop original triggers
3298 23 : dbutils_delete_triggers(db, table_name);
3299 23 : if (rc != SQLITE_OK) {
3300 0 : dbutils_context_result_error(context, "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
3301 0 : sqlite3_result_error_code(context, rc);
3302 0 : goto rollback_begin_alter;
3303 : }
3304 :
3305 23 : if (table->pk_name) sqlite3_free_table(table->pk_name);
3306 23 : table->pk_name = result;
3307 23 : return;
3308 :
3309 : rollback_begin_alter:
3310 1 : sqlite3_exec(db, "ROLLBACK TO cloudsync_alter; RELEASE cloudsync_alter;", NULL, NULL, NULL);
3311 :
3312 : cleanup_begin_alter:
3313 1 : sqlite3_free_table(result);
3314 1 : sqlite3_free(errmsg);
3315 24 : }
3316 :
3317 24 : void cloudsync_commit_alter (sqlite3_context *context, int argc, sqlite3_value **argv) {
3318 : DEBUG_FUNCTION("cloudsync_commit_alter");
3319 :
3320 24 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3321 24 : cloudsync_table_context *table = NULL;
3322 :
3323 : // get database reference
3324 24 : sqlite3 *db = sqlite3_context_db_handle(context);
3325 :
3326 : // retrieve global context
3327 24 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3328 :
3329 : // init cloudsync_settings
3330 24 : if (cloudsync_context_init(db, data, context) == NULL) {
3331 0 : dbutils_context_result_error(context, "Unable to init the cloudsync context.");
3332 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3333 0 : goto rollback_finalize_alter;
3334 : }
3335 :
3336 24 : table = table_lookup(data, table_name);
3337 24 : if (!table || !table->pk_name) {
3338 1 : dbutils_context_result_error(context, "Unable to find table context.");
3339 1 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3340 1 : goto rollback_finalize_alter;
3341 : }
3342 :
3343 23 : int rc = cloudsync_finalize_alter(context, data, table);
3344 23 : if (rc != SQLITE_OK) goto rollback_finalize_alter;
3345 :
3346 : // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
3347 23 : table_remove(data, table_name);
3348 23 : table_free(table);
3349 23 : table = NULL;
3350 :
3351 : // init again cloudsync for the table
3352 23 : table_algo algo_current = dbutils_table_settings_get_algo(db, table_name);
3353 23 : if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(db, "*");
3354 23 : rc = cloudsync_init_internal(context, table_name, crdt_algo_name(algo_current), true);
3355 23 : if (rc != SQLITE_OK) goto rollback_finalize_alter;
3356 :
3357 : // release savepoint
3358 23 : rc = sqlite3_exec(db, "RELEASE cloudsync_alter", NULL, NULL, NULL);
3359 23 : if (rc != SQLITE_OK) {
3360 0 : dbutils_context_result_error(context, sqlite3_errmsg(db));
3361 0 : sqlite3_result_error_code(context, rc);
3362 0 : goto rollback_finalize_alter;
3363 : }
3364 :
3365 23 : dbutils_update_schema_hash(db, &data->schema_hash);
3366 :
3367 23 : return;
3368 :
3369 : rollback_finalize_alter:
3370 1 : sqlite3_exec(db, "ROLLBACK TO cloudsync_alter; RELEASE cloudsync_alter;", NULL, NULL, NULL);
3371 1 : if (table) {
3372 0 : sqlite3_free_table(table->pk_name);
3373 0 : table->pk_name = NULL;
3374 0 : }
3375 24 : }
3376 :
3377 : // MARK: - Main Entrypoint -
3378 :
3379 88 : int cloudsync_register (sqlite3 *db, char **pzErrMsg) {
3380 88 : int rc = SQLITE_OK;
3381 :
3382 : // there's no built-in way to verify if sqlite3_cloudsync_init has already been called
3383 : // for this specific database connection, we use a workaround: we attempt to retrieve the
3384 : // cloudsync_version and check for an error, an error indicates that initialization has not been performed
3385 88 : if (sqlite3_exec(db, "SELECT cloudsync_version();", NULL, NULL, NULL) == SQLITE_OK) return SQLITE_OK;
3386 :
3387 : // init memory debugger (NOOP in production)
3388 : cloudsync_memory_init(1);
3389 :
3390 : // init context
3391 86 : void *ctx = cloudsync_context_create();
3392 86 : if (!ctx) {
3393 0 : if (pzErrMsg) *pzErrMsg = "Not enought memory to create a database context";
3394 0 : return SQLITE_NOMEM;
3395 : }
3396 :
3397 : // register functions
3398 :
3399 : // PUBLIC functions
3400 86 : rc = dbutils_register_function(db, "cloudsync_version", cloudsync_version, 0, pzErrMsg, ctx, cloudsync_context_free);
3401 86 : if (rc != SQLITE_OK) return rc;
3402 :
3403 86 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init1, 1, pzErrMsg, ctx, NULL);
3404 86 : if (rc != SQLITE_OK) return rc;
3405 :
3406 86 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init2, 2, pzErrMsg, ctx, NULL);
3407 86 : if (rc != SQLITE_OK) return rc;
3408 :
3409 86 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init3, 3, pzErrMsg, ctx, NULL);
3410 86 : if (rc != SQLITE_OK) return rc;
3411 :
3412 :
3413 86 : rc = dbutils_register_function(db, "cloudsync_enable", cloudsync_enable, 1, pzErrMsg, ctx, NULL);
3414 86 : if (rc != SQLITE_OK) return rc;
3415 :
3416 86 : rc = dbutils_register_function(db, "cloudsync_disable", cloudsync_disable, 1, pzErrMsg, ctx, NULL);
3417 86 : if (rc != SQLITE_OK) return rc;
3418 :
3419 86 : rc = dbutils_register_function(db, "cloudsync_is_enabled", cloudsync_is_enabled, 1, pzErrMsg, ctx, NULL);
3420 86 : if (rc != SQLITE_OK) return rc;
3421 :
3422 86 : rc = dbutils_register_function(db, "cloudsync_cleanup", cloudsync_cleanup, 1, pzErrMsg, ctx, NULL);
3423 86 : if (rc != SQLITE_OK) return rc;
3424 :
3425 86 : rc = dbutils_register_function(db, "cloudsync_terminate", cloudsync_terminate, 0, pzErrMsg, ctx, NULL);
3426 86 : if (rc != SQLITE_OK) return rc;
3427 :
3428 86 : rc = dbutils_register_function(db, "cloudsync_set", cloudsync_set, 2, pzErrMsg, ctx, NULL);
3429 86 : if (rc != SQLITE_OK) return rc;
3430 :
3431 86 : rc = dbutils_register_function(db, "cloudsync_set_table", cloudsync_set_table, 3, pzErrMsg, ctx, NULL);
3432 86 : if (rc != SQLITE_OK) return rc;
3433 :
3434 86 : rc = dbutils_register_function(db, "cloudsync_set_column", cloudsync_set_column, 4, pzErrMsg, ctx, NULL);
3435 86 : if (rc != SQLITE_OK) return rc;
3436 :
3437 86 : rc = dbutils_register_function(db, "cloudsync_siteid", cloudsync_siteid, 0, pzErrMsg, ctx, NULL);
3438 86 : if (rc != SQLITE_OK) return rc;
3439 :
3440 86 : rc = dbutils_register_function(db, "cloudsync_db_version", cloudsync_db_version, 0, pzErrMsg, ctx, NULL);
3441 86 : if (rc != SQLITE_OK) return rc;
3442 :
3443 86 : rc = dbutils_register_function(db, "cloudsync_db_version_next", cloudsync_db_version_next, 0, pzErrMsg, ctx, NULL);
3444 86 : if (rc != SQLITE_OK) return rc;
3445 :
3446 86 : rc = dbutils_register_function(db, "cloudsync_db_version_next", cloudsync_db_version_next, 1, pzErrMsg, ctx, NULL);
3447 86 : if (rc != SQLITE_OK) return rc;
3448 :
3449 86 : rc = dbutils_register_function(db, "cloudsync_begin_alter", cloudsync_begin_alter, 1, pzErrMsg, ctx, NULL);
3450 86 : if (rc != SQLITE_OK) return rc;
3451 :
3452 86 : rc = dbutils_register_function(db, "cloudsync_commit_alter", cloudsync_commit_alter, 1, pzErrMsg, ctx, NULL);
3453 86 : if (rc != SQLITE_OK) return rc;
3454 :
3455 86 : rc = dbutils_register_function(db, "cloudsync_uuid", cloudsync_uuid, 0, pzErrMsg, ctx, NULL);
3456 86 : if (rc != SQLITE_OK) return rc;
3457 :
3458 : // PAYLOAD
3459 86 : rc = dbutils_register_aggregate(db, "cloudsync_payload_encode", cloudsync_payload_encode_step, cloudsync_payload_encode_final, -1, pzErrMsg, ctx, NULL);
3460 86 : if (rc != SQLITE_OK) return rc;
3461 :
3462 86 : rc = dbutils_register_function(db, "cloudsync_payload_decode", cloudsync_payload_decode, -1, pzErrMsg, ctx, NULL);
3463 86 : if (rc != SQLITE_OK) return rc;
3464 :
3465 : #ifdef CLOUDSYNC_DESKTOP_OS
3466 86 : rc = dbutils_register_function(db, "cloudsync_payload_save", cloudsync_payload_save, 1, pzErrMsg, ctx, NULL);
3467 86 : if (rc != SQLITE_OK) return rc;
3468 :
3469 86 : rc = dbutils_register_function(db, "cloudsync_payload_load", cloudsync_payload_load, 1, pzErrMsg, ctx, NULL);
3470 86 : if (rc != SQLITE_OK) return rc;
3471 : #endif
3472 :
3473 : // PRIVATE functions
3474 86 : rc = dbutils_register_function(db, "cloudsync_is_sync", cloudsync_is_sync, 1, pzErrMsg, ctx, NULL);
3475 86 : if (rc != SQLITE_OK) return rc;
3476 :
3477 86 : rc = dbutils_register_function(db, "cloudsync_insert", cloudsync_insert, -1, pzErrMsg, ctx, NULL);
3478 86 : if (rc != SQLITE_OK) return rc;
3479 :
3480 86 : rc = dbutils_register_aggregate(db, "cloudsync_update", cloudsync_update_step, cloudsync_update_final, 3, pzErrMsg, ctx, NULL);
3481 86 : if (rc != SQLITE_OK) return rc;
3482 :
3483 86 : rc = dbutils_register_function(db, "cloudsync_delete", cloudsync_delete, -1, pzErrMsg, ctx, NULL);
3484 86 : if (rc != SQLITE_OK) return rc;
3485 :
3486 86 : rc = dbutils_register_function(db, "cloudsync_col_value", cloudsync_col_value, 3, pzErrMsg, ctx, NULL);
3487 86 : if (rc != SQLITE_OK) return rc;
3488 :
3489 86 : rc = dbutils_register_function(db, "cloudsync_pk_encode", cloudsync_pk_encode, -1, pzErrMsg, ctx, NULL);
3490 86 : if (rc != SQLITE_OK) return rc;
3491 :
3492 86 : rc = dbutils_register_function(db, "cloudsync_pk_decode", cloudsync_pk_decode, 2, pzErrMsg, ctx, NULL);
3493 86 : if (rc != SQLITE_OK) return rc;
3494 :
3495 86 : rc = dbutils_register_function(db, "cloudsync_seq", cloudsync_seq, 0, pzErrMsg, ctx, NULL);
3496 86 : if (rc != SQLITE_OK) return rc;
3497 :
3498 : // NETWORK LAYER
3499 : #ifndef CLOUDSYNC_OMIT_NETWORK
3500 : rc = cloudsync_network_register(db, pzErrMsg, ctx);
3501 : if (rc != SQLITE_OK) return rc;
3502 : #endif
3503 :
3504 86 : cloudsync_context *data = (cloudsync_context *)ctx;
3505 86 : sqlite3_commit_hook(db, cloudsync_commit_hook, ctx);
3506 86 : sqlite3_rollback_hook(db, cloudsync_rollback_hook, ctx);
3507 :
3508 : // register eponymous only changes virtual table
3509 86 : rc = cloudsync_vtab_register_changes (db, data);
3510 86 : if (rc != SQLITE_OK) return rc;
3511 :
3512 : // load config, if exists
3513 86 : if (cloudsync_config_exists(db)) {
3514 1 : cloudsync_context_init(db, ctx, NULL);
3515 :
3516 : // make sure to update internal version to current version
3517 1 : dbutils_settings_set_key_value(db, NULL, CLOUDSYNC_KEY_LIBVERSION, CLOUDSYNC_VERSION);
3518 1 : }
3519 :
3520 86 : return SQLITE_OK;
3521 88 : }
3522 :
3523 88 : APIEXPORT int sqlite3_cloudsync_init (sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
3524 : DEBUG_FUNCTION("sqlite3_cloudsync_init");
3525 :
3526 : #ifndef SQLITE_CORE
3527 : SQLITE_EXTENSION_INIT2(pApi);
3528 : #endif
3529 :
3530 88 : return cloudsync_register(db, pzErrMsg);
3531 : }
3532 :
|