Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "cloudsync_private.h"
21 : #include "lz4.h"
22 : #include "pk.h"
23 : #include "vtab.h"
24 : #include "utils.h"
25 : #include "dbutils.h"
26 :
27 : #ifndef CLOUDSYNC_OMIT_NETWORK
28 : #include "network.h"
29 : #endif
30 :
31 : #ifdef _WIN32
32 : #include <winsock2.h>
33 : #include <ws2tcpip.h>
34 : #else
35 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
36 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
37 : #endif
38 :
// 64-bit host <-> network byte order helpers, defined only when the platform
// headers do not already provide htonll/ntohll.
// NOTE: the fallback macros evaluate their argument twice, so never pass an
// expression with side effects.
#ifndef htonll
#if __BIG_ENDIAN__
#define htonll(x) (x)
#define ntohll(x) (x)
#else
#ifndef htobe64
// The argument is widened to uint64_t BEFORE masking/shifting: shifting a
// narrower operand (e.g. a plain int) by 32 bits is undefined behavior.
#define htonll(x) (((uint64_t)htonl((uint32_t)((uint64_t)(x) & 0xFFFFFFFFu)) << 32) | (uint64_t)htonl((uint32_t)((uint64_t)(x) >> 32)))
#define ntohll(x) (((uint64_t)ntohl((uint32_t)((uint64_t)(x) & 0xFFFFFFFFu)) << 32) | (uint64_t)ntohl((uint32_t)((uint64_t)(x) >> 32)))
#else
#define htonll(x) htobe64(x)
#define ntohll(x) be64toh(x)
#endif
#endif
#endif
53 :
54 : #ifndef SQLITE_CORE
55 : SQLITE_EXTENSION_INIT1
56 : #endif
57 :
// Silences "unused parameter" warnings.
#ifndef UNUSED_PARAMETER
#define UNUSED_PARAMETER(X) (void)(X)
#endif

// Symbol export decoration (only needed on Windows).
#ifdef _WIN32
#define APIEXPORT __declspec(dllexport)
#else
#define APIEXPORT
#endif

#define CLOUDSYNC_DEFAULT_ALGO                  "cls"
#define CLOUDSYNC_INIT_NTABLES                  128
// sentinel compared against db_version / merging_version to mean "no value yet"
// (parenthesized so the macro is safe inside any expression)
#define CLOUDSYNC_VALUE_NOTSET                  (-1)
// db_version assigned when the db_version statement returns no row
#define CLOUDSYNC_MIN_DB_VERSION                0

// parenthesized so that e.g. `x / CLOUDSYNC_PAYLOAD_MINBUF_SIZE` divides by the whole product
#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE           (512*1024)
#define CLOUDSYNC_PAYLOAD_VERSION               1
#define CLOUDSYNC_PAYLOAD_SIGNATURE             'CLSY'
#define CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY    "cloudsync_payload_apply_callback"

// NOTE: evaluates the selected argument twice, do not pass expressions with side effects
#ifndef MAX
#define MAX(a, b)                               (((a)>(b))?(a):(b))
#endif

// Prints the SQLite error message of _db to stdout when _rc is not SQLITE_OK.
#define DEBUG_SQLITE_ERROR(_rc, _fn, _db)       do {if ((_rc) != SQLITE_OK) printf("Error in %s: %s\n", (_fn), sqlite3_errmsg(_db));} while (0)
83 :
// Positional indexes 0..8, named after the tbl / pk / col_name / col_value /
// col_version / db_version / site_id / cl / seq fields.
// NOTE(review): presumably the column order of a decoded change row; confirm against the decode callback.
typedef enum {
    CLOUDSYNC_PK_INDEX_TBL          = 0,    // table name
    CLOUDSYNC_PK_INDEX_PK           = 1,    // primary key
    CLOUDSYNC_PK_INDEX_COLNAME      = 2,    // column name
    CLOUDSYNC_PK_INDEX_COLVALUE     = 3,    // column value
    CLOUDSYNC_PK_INDEX_COLVERSION   = 4,    // column version
    CLOUDSYNC_PK_INDEX_DBVERSION    = 5,    // database version
    CLOUDSYNC_PK_INDEX_SITEID       = 6,    // site id
    CLOUDSYNC_PK_INDEX_CL           = 7,    // cl
    CLOUDSYNC_PK_INDEX_SEQ          = 8     // seq
} CLOUDSYNC_PK_INDEX;
95 :
// Outcome reported by stmt_execute.
typedef enum {
    CLOUDSYNC_STMT_VALUE_ERROR      = -1,   // sqlite3_step failed
    CLOUDSYNC_STMT_VALUE_UNCHANGED  = 0,    // the tracked version value did not change
    CLOUDSYNC_STMT_VALUE_CHANGED    = 1,    // default result / the tracked version value changed
} CLOUDSYNC_STMT_VALUE;
101 :
102 : typedef struct {
103 : sqlite3_context *context;
104 : int index;
105 : } cloudsync_pk_decode_context;
106 :
// Set / clear the insync flag of a context pointer. The argument is parenthesized
// so that any pointer expression (e.g. &ctx) expands correctly, and the whole
// expansion is parenthesized so it behaves like a single expression.
#define SYNCBIT_SET(_data)                  ((_data)->insync = 1)
#define SYNCBIT_RESET(_data)                ((_data)->insync = 0)
// Post-increment of seq: bumps the counter and yields the value it had before the bump.
#define BUMP_SEQ(_data)                     ((_data)->seq += 1, (_data)->seq - 1)
110 :
111 : // MARK: -
112 :
113 : typedef struct {
114 : table_algo algo; // CRDT algoritm associated to the table
115 : char *name; // table name
116 : char **col_name; // array of column names
117 : sqlite3_stmt **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
118 : sqlite3_stmt **col_value_stmt; // array of column value stmt (indexed by col_name)
119 : int *col_id; // array of column id
120 : int ncols; // number of non primary key cols
121 : int npks; // number of primary key cols
122 : bool enabled; // flag to check if a table is enabled or disabled
123 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
124 : bool rowid_only; // a table with no primary keys other than the implicit rowid
125 : #endif
126 :
127 : char **pk_name; // array of primary key names
128 :
129 : // precompiled statements
130 : sqlite3_stmt *meta_pkexists_stmt; // check if a primary key already exist in the augmented table
131 : sqlite3_stmt *meta_sentinel_update_stmt; // update a local sentinel row
132 : sqlite3_stmt *meta_sentinel_insert_stmt; // insert a local sentinel row
133 : sqlite3_stmt *meta_row_insert_update_stmt; // insert/update a local row
134 : sqlite3_stmt *meta_row_drop_stmt; // delete rows from meta
135 : sqlite3_stmt *meta_update_move_stmt; // update rows in meta when pk changes
136 : sqlite3_stmt *meta_local_cl_stmt; // compute local cl value
137 : sqlite3_stmt *meta_winner_clock_stmt; // get the rowid of the last inserted/updated row in the meta table
138 : sqlite3_stmt *meta_merge_delete_drop;
139 : sqlite3_stmt *meta_zero_clock_stmt;
140 : sqlite3_stmt *meta_col_version_stmt;
141 : sqlite3_stmt *meta_site_id_stmt;
142 :
143 : sqlite3_stmt *real_col_values_stmt; // retrieve all column values based on pk
144 : sqlite3_stmt *real_merge_delete_stmt;
145 : sqlite3_stmt *real_merge_sentinel_stmt;
146 :
147 : } cloudsync_table_context;
148 :
149 : struct cloudsync_pk_decode_bind_context {
150 : sqlite3_stmt *vm;
151 : char *tbl;
152 : int64_t tbl_len;
153 : const void *pk;
154 : int64_t pk_len;
155 : char *col_name;
156 : int64_t col_name_len;
157 : int64_t col_version;
158 : int64_t db_version;
159 : const void *site_id;
160 : int64_t site_id_len;
161 : int64_t cl;
162 : int64_t seq;
163 : };
164 :
165 : struct cloudsync_context {
166 : sqlite3_context *sqlite_ctx;
167 :
168 : char *libversion;
169 : uint8_t site_id[UUID_LEN];
170 : int insync;
171 : int debug;
172 : bool merge_equal_values;
173 : bool temp_bool; // temporary value used in callback
174 : void *aux_data;
175 :
176 : // stmts and context values
177 : bool pragma_checked; // we need to check PRAGMAs only once per transaction
178 : sqlite3_stmt *schema_version_stmt;
179 : sqlite3_stmt *data_version_stmt;
180 : sqlite3_stmt *db_version_stmt;
181 : sqlite3_stmt *getset_siteid_stmt;
182 : int data_version;
183 : int schema_version;
184 : uint64_t schema_hash;
185 :
186 : // set at the start of each transaction on the first invocation and
187 : // re-set on transaction commit or rollback
188 : sqlite3_int64 db_version;
189 : // the version that the db will be set to at the end of the transaction
190 : // if that transaction were to commit at the time this value is checked
191 : sqlite3_int64 pending_db_version;
192 : // used to set an order inside each transaction
193 : int seq;
194 :
195 : // augmented tables are stored in-memory so we do not need to retrieve information about col names and cid
196 : // from the disk each time a write statement is performed
197 : // we do also not need to use an hash map here because for few tables the direct in-memory comparison with table name is faster
198 : cloudsync_table_context **tables;
199 : int tables_count;
200 : int tables_alloc;
201 : };
202 :
// Growable buffer together with row/column counters.
// NOTE(review): presumably accumulates an outgoing payload; confirm where it is filled.
typedef struct {
    char        *buffer;    // backing storage
    size_t      balloc;     // bytes allocated for buffer
    size_t      bused;      // bytes of buffer currently in use
    uint64_t    nrows;      // row counter
    uint16_t    ncols;      // column counter
} cloudsync_data_payload;
210 :
211 : #ifdef _MSC_VER
212 : #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
213 : #define PACKED
214 : #else
215 : #define PACKED __attribute__((__packed__))
216 : #endif
217 :
218 : typedef struct PACKED {
219 : uint32_t signature; // 'CLSY'
220 : uint8_t version; // protocol version
221 : uint8_t libversion[3]; // major.minor.patch
222 : uint32_t expanded_size;
223 : uint16_t ncols;
224 : uint32_t nrows;
225 : uint64_t schema_hash;
226 : uint8_t unused[6]; // padding to ensure the struct is exactly 32 bytes
227 : } cloudsync_payload_header;
228 :
229 : typedef struct {
230 : sqlite3_value *table_name;
231 : sqlite3_value **new_values;
232 : sqlite3_value **old_values;
233 : int count;
234 : int capacity;
235 : } cloudsync_update_payload;
236 :
237 : #ifdef _MSC_VER
238 : #pragma pack(pop)
239 : #endif
240 :
// Unit-test hook: when force_uncompressed_blob is set, the macro forces the
// local variable use_uncompressed_buffer (expected to be in scope at the
// expansion site) to true. Outside unit tests the macro expands to nothing.
#if CLOUDSYNC_UNITTEST
bool force_uncompressed_blob = false;
// wrapped in do/while(0) so the expansion is a single statement and cannot
// capture an `else` that follows the macro invocation
#define CHECK_FORCE_UNCOMPRESSED_BUFFER()   do { if (force_uncompressed_blob) use_uncompressed_buffer = true; } while (0)
#else
#define CHECK_FORCE_UNCOMPRESSED_BUFFER()
#endif
247 :
248 : int db_version_rebuild_stmt (sqlite3 *db, cloudsync_context *data);
249 : int cloudsync_load_siteid (sqlite3 *db, cloudsync_context *data);
250 : int local_mark_insert_or_update_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, sqlite3_int64 db_version, int seq);
251 :
252 : // MARK: - STMT Utils -
253 :
254 30473 : CLOUDSYNC_STMT_VALUE stmt_execute (sqlite3_stmt *stmt, cloudsync_context *data) {
255 30473 : int rc = sqlite3_step(stmt);
256 30473 : if (rc != SQLITE_ROW && rc != SQLITE_DONE) {
257 2 : if (data) DEBUG_SQLITE_ERROR(rc, "stmt_execute", sqlite3_db_handle(stmt));
258 2 : sqlite3_reset(stmt);
259 2 : return CLOUDSYNC_STMT_VALUE_ERROR;
260 : }
261 :
262 30471 : CLOUDSYNC_STMT_VALUE result = CLOUDSYNC_STMT_VALUE_CHANGED;
263 30471 : if (stmt == data->data_version_stmt) {
264 28653 : int version = sqlite3_column_int(stmt, 0);
265 28653 : if (version != data->data_version) {
266 108 : data->data_version = version;
267 108 : } else {
268 28545 : result = CLOUDSYNC_STMT_VALUE_UNCHANGED;
269 : }
270 30471 : } else if (stmt == data->schema_version_stmt) {
271 909 : int version = sqlite3_column_int(stmt, 0);
272 909 : if (version > data->schema_version) {
273 166 : data->schema_version = version;
274 166 : } else {
275 743 : result = CLOUDSYNC_STMT_VALUE_UNCHANGED;
276 : }
277 :
278 1818 : } else if (stmt == data->db_version_stmt) {
279 909 : data->db_version = (rc == SQLITE_DONE) ? CLOUDSYNC_MIN_DB_VERSION : sqlite3_column_int64(stmt, 0);
280 909 : }
281 :
282 30471 : sqlite3_reset(stmt);
283 30471 : return result;
284 30473 : }
285 :
286 3469 : int stmt_count (sqlite3_stmt *stmt, const char *value, size_t len, int type) {
287 3469 : int result = -1;
288 3469 : int rc = SQLITE_OK;
289 :
290 3469 : if (value) {
291 3468 : rc = (type == SQLITE_TEXT) ? sqlite3_bind_text(stmt, 1, value, (int)len, SQLITE_STATIC) : sqlite3_bind_blob(stmt, 1, value, (int)len, SQLITE_STATIC);
292 3468 : if (rc != SQLITE_OK) goto cleanup;
293 3468 : }
294 :
295 3469 : rc = sqlite3_step(stmt);
296 6938 : if (rc == SQLITE_DONE) {
297 1 : result = 0;
298 1 : rc = SQLITE_OK;
299 3469 : } else if (rc == SQLITE_ROW) {
300 3468 : result = sqlite3_column_int(stmt, 0);
301 3468 : rc = SQLITE_OK;
302 3468 : }
303 :
304 : cleanup:
305 3469 : DEBUG_SQLITE_ERROR(rc, "stmt_count", sqlite3_db_handle(stmt));
306 3469 : sqlite3_reset(stmt);
307 3469 : return result;
308 : }
309 :
310 236083 : sqlite3_stmt *stmt_reset (sqlite3_stmt *stmt) {
311 236083 : sqlite3_clear_bindings(stmt);
312 236083 : sqlite3_reset(stmt);
313 236083 : return NULL;
314 : }
315 :
316 279 : int stmts_add_tocontext (sqlite3 *db, cloudsync_context *data) {
317 : DEBUG_DBFUNCTION("cloudsync_add_stmts");
318 :
319 279 : if (data->data_version_stmt == NULL) {
320 109 : const char *sql = "PRAGMA data_version;";
321 109 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->data_version_stmt, NULL);
322 : DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
323 109 : if (rc != SQLITE_OK) return rc;
324 : DEBUG_SQL("data_version_stmt: %s", sql);
325 109 : }
326 :
327 279 : if (data->schema_version_stmt == NULL) {
328 109 : const char *sql = "PRAGMA schema_version;";
329 109 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->schema_version_stmt, NULL);
330 : DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
331 109 : if (rc != SQLITE_OK) return rc;
332 : DEBUG_SQL("schema_version_stmt: %s", sql);
333 109 : }
334 :
335 279 : if (data->getset_siteid_stmt == NULL) {
336 : // get and set index of the site_id
337 : // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
338 : // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
339 109 : const char *sql = "INSERT INTO cloudsync_site_id (site_id) VALUES (?) ON CONFLICT(site_id) DO UPDATE SET site_id = site_id RETURNING rowid;";
340 109 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->getset_siteid_stmt, NULL);
341 : DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
342 109 : if (rc != SQLITE_OK) return rc;
343 : DEBUG_SQL("getset_siteid_stmt: %s", sql);
344 109 : }
345 :
346 279 : return db_version_rebuild_stmt(db, data);
347 279 : }
348 :
349 : // MARK: - Database Version -
350 :
351 335 : char *db_version_build_query (sqlite3 *db) {
352 : // this function must be manually called each time tables changes
353 : // because the query plan changes too and it must be re-prepared
354 : // unfortunately there is no other way
355 :
356 : // we need to execute a query like:
357 : /*
358 : SELECT max(version) as version FROM (
359 : SELECT max(db_version) as version FROM "table1_cloudsync"
360 : UNION ALL
361 : SELECT max(db_version) as version FROM "table2_cloudsync"
362 : UNION ALL
363 : SELECT max(db_version) as version FROM "table3_cloudsync"
364 : UNION
365 : SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
366 : )
367 : */
368 :
369 : // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
370 335 : const char *sql = "WITH table_names AS ("
371 : "SELECT format('%w', name) as tbl_name "
372 : "FROM sqlite_master "
373 : "WHERE type='table' "
374 : "AND name LIKE '%_cloudsync'"
375 : "), "
376 : "query_parts AS ("
377 : "SELECT 'SELECT max(db_version) as version FROM \"' || tbl_name || '\"' as part FROM table_names"
378 : "), "
379 : "combined_query AS ("
380 : "SELECT GROUP_CONCAT(part, ' UNION ALL ') || ' UNION SELECT value as version FROM cloudsync_settings WHERE key = ''pre_alter_dbversion''' as full_query FROM query_parts"
381 : ") "
382 : "SELECT 'SELECT max(version) as version FROM (' || full_query || ');' FROM combined_query;";
383 335 : return dbutils_text_select(db, sql);
384 : }
385 :
386 445 : int db_version_rebuild_stmt (sqlite3 *db, cloudsync_context *data) {
387 445 : if (data->db_version_stmt) {
388 226 : sqlite3_finalize(data->db_version_stmt);
389 226 : data->db_version_stmt = NULL;
390 226 : }
391 :
392 445 : sqlite3_int64 count = dbutils_table_settings_count_tables(db);
393 445 : if (count == 0) return SQLITE_OK;
394 335 : else if (count == -1) {
395 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
396 0 : return SQLITE_ERROR;
397 : }
398 :
399 335 : char *sql = db_version_build_query(db);
400 335 : if (!sql) return SQLITE_NOMEM;
401 : DEBUG_SQL("db_version_stmt: %s", sql);
402 :
403 335 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->db_version_stmt, NULL);
404 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
405 335 : cloudsync_memory_free(sql);
406 335 : return rc;
407 445 : }
408 :
409 909 : int db_version_rerun (sqlite3 *db, cloudsync_context *data) {
410 909 : CLOUDSYNC_STMT_VALUE schema_changed = stmt_execute(data->schema_version_stmt, data);
411 909 : if (schema_changed == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
412 :
413 909 : if (schema_changed == CLOUDSYNC_STMT_VALUE_CHANGED) {
414 166 : int rc = db_version_rebuild_stmt(db, data);
415 166 : if (rc != SQLITE_OK) return -1;
416 166 : }
417 :
418 909 : CLOUDSYNC_STMT_VALUE rc = stmt_execute(data->db_version_stmt, data);
419 909 : if (rc == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
420 909 : return 0;
421 909 : }
422 :
423 28653 : int db_version_check_uptodate (sqlite3 *db, cloudsync_context *data) {
424 : // perform a PRAGMA data_version to check if some other process write any data
425 28653 : CLOUDSYNC_STMT_VALUE rc = stmt_execute(data->data_version_stmt, data);
426 28653 : if (rc == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
427 :
428 : // db_version is already set and there is no need to update it
429 28653 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == CLOUDSYNC_STMT_VALUE_UNCHANGED) return 0;
430 :
431 909 : return db_version_rerun(db, data);
432 28653 : }
433 :
434 28627 : sqlite3_int64 db_version_next (sqlite3 *db, cloudsync_context *data, sqlite3_int64 merging_version) {
435 28627 : int rc = db_version_check_uptodate(db, data);
436 28627 : if (rc != SQLITE_OK) return -1;
437 :
438 28627 : sqlite3_int64 result = data->db_version + 1;
439 28627 : if (result < data->pending_db_version) result = data->pending_db_version;
440 28627 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
441 28627 : data->pending_db_version = result;
442 :
443 28627 : return result;
444 28627 : }
445 :
446 : // MARK: -
447 :
448 0 : void *cloudsync_get_auxdata (sqlite3_context *context) {
449 0 : cloudsync_context *data = (context) ? (cloudsync_context *)sqlite3_user_data(context) : NULL;
450 0 : return (data) ? data->aux_data : NULL;
451 : }
452 :
453 0 : void cloudsync_set_auxdata (sqlite3_context *context, void *xdata) {
454 0 : cloudsync_context *data = (context) ? (cloudsync_context *)sqlite3_user_data(context) : NULL;
455 0 : if (data) data->aux_data = xdata;
456 0 : }
457 :
458 : // MARK: - PK Context -
459 :
460 16416 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
461 16416 : *tbl_len = ctx->tbl_len;
462 16416 : return ctx->tbl;
463 : }
464 :
465 16416 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
466 16416 : *pk_len = ctx->pk_len;
467 16416 : return (void *)ctx->pk;
468 : }
469 :
470 16416 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
471 16416 : *colname_len = ctx->col_name_len;
472 16416 : return ctx->col_name;
473 : }
474 :
475 16416 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
476 16416 : return ctx->cl;
477 : }
478 :
479 16416 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
480 16416 : return ctx->db_version;
481 : }
482 :
483 : // MARK: - Table Utils -
484 :
485 128 : char *table_build_values_sql (sqlite3 *db, cloudsync_table_context *table) {
486 128 : char *sql = NULL;
487 :
488 : /*
489 : This SQL statement dynamically generates a SELECT query for a specified table.
490 : It uses Common Table Expressions (CTEs) to construct the column names and
491 : primary key conditions based on the table schema, which is obtained through
492 : the `pragma_table_info` function.
493 :
494 : 1. `col_names` CTE:
495 : - Retrieves a comma-separated list of non-primary key column names from
496 : the specified table's schema.
497 :
498 : 2. `pk_where` CTE:
499 : - Retrieves a condition string representing the primary key columns in the
500 : format: "column1=? AND column2=? AND ...", used to create the WHERE clause
501 : for selecting rows based on primary key values.
502 :
503 : 3. Final SELECT:
504 : - Constructs the complete SELECT statement as a string, combining:
505 : - Column names from `col_names`.
506 : - The target table name.
507 : - The WHERE clause conditions from `pk_where`.
508 :
509 : The resulting query can be used to select rows from the table based on primary
510 : key values, and can be executed within the application to retrieve data dynamically.
511 : */
512 :
513 : // Unfortunately in SQLite column names (or table names) cannot be bound parameters in a SELECT statement
514 : // otherwise we should have used something like SELECT 'SELECT ? FROM %w WHERE rowid=?';
515 :
516 128 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
517 :
518 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
519 : if (table->rowid_only) {
520 : sql = memory_mprintf("WITH col_names AS (SELECT group_concat('\"' || format('%%w', name) || '\"', ',') AS cols FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid) SELECT 'SELECT ' || (SELECT cols FROM col_names) || ' FROM \"%w\" WHERE rowid=?;'", table->name, table->name);
521 : goto process_process;
522 : }
523 : #endif
524 :
525 128 : sql = cloudsync_memory_mprintf("WITH col_names AS (SELECT group_concat('\"' || format('%%w', name) || '\"', ',') AS cols FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid), pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'SELECT ' || (SELECT cols FROM col_names) || ' FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, table->name, singlequote_escaped_table_name);
526 :
527 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
528 : process_process:
529 : #endif
530 128 : cloudsync_memory_free(singlequote_escaped_table_name);
531 128 : if (!sql) return NULL;
532 128 : char *query = dbutils_text_select(db, sql);
533 128 : cloudsync_memory_free(sql);
534 :
535 128 : return query;
536 128 : }
537 :
538 163 : char *table_build_mergedelete_sql (sqlite3 *db, cloudsync_table_context *table) {
539 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
540 : if (table->rowid_only) {
541 : char *sql = memory_mprintf("DELETE FROM \"%w\" WHERE rowid=?;", table->name);
542 : return sql;
543 : }
544 : #endif
545 :
546 163 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
547 163 : char *sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'DELETE FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, singlequote_escaped_table_name);
548 163 : cloudsync_memory_free(singlequote_escaped_table_name);
549 163 : if (!sql) return NULL;
550 :
551 163 : char *query = dbutils_text_select(db, sql);
552 163 : cloudsync_memory_free(sql);
553 :
554 163 : return query;
555 163 : }
556 :
557 1100 : char *table_build_mergeinsert_sql (sqlite3 *db, cloudsync_table_context *table, const char *colname) {
558 1100 : char *sql = NULL;
559 :
560 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
561 : if (table->rowid_only) {
562 : if (colname == NULL) {
563 : // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
564 : sql = memory_mprintf("INSERT OR IGNORE INTO \"%w\" (rowid) VALUES (?);", table->name);
565 : } else {
566 : // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
567 : sql = memory_mprintf("INSERT INTO \"%w\" (rowid, \"%w\") VALUES (?, ?) ON CONFLICT DO UPDATE SET \"%w\"=?;", table->name, colname, colname);
568 : }
569 : return sql;
570 : }
571 : #endif
572 :
573 1100 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
574 :
575 1100 : if (colname == NULL) {
576 : // is sentinel insert
577 163 : sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"') AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk), pk_bind AS (SELECT group_concat('?') AS pk_binding FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'INSERT OR IGNORE INTO \"%w\" (' || (SELECT pk_clause FROM pk_where) || ') VALUES (' || (SELECT pk_binding FROM pk_bind) || ');'", table->name, table->name, singlequote_escaped_table_name);
578 163 : } else {
579 937 : char *singlequote_escaped_col_name = cloudsync_memory_mprintf("%q", colname);
580 937 : sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"') AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk), pk_bind AS (SELECT group_concat('?') AS pk_binding FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'INSERT INTO \"%w\" (' || (SELECT pk_clause FROM pk_where) || ',\"%w\") VALUES (' || (SELECT pk_binding FROM pk_bind) || ',?) ON CONFLICT DO UPDATE SET \"%w\"=?;'", table->name, table->name, singlequote_escaped_table_name, singlequote_escaped_col_name, singlequote_escaped_col_name);
581 937 : cloudsync_memory_free(singlequote_escaped_col_name);
582 :
583 : }
584 1100 : cloudsync_memory_free(singlequote_escaped_table_name);
585 1100 : if (!sql) return NULL;
586 :
587 1100 : char *query = dbutils_text_select(db, sql);
588 1100 : cloudsync_memory_free(sql);
589 :
590 1100 : return query;
591 1100 : }
592 :
593 937 : char *table_build_value_sql (sqlite3 *db, cloudsync_table_context *table, const char *colname) {
594 937 : char *colnamequote = dbutils_is_star_table(colname) ? "" : "\"";
595 :
596 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
597 : if (table->rowid_only) {
598 : char *sql = memory_mprintf("SELECT %s%w%s FROM \"%w\" WHERE rowid=?;", colnamequote, colname, colnamequote, table->name);
599 : return sql;
600 : }
601 : #endif
602 :
603 : // SELECT age FROM customers WHERE first_name=? AND last_name=?;
604 937 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
605 937 : char *singlequote_escaped_col_name = cloudsync_memory_mprintf("%q", colname);
606 937 : char *sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'SELECT %s%w%s FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, colnamequote, singlequote_escaped_col_name, colnamequote, singlequote_escaped_table_name);
607 937 : cloudsync_memory_free(singlequote_escaped_col_name);
608 937 : cloudsync_memory_free(singlequote_escaped_table_name);
609 937 : if (!sql) return NULL;
610 :
611 937 : char *query = dbutils_text_select(db, sql);
612 937 : cloudsync_memory_free(sql);
613 :
614 937 : return query;
615 937 : }
616 :
617 163 : cloudsync_table_context *table_create (const char *name, table_algo algo) {
618 : DEBUG_DBFUNCTION("table_create %s", name);
619 :
620 163 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
621 163 : if (!table) return NULL;
622 :
623 163 : table->algo = algo;
624 163 : table->name = cloudsync_string_dup(name, true);
625 163 : if (!table->name) {
626 0 : cloudsync_memory_free(table);
627 0 : return NULL;
628 : }
629 163 : table->enabled = true;
630 :
631 163 : return table;
632 163 : }
633 :
634 163 : void table_free (cloudsync_table_context *table) {
635 : DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
636 163 : if (!table) return;
637 :
638 163 : if (table->ncols > 0) {
639 128 : if (table->col_name) {
640 1065 : for (int i=0; i<table->ncols; ++i) {
641 937 : cloudsync_memory_free(table->col_name[i]);
642 937 : }
643 128 : cloudsync_memory_free(table->col_name);
644 128 : }
645 128 : if (table->col_merge_stmt) {
646 1065 : for (int i=0; i<table->ncols; ++i) {
647 937 : sqlite3_finalize(table->col_merge_stmt[i]);
648 937 : }
649 128 : cloudsync_memory_free(table->col_merge_stmt);
650 128 : }
651 128 : if (table->col_value_stmt) {
652 1065 : for (int i=0; i<table->ncols; ++i) {
653 937 : sqlite3_finalize(table->col_value_stmt[i]);
654 937 : }
655 128 : cloudsync_memory_free(table->col_value_stmt);
656 128 : }
657 128 : if (table->col_id) {
658 128 : cloudsync_memory_free(table->col_id);
659 128 : }
660 128 : }
661 :
662 163 : if (table->pk_name) sqlite3_free_table(table->pk_name);
663 163 : if (table->name) cloudsync_memory_free(table->name);
664 163 : if (table->meta_pkexists_stmt) sqlite3_finalize(table->meta_pkexists_stmt);
665 163 : if (table->meta_sentinel_update_stmt) sqlite3_finalize(table->meta_sentinel_update_stmt);
666 163 : if (table->meta_sentinel_insert_stmt) sqlite3_finalize(table->meta_sentinel_insert_stmt);
667 163 : if (table->meta_row_insert_update_stmt) sqlite3_finalize(table->meta_row_insert_update_stmt);
668 163 : if (table->meta_row_drop_stmt) sqlite3_finalize(table->meta_row_drop_stmt);
669 163 : if (table->meta_update_move_stmt) sqlite3_finalize(table->meta_update_move_stmt);
670 163 : if (table->meta_local_cl_stmt) sqlite3_finalize(table->meta_local_cl_stmt);
671 163 : if (table->meta_winner_clock_stmt) sqlite3_finalize(table->meta_winner_clock_stmt);
672 163 : if (table->meta_merge_delete_drop) sqlite3_finalize(table->meta_merge_delete_drop);
673 163 : if (table->meta_zero_clock_stmt) sqlite3_finalize(table->meta_zero_clock_stmt);
674 163 : if (table->meta_col_version_stmt) sqlite3_finalize(table->meta_col_version_stmt);
675 163 : if (table->meta_site_id_stmt) sqlite3_finalize(table->meta_site_id_stmt);
676 :
677 163 : if (table->real_col_values_stmt) sqlite3_finalize(table->real_col_values_stmt);
678 163 : if (table->real_merge_delete_stmt) sqlite3_finalize(table->real_merge_delete_stmt);
679 163 : if (table->real_merge_sentinel_stmt) sqlite3_finalize(table->real_merge_sentinel_stmt);
680 :
681 163 : cloudsync_memory_free(table);
682 163 : }
683 :
684 163 : int table_add_stmts (sqlite3 *db, cloudsync_table_context *table, int ncols) {
685 163 : int rc = SQLITE_OK;
686 163 : char *sql = NULL;
687 :
688 : // META TABLE statements
689 :
690 : // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
691 :
692 : // precompile the pk exists statement
693 : // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
694 : // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
695 163 : sql = cloudsync_memory_mprintf("SELECT EXISTS(SELECT 1 FROM \"%w_cloudsync\" WHERE pk = ? LIMIT 1);", table->name);
696 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
697 : DEBUG_SQL("meta_pkexists_stmt: %s", sql);
698 :
699 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_pkexists_stmt, NULL);
700 :
701 163 : cloudsync_memory_free(sql);
702 163 : if (rc != SQLITE_OK) goto cleanup;
703 :
704 : // precompile the update local sentinel statement
705 163 : sql = cloudsync_memory_mprintf("UPDATE \"%w_cloudsync\" SET col_version = CASE col_version %% 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = ?, seq = ?, site_id = 0 WHERE pk = ? AND col_name = '%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
706 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
707 : DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
708 :
709 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_sentinel_update_stmt, NULL);
710 163 : cloudsync_memory_free(sql);
711 163 : if (rc != SQLITE_OK) goto cleanup;
712 :
713 : // precompile the insert local sentinel statement
714 163 : sql = cloudsync_memory_mprintf("INSERT INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id) SELECT ?, '%s', 1, ?, ?, 0 WHERE 1 ON CONFLICT DO UPDATE SET col_version = CASE col_version %% 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = ?, seq = ?, site_id = 0;", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
715 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
716 : DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
717 :
718 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_sentinel_insert_stmt, NULL);
719 163 : cloudsync_memory_free(sql);
720 163 : if (rc != SQLITE_OK) goto cleanup;
721 :
722 : // precompile the insert/update local row statement
723 163 : sql = cloudsync_memory_mprintf("INSERT INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id ) SELECT ?, ?, ?, ?, ?, 0 WHERE 1 ON CONFLICT DO UPDATE SET col_version = col_version + 1, db_version = ?, seq = ?, site_id = 0;", table->name);
724 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
725 : DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
726 :
727 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_row_insert_update_stmt, NULL);
728 163 : cloudsync_memory_free(sql);
729 163 : if (rc != SQLITE_OK) goto cleanup;
730 :
731 : // precompile the delete rows from meta
732 163 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
733 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
734 : DEBUG_SQL("meta_row_drop_stmt: %s", sql);
735 :
736 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_row_drop_stmt, NULL);
737 163 : cloudsync_memory_free(sql);
738 163 : if (rc != SQLITE_OK) goto cleanup;
739 :
740 : // precompile the update rows from meta when pk changes
741 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
742 163 : sql = cloudsync_memory_mprintf("UPDATE OR REPLACE \"%w_cloudsync\" SET pk=?, db_version=?, col_version=1, seq=cloudsync_seq(), site_id=0 WHERE (pk=? AND col_name!='%s');", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
743 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
744 : DEBUG_SQL("meta_update_move_stmt: %s", sql);
745 :
746 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_update_move_stmt, NULL);
747 163 : cloudsync_memory_free(sql);
748 163 : if (rc != SQLITE_OK) goto cleanup;
749 :
750 : // local cl
751 163 : sql = cloudsync_memory_mprintf("SELECT COALESCE((SELECT col_version FROM \"%w_cloudsync\" WHERE pk=? AND col_name='%s'), (SELECT 1 FROM \"%w_cloudsync\" WHERE pk=?));", table->name, CLOUDSYNC_TOMBSTONE_VALUE, table->name);
752 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
753 : DEBUG_SQL("meta_local_cl_stmt: %s", sql);
754 :
755 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_local_cl_stmt, NULL);
756 163 : cloudsync_memory_free(sql);
757 163 : if (rc != SQLITE_OK) goto cleanup;
758 :
759 : // rowid of the last inserted/updated row in the meta table
760 163 : sql = cloudsync_memory_mprintf("INSERT OR REPLACE INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id) VALUES (?, ?, ?, cloudsync_db_version_next(?), ?, ?) RETURNING ((db_version << 30) | seq);", table->name);
761 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
762 : DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
763 :
764 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_winner_clock_stmt, NULL);
765 163 : cloudsync_memory_free(sql);
766 163 : if (rc != SQLITE_OK) goto cleanup;
767 :
768 163 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
769 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
770 : DEBUG_SQL("meta_merge_delete_drop: %s", sql);
771 :
772 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_merge_delete_drop, NULL);
773 163 : cloudsync_memory_free(sql);
774 163 : if (rc != SQLITE_OK) goto cleanup;
775 :
776 : // zero clock
777 163 : sql = cloudsync_memory_mprintf("UPDATE \"%w_cloudsync\" SET col_version = 0, db_version = cloudsync_db_version_next(?) WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
778 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
779 : DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
780 :
781 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_zero_clock_stmt, NULL);
782 163 : cloudsync_memory_free(sql);
783 163 : if (rc != SQLITE_OK) goto cleanup;
784 :
785 : // col_version
786 163 : sql = cloudsync_memory_mprintf("SELECT col_version FROM \"%w_cloudsync\" WHERE pk=? AND col_name=?;", table->name);
787 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
788 : DEBUG_SQL("meta_col_version_stmt: %s", sql);
789 :
790 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_col_version_stmt, NULL);
791 163 : cloudsync_memory_free(sql);
792 163 : if (rc != SQLITE_OK) goto cleanup;
793 :
794 : // site_id
795 163 : sql = cloudsync_memory_mprintf("SELECT site_id FROM \"%w_cloudsync\" WHERE pk=? AND col_name=?;", table->name);
796 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
797 : DEBUG_SQL("meta_site_id_stmt: %s", sql);
798 :
799 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_site_id_stmt, NULL);
800 163 : cloudsync_memory_free(sql);
801 163 : if (rc != SQLITE_OK) goto cleanup;
802 :
803 : // REAL TABLE statements
804 :
805 : // precompile the get column value statement
806 163 : if (ncols > 0) {
807 128 : sql = table_build_values_sql(db, table);
808 128 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
809 : DEBUG_SQL("real_col_values_stmt: %s", sql);
810 :
811 128 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_col_values_stmt, NULL);
812 128 : cloudsync_memory_free(sql);
813 128 : if (rc != SQLITE_OK) goto cleanup;
814 128 : }
815 :
816 163 : sql = table_build_mergedelete_sql(db, table);
817 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
818 : DEBUG_SQL("real_merge_delete: %s", sql);
819 :
820 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_merge_delete_stmt, NULL);
821 163 : cloudsync_memory_free(sql);
822 163 : if (rc != SQLITE_OK) goto cleanup;
823 :
824 163 : sql = table_build_mergeinsert_sql(db, table, NULL);
825 163 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
826 : DEBUG_SQL("real_merge_sentinel: %s", sql);
827 :
828 163 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_merge_sentinel_stmt, NULL);
829 163 : cloudsync_memory_free(sql);
830 163 : if (rc != SQLITE_OK) goto cleanup;
831 :
832 : cleanup:
833 163 : if (rc != SQLITE_OK) printf("table_add_stmts error: %s\n", sqlite3_errmsg(db));
834 163 : return rc;
835 : }
836 :
837 188856 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
838 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
839 :
840 189784 : for (int i=0; i<data->tables_count; ++i) {
841 189601 : const char *name = (data->tables[i]) ? data->tables[i]->name : NULL;
842 189601 : if ((name) && (strcasecmp(name, table_name) == 0)) {
843 188673 : return data->tables[i];
844 : }
845 928 : }
846 :
847 183 : return NULL;
848 188856 : }
849 :
850 172676 : sqlite3_stmt *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
851 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
852 :
853 318644 : for (int i=0; i<table->ncols; ++i) {
854 318644 : if (strcasecmp(table->col_name[i], col_name) == 0) {
855 172676 : if (index) *index = i;
856 172676 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
857 : }
858 145968 : }
859 :
860 0 : if (index) *index = -1;
861 0 : return NULL;
862 172676 : }
863 :
864 29 : int table_remove (cloudsync_context *data, const char *table_name) {
865 : DEBUG_DBFUNCTION("table_remove %s", table_name);
866 :
867 52 : for (int i=0; i<data->tables_count; ++i) {
868 52 : const char *name = (data->tables[i]) ? data->tables[i]->name : NULL;
869 52 : if ((name) && (strcasecmp(name, table_name) == 0)) {
870 29 : data->tables[i] = NULL;
871 29 : return i;
872 : }
873 23 : }
874 0 : return -1;
875 29 : }
876 :
877 937 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
878 937 : cloudsync_table_context *table = (cloudsync_table_context *)xdata;
879 :
880 937 : sqlite3 *db = sqlite3_db_handle(table->meta_pkexists_stmt);
881 937 : if (!db) return SQLITE_ERROR;
882 :
883 937 : int index = table->ncols;
884 1874 : for (int i=0; i<ncols; i+=2) {
885 937 : const char *name = values[i];
886 937 : int cid = (int)strtol(values[i+1], NULL, 0);
887 :
888 937 : table->col_id[index] = cid;
889 937 : table->col_name[index] = cloudsync_string_dup(name, true);
890 937 : if (!table->col_name[index]) return 1;
891 :
892 937 : char *sql = table_build_mergeinsert_sql(db, table, name);
893 937 : if (!sql) return SQLITE_NOMEM;
894 : DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
895 :
896 937 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->col_merge_stmt[index], NULL);
897 937 : cloudsync_memory_free(sql);
898 937 : if (rc != SQLITE_OK) return rc;
899 937 : if (!table->col_merge_stmt[index]) return SQLITE_MISUSE;
900 :
901 937 : sql = table_build_value_sql(db, table, name);
902 937 : if (!sql) return SQLITE_NOMEM;
903 : DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
904 :
905 937 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->col_value_stmt[index], NULL);
906 937 : cloudsync_memory_free(sql);
907 937 : if (rc != SQLITE_OK) return rc;
908 937 : if (!table->col_value_stmt[index]) return SQLITE_MISUSE;
909 937 : }
910 937 : table->ncols += 1;
911 :
912 937 : return 0;
913 937 : }
914 :
915 169 : bool table_add_to_context (sqlite3 *db, cloudsync_context *data, table_algo algo, const char *table_name) {
916 : DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
917 :
918 : // check if table is already in the global context and in that case just return
919 169 : cloudsync_table_context *table = table_lookup(data, table_name);
920 169 : if (table) return true;
921 :
922 : // is there any space available?
923 163 : if (data->tables_alloc <= data->tables_count + 1) {
924 : // realloc tables
925 0 : cloudsync_table_context **clone = (cloudsync_table_context **)cloudsync_memory_realloc(data->tables, sizeof(cloudsync_table_context) * data->tables_alloc + CLOUDSYNC_INIT_NTABLES);
926 0 : if (!clone) goto abort_add_table;
927 :
928 : // reset new entries
929 0 : for (int i=data->tables_alloc; i<data->tables_alloc + CLOUDSYNC_INIT_NTABLES; ++i) {
930 0 : clone[i] = NULL;
931 0 : }
932 :
933 : // replace old ptr
934 0 : data->tables = clone;
935 0 : data->tables_alloc += CLOUDSYNC_INIT_NTABLES;
936 0 : }
937 :
938 : // setup a new table context
939 163 : table = table_create(table_name, algo);
940 163 : if (!table) return false;
941 :
942 : // fill remaining metadata in the table
943 163 : char *sql = cloudsync_memory_mprintf("SELECT count(*) FROM pragma_table_info('%q') WHERE pk>0;", table_name);
944 163 : if (!sql) goto abort_add_table;
945 163 : table->npks = (int)dbutils_int_select(db, sql);
946 163 : cloudsync_memory_free(sql);
947 163 : if (table->npks == -1) {
948 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
949 0 : goto abort_add_table;
950 : }
951 :
952 163 : if (table->npks == 0) {
953 : #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
954 0 : return false;
955 : #else
956 : table->rowid_only = true;
957 : table->npks = 1; // rowid
958 : #endif
959 : }
960 :
961 163 : sql = cloudsync_memory_mprintf("SELECT count(*) FROM pragma_table_info('%q') WHERE pk=0;", table_name);
962 163 : if (!sql) goto abort_add_table;
963 163 : int64_t ncols = (int64_t)dbutils_int_select(db, sql);
964 163 : cloudsync_memory_free(sql);
965 163 : if (ncols == -1) {
966 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
967 0 : goto abort_add_table;
968 : }
969 :
970 163 : int rc = table_add_stmts(db, table, (int)ncols);
971 163 : if (rc != SQLITE_OK) goto abort_add_table;
972 :
973 : // a table with only pk(s) is totally legal
974 163 : if (ncols > 0) {
975 128 : table->col_name = (char **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(char *) * ncols));
976 128 : if (!table->col_name) goto abort_add_table;
977 :
978 128 : table->col_id = (int *)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(int) * ncols));
979 128 : if (!table->col_id) goto abort_add_table;
980 :
981 128 : table->col_merge_stmt = (sqlite3_stmt **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(sqlite3_stmt *) * ncols));
982 128 : if (!table->col_merge_stmt) goto abort_add_table;
983 :
984 128 : table->col_value_stmt = (sqlite3_stmt **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(sqlite3_stmt *) * ncols));
985 128 : if (!table->col_value_stmt) goto abort_add_table;
986 :
987 128 : sql = cloudsync_memory_mprintf("SELECT name, cid FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid;", table_name);
988 128 : if (!sql) goto abort_add_table;
989 128 : int rc = sqlite3_exec(db, sql, table_add_to_context_cb, (void *)table, NULL);
990 128 : cloudsync_memory_free(sql);
991 128 : if (rc == SQLITE_ABORT) goto abort_add_table;
992 128 : }
993 :
994 : // lookup the first free slot
995 212 : for (int i=0; i<data->tables_alloc; ++i) {
996 212 : if (data->tables[i] == NULL) {
997 163 : data->tables[i] = table;
998 163 : if (i > data->tables_count - 1) ++data->tables_count;
999 163 : break;
1000 : }
1001 49 : }
1002 :
1003 163 : return true;
1004 :
1005 : abort_add_table:
1006 0 : table_free(table);
1007 0 : return false;
1008 169 : }
1009 :
1010 6 : bool table_remove_from_context (cloudsync_context *data, cloudsync_table_context *table) {
1011 6 : return (table_remove(data, table->name) != -1);
1012 : }
1013 :
1014 3709 : sqlite3_stmt *cloudsync_colvalue_stmt (sqlite3 *db, cloudsync_context *data, const char *tbl_name, bool *persistent) {
1015 3709 : sqlite3_stmt *vm = NULL;
1016 :
1017 3709 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1018 3709 : if (table) {
1019 3709 : char *col_name = NULL;
1020 3709 : if (table->ncols > 0) {
1021 3709 : col_name = table->col_name[0];
1022 : // retrieve col_value precompiled statement
1023 3709 : vm = table_column_lookup(table, col_name, false, NULL);
1024 3709 : *persistent = true;
1025 3709 : } else {
1026 0 : char *sql = table_build_value_sql(db, table, "*");
1027 0 : sqlite3_prepare_v2(db, sql, -1, &vm, NULL);
1028 0 : cloudsync_memory_free(sql);
1029 0 : *persistent = false;
1030 : }
1031 3709 : }
1032 :
1033 3709 : return vm;
1034 : }
1035 :
1036 : // MARK: - Merge Insert -
1037 :
1038 45417 : sqlite3_int64 merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen, const char **err) {
1039 45417 : sqlite3_stmt *vm = table->meta_local_cl_stmt;
1040 45417 : sqlite3_int64 result = -1;
1041 :
1042 45417 : int rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1043 45417 : if (rc != SQLITE_OK) goto cleanup;
1044 :
1045 45417 : rc = sqlite3_bind_blob(vm, 2, (const void *)pk, pklen, SQLITE_STATIC);
1046 45417 : if (rc != SQLITE_OK) goto cleanup;
1047 :
1048 45417 : rc = sqlite3_step(vm);
1049 45417 : if (rc == SQLITE_ROW) result = sqlite3_column_int64(vm, 0);
1050 0 : else if (rc == SQLITE_DONE) result = 0;
1051 :
1052 : cleanup:
1053 45417 : if (result == -1) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1054 45417 : stmt_reset(vm);
1055 45417 : return result;
1056 : }
1057 :
1058 44439 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, sqlite3_int64 *version, const char **err) {
1059 44439 : sqlite3_stmt *vm = table->meta_col_version_stmt;
1060 :
1061 44439 : int rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1062 44439 : if (rc != SQLITE_OK) goto cleanup;
1063 :
1064 44439 : rc = sqlite3_bind_text(vm, 2, col_name, -1, SQLITE_STATIC);
1065 44439 : if (rc != SQLITE_OK) goto cleanup;
1066 :
1067 44439 : rc = sqlite3_step(vm);
1068 69193 : if (rc == SQLITE_ROW) {
1069 24754 : *version = sqlite3_column_int64(vm, 0);
1070 24754 : rc = SQLITE_OK;
1071 24754 : }
1072 :
1073 : cleanup:
1074 44439 : if ((rc != SQLITE_OK) && (rc != SQLITE_DONE)) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1075 44439 : stmt_reset(vm);
1076 44439 : return rc;
1077 : }
1078 :
1079 24625 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, sqlite3_int64 col_version, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1080 :
1081 : // get/set site_id
1082 24625 : sqlite3_stmt *vm = data->getset_siteid_stmt;
1083 24625 : int rc = sqlite3_bind_blob(vm, 1, (const void *)site_id, site_len, SQLITE_STATIC);
1084 24625 : if (rc != SQLITE_OK) goto cleanup_merge;
1085 :
1086 24625 : rc = sqlite3_step(vm);
1087 24625 : if (rc != SQLITE_ROW) goto cleanup_merge;
1088 :
1089 24625 : int64_t ord = sqlite3_column_int64(vm, 0);
1090 24625 : stmt_reset(vm);
1091 :
1092 24625 : vm = table->meta_winner_clock_stmt;
1093 24625 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pk_len, SQLITE_STATIC);
1094 24625 : if (rc != SQLITE_OK) goto cleanup_merge;
1095 :
1096 24625 : rc = sqlite3_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1, SQLITE_STATIC);
1097 24625 : if (rc != SQLITE_OK) goto cleanup_merge;
1098 :
1099 24625 : rc = sqlite3_bind_int64(vm, 3, col_version);
1100 24625 : if (rc != SQLITE_OK) goto cleanup_merge;
1101 :
1102 24625 : rc = sqlite3_bind_int64(vm, 4, db_version);
1103 24625 : if (rc != SQLITE_OK) goto cleanup_merge;
1104 :
1105 24625 : rc = sqlite3_bind_int64(vm, 5, seq);
1106 24625 : if (rc != SQLITE_OK) goto cleanup_merge;
1107 :
1108 24625 : rc = sqlite3_bind_int64(vm, 6, ord);
1109 24625 : if (rc != SQLITE_OK) goto cleanup_merge;
1110 :
1111 24625 : rc = sqlite3_step(vm);
1112 49250 : if (rc == SQLITE_ROW) {
1113 24625 : *rowid = sqlite3_column_int64(vm, 0);
1114 24625 : rc = SQLITE_OK;
1115 24625 : }
1116 :
1117 : cleanup_merge:
1118 24625 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1119 24625 : stmt_reset(vm);
1120 24625 : return rc;
1121 : }
1122 :
1123 24431 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, sqlite3_value *col_value, sqlite3_int64 col_version, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1124 : int index;
1125 24431 : sqlite3_stmt *vm = table_column_lookup(table, col_name, true, &index);
1126 24431 : if (vm == NULL) {
1127 0 : *err = "Unable to retrieve column merge precompiled statement in merge_insert_col.";
1128 0 : return SQLITE_MISUSE;
1129 : }
1130 :
1131 : // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1132 :
1133 : // bind primary key(s)
1134 24431 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1135 24431 : if (rc < 0) {
1136 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1137 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1138 0 : stmt_reset(vm);
1139 0 : return rc;
1140 : }
1141 :
1142 : // bind value
1143 24431 : if (col_value) {
1144 24431 : rc = sqlite3_bind_value(vm, table->npks+1, col_value);
1145 24431 : if (rc == SQLITE_OK) rc = sqlite3_bind_value(vm, table->npks+2, col_value);
1146 24431 : if (rc != SQLITE_OK) {
1147 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1148 0 : stmt_reset(vm);
1149 0 : return rc;
1150 : }
1151 :
1152 24431 : }
1153 :
1154 : // perform real operation and disable triggers
1155 :
1156 : // in case of GOS we reused the table->col_merge_stmt statement
1157 : // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1158 : // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
1159 : // the trick is to disable that trigger before executing the statement
1160 24431 : if (table->algo == table_algo_crdt_gos) table->enabled = 0;
1161 24431 : SYNCBIT_SET(data);
1162 24431 : rc = sqlite3_step(vm);
1163 : DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], sqlite3_expanded_sql(vm), rc);
1164 24431 : stmt_reset(vm);
1165 24431 : SYNCBIT_RESET(data);
1166 24431 : if (table->algo == table_algo_crdt_gos) table->enabled = 1;
1167 :
1168 24431 : if (rc != SQLITE_DONE) {
1169 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1170 0 : return rc;
1171 : }
1172 :
1173 24431 : return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid, err);
1174 24431 : }
1175 :
1176 166 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, sqlite3_int64 cl, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1177 166 : int rc = SQLITE_OK;
1178 :
1179 : // reset return value
1180 166 : *rowid = 0;
1181 :
1182 : // bind pk
1183 166 : sqlite3_stmt *vm = table->real_merge_delete_stmt;
1184 166 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1185 166 : if (rc < 0) {
1186 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1187 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1188 0 : stmt_reset(vm);
1189 0 : return rc;
1190 : }
1191 :
1192 : // perform real operation and disable triggers
1193 166 : SYNCBIT_SET(data);
1194 166 : rc = sqlite3_step(vm);
1195 : DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], sqlite3_expanded_sql(vm), rc);
1196 166 : stmt_reset(vm);
1197 166 : SYNCBIT_RESET(data);
1198 166 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1199 166 : if (rc != SQLITE_OK) {
1200 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1201 0 : return rc;
1202 : }
1203 :
1204 166 : rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid, err);
1205 166 : if (rc != SQLITE_OK) return rc;
1206 :
1207 : // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
1208 : // this must never come before `set_winner_clock`
1209 166 : vm = table->meta_merge_delete_drop;
1210 166 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1211 166 : if (rc == SQLITE_OK) rc = sqlite3_step(vm);
1212 166 : stmt_reset(vm);
1213 166 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1214 166 : if (rc != SQLITE_OK) {
1215 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1216 0 : }
1217 :
1218 166 : return rc;
1219 166 : }
1220 :
1221 28 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, sqlite3_int64 db_version, const char *pk, int pklen, const char **err) {
1222 28 : sqlite3_stmt *vm = table->meta_zero_clock_stmt;
1223 :
1224 28 : int rc = sqlite3_bind_int64(vm, 1, db_version);
1225 28 : if (rc != SQLITE_OK) goto cleanup;
1226 :
1227 28 : rc = sqlite3_bind_blob(vm, 2, (const void *)pk, pklen, SQLITE_STATIC);
1228 28 : if (rc != SQLITE_OK) goto cleanup;
1229 :
1230 28 : rc = sqlite3_step(vm);
1231 28 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1232 :
1233 : cleanup:
1234 28 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1235 28 : stmt_reset(vm);
1236 28 : return rc;
1237 : }
1238 :
1239 : // executed only if insert_cl == local_cl
1240 44439 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, sqlite3_value *insert_value, const char *site_id, int site_len, const char *col_name, sqlite3_int64 col_version, bool *didwin_flag, const char **err) {
1241 :
1242 44439 : if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
1243 :
1244 : sqlite3_int64 local_version;
1245 44439 : int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version, err);
1246 44439 : if (rc == SQLITE_DONE) {
1247 : // no rows returned, the incoming change wins if there's nothing there locally
1248 19685 : *didwin_flag = true;
1249 19685 : return SQLITE_OK;
1250 : }
1251 24754 : if (rc != SQLITE_OK) return rc;
1252 :
1253 : // rc == SQLITE_OK, means that a row with a version exists
1254 24754 : if (local_version != col_version) {
1255 1886 : if (col_version > local_version) {*didwin_flag = true; return SQLITE_OK;}
1256 703 : if (col_version < local_version) {*didwin_flag = false; return SQLITE_OK;}
1257 0 : }
1258 :
1259 : // rc == SQLITE_ROW and col_version == local_version, need to compare values
1260 :
1261 : // retrieve col_value precompiled statement
1262 22868 : sqlite3_stmt *vm = table_column_lookup(table, col_name, false, NULL);
1263 22868 : if (!vm) {
1264 0 : *err = "Unable to retrieve column value precompiled statement in merge_did_cid_win.";
1265 0 : return SQLITE_ERROR;
1266 : }
1267 :
1268 : // bind primary key values
1269 22868 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
1270 22868 : if (rc < 0) {
1271 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1272 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1273 0 : stmt_reset(vm);
1274 0 : return rc;
1275 : }
1276 :
1277 : // execute vm
1278 : sqlite3_value *local_value;
1279 22868 : rc = sqlite3_step(vm);
1280 22868 : if (rc == SQLITE_DONE) {
1281 : // meta entry exists but the actual value is missing
1282 : // we should allow the value_compare function to make a decision
1283 : // value_compare has been modified to handle the case where lvalue is NULL
1284 0 : local_value = NULL;
1285 0 : rc = SQLITE_OK;
1286 22868 : } else if (rc == SQLITE_ROW) {
1287 22868 : local_value = sqlite3_column_value(vm, 0);
1288 22868 : rc = SQLITE_OK;
1289 22868 : } else {
1290 0 : goto cleanup;
1291 : }
1292 :
1293 : // compare values
1294 22868 : int ret = dbutils_value_compare(insert_value, local_value);
1295 : // reset after compare, otherwise local value would be deallocated
1296 22868 : vm = stmt_reset(vm);
1297 :
1298 22868 : bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
1299 22868 : if (!compare_site_id) {
1300 22868 : *didwin_flag = (ret > 0);
1301 22868 : goto cleanup;
1302 : }
1303 :
1304 : // values are the same and merge_equal_values is true
1305 0 : vm = table->meta_site_id_stmt;
1306 0 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1307 0 : if (rc != SQLITE_OK) goto cleanup;
1308 :
1309 0 : rc = sqlite3_bind_text(vm, 2, col_name, -1, SQLITE_STATIC);
1310 0 : if (rc != SQLITE_OK) goto cleanup;
1311 :
1312 0 : rc = sqlite3_step(vm);
1313 0 : if (rc == SQLITE_ROW) {
1314 0 : const void *local_site_id = sqlite3_column_blob(vm, 0);
1315 0 : ret = memcmp(site_id, local_site_id, site_len);
1316 0 : *didwin_flag = (ret > 0);
1317 0 : stmt_reset(vm);
1318 0 : return SQLITE_OK;
1319 : }
1320 :
1321 : // handle error condition here
1322 0 : stmt_reset(vm);
1323 0 : *err = "Unable to find site_id for previous change. The cloudsync table is probably corrupted.";
1324 0 : return SQLITE_ERROR;
1325 :
1326 : cleanup:
1327 22868 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1328 22868 : if (vm) stmt_reset(vm);
1329 22868 : return rc;
1330 44439 : }
1331 :
1332 28 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, sqlite3_int64 cl, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1333 :
1334 : // reset return value
1335 28 : *rowid = 0;
1336 :
1337 : // bind pk
1338 28 : sqlite3_stmt *vm = table->real_merge_sentinel_stmt;
1339 28 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1340 28 : if (rc < 0) {
1341 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1342 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1343 0 : stmt_reset(vm);
1344 0 : return rc;
1345 : }
1346 :
1347 : // perform real operation and disable triggers
1348 28 : SYNCBIT_SET(data);
1349 28 : rc = sqlite3_step(vm);
1350 28 : stmt_reset(vm);
1351 28 : SYNCBIT_RESET(data);
1352 28 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1353 28 : if (rc != SQLITE_OK) {
1354 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1355 0 : return rc;
1356 : }
1357 :
1358 28 : rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen, err);
1359 28 : if (rc != SQLITE_OK) return rc;
1360 :
1361 28 : return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid, err);
1362 28 : }
1363 :
1364 3150 : int cloudsync_merge_insert_gos (sqlite3_vtab *vtab, cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, const char *insert_name, sqlite3_value *insert_value, sqlite3_int64 insert_col_version, sqlite3_int64 insert_db_version, const char *insert_site_id, int insert_site_id_len, sqlite3_int64 insert_seq, sqlite3_int64 *rowid) {
1365 : // Grow-Only Set (GOS) Algorithm: Only insertions are allowed, deletions and updates are prevented from a trigger.
1366 :
1367 3150 : const char *err = NULL;
1368 6300 : int rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version,
1369 3150 : insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1370 3150 : if (rc != SQLITE_OK) {
1371 0 : cloudsync_vtab_set_error(vtab, "Unable to perform GOS merge_insert_col: %s", err);
1372 0 : }
1373 :
1374 3150 : return rc;
1375 : }
1376 :
1377 48567 : int cloudsync_merge_insert (sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid) {
1378 : // this function performs the merging logic for an insert in a cloud-synchronized table. It handles
1379 : // different scenarios including conflicts, causal lengths, delete operations, and resurrecting rows
1380 : // based on the incoming data (from remote nodes or clients) and the local database state
1381 :
1382 : // this function handles different CRDT algorithms (GOS, DWS, AWS, and CLS).
1383 : // the merging strategy is determined based on the table->algo value.
1384 :
1385 : // meta table declaration:
1386 : // tbl TEXT NOT NULL, pk BLOB NOT NULL, col_name TEXT NOT NULL,"
1387 : // "col_value ANY, col_version INTEGER NOT NULL, db_version INTEGER NOT NULL,"
1388 : // "site_id BLOB NOT NULL, cl INTEGER NOT NULL, seq INTEGER NOT NULL
1389 :
1390 : // meta information to retrieve from arguments:
1391 : // argv[0] -> table name (TEXT)
1392 : // argv[1] -> primary key (BLOB)
1393 : // argv[2] -> column name (TEXT or NULL if sentinel)
1394 : // argv[3] -> column value (ANY)
1395 : // argv[4] -> column version (INTEGER)
1396 : // argv[5] -> database version (INTEGER)
1397 : // argv[6] -> site ID (BLOB, identifies the origin of the update)
1398 : // argv[7] -> causal length (INTEGER, tracks the order of operations)
1399 : // argv[8] -> sequence number (INTEGER, unique per operation)
1400 :
1401 : // extract table name
1402 48567 : const char *insert_tbl = (const char *)sqlite3_value_text(argv[0]);
1403 :
1404 : // lookup table
1405 48567 : cloudsync_context *data = cloudsync_vtab_get_context(vtab);
1406 48567 : cloudsync_table_context *table = table_lookup(data, insert_tbl);
1407 48567 : if (!table) return cloudsync_vtab_set_error(vtab, "Unable to find table %s,", insert_tbl);
1408 :
1409 : // extract the remaining fields from the input values
1410 48567 : const char *insert_pk = (const char *)sqlite3_value_blob(argv[1]);
1411 48567 : int insert_pk_len = sqlite3_value_bytes(argv[1]);
1412 48567 : const char *insert_name = (sqlite3_value_type(argv[2]) == SQLITE_NULL) ? CLOUDSYNC_TOMBSTONE_VALUE : (const char *)sqlite3_value_text(argv[2]);
1413 48567 : sqlite3_value *insert_value = argv[3];
1414 48567 : sqlite3_int64 insert_col_version = sqlite3_value_int64(argv[4]);
1415 48567 : sqlite3_int64 insert_db_version = sqlite3_value_int64(argv[5]);
1416 48567 : const char *insert_site_id = (const char *)sqlite3_value_blob(argv[6]);
1417 48567 : int insert_site_id_len = sqlite3_value_bytes(argv[6]);
1418 48567 : sqlite3_int64 insert_cl = sqlite3_value_int64(argv[7]);
1419 48567 : sqlite3_int64 insert_seq = sqlite3_value_int64(argv[8]);
1420 48567 : const char *err = NULL;
1421 :
1422 : // perform different logic for each different table algorithm
1423 48567 : if (table->algo == table_algo_crdt_gos) return cloudsync_merge_insert_gos(vtab, data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
1424 :
1425 : // Handle DWS and AWS algorithms here
1426 : // Delete-Wins Set (DWS): table_algo_crdt_dws
1427 : // Add-Wins Set (AWS): table_algo_crdt_aws
1428 :
1429 : // Causal-Length Set (CLS) Algorithm (default)
1430 :
1431 : // compute the local causal length for the row based on the primary key
1432 : // the causal length is used to determine the order of operations and resolve conflicts.
1433 45417 : sqlite3_int64 local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len, &err);
1434 45417 : if (local_cl < 0) {
1435 0 : return cloudsync_vtab_set_error(vtab, "Unable to compute local causal length: %s", err);
1436 : }
1437 :
1438 : // if the incoming causal length is older than the local causal length, we can safely ignore it
1439 : // because the local changes are more recent
1440 45417 : if (insert_cl < local_cl) return SQLITE_OK;
1441 :
1442 : // check if the operation is a delete by examining the causal length
1443 : // even causal lengths typically signify delete operations
1444 45191 : bool is_delete = (insert_cl % 2 == 0);
1445 45191 : if (is_delete) {
1446 : // if it's a delete, check if the local state is at the same causal length
1447 : // if it is, no further action is needed
1448 602 : if (local_cl == insert_cl) return SQLITE_OK;
1449 :
1450 : // perform a delete merge if the causal length is newer than the local one
1451 332 : int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
1452 166 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1453 166 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_delete: %s", err);
1454 166 : return rc;
1455 : }
1456 :
1457 : // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
1458 44589 : bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
1459 44589 : if (is_sentinel_only) {
1460 150 : if (local_cl == insert_cl) return SQLITE_OK;
1461 :
1462 : // perform a sentinel-only insert to track the existence of the row
1463 56 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
1464 28 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1465 28 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_sentinel_only_insert: %s", err);
1466 28 : return rc;
1467 : }
1468 :
1469 : // from this point I can be sure that insert_name is not sentinel
1470 :
1471 : // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
1472 : // odd causal lengths can "resurrect" rows
1473 44439 : bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
1474 44439 : bool row_exists_locally = local_cl != 0;
1475 :
1476 : // if a resurrection is needed, insert a sentinel to mark the row as alive
1477 : // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
1478 44439 : if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
1479 0 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
1480 0 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1481 0 : if (rc != SQLITE_OK) {
1482 0 : cloudsync_vtab_set_error(vtab, "Unable to perform merge_sentinel_only_insert: %s", err);
1483 0 : return rc;
1484 : }
1485 0 : }
1486 :
1487 : // at this point, we determine whether the incoming change wins based on causal length
1488 : // this can be due to a resurrection, a non-existent local row, or a conflict resolution
1489 44439 : bool flag = false;
1490 44439 : int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag, &err);
1491 44439 : if (rc != SQLITE_OK) {
1492 0 : cloudsync_vtab_set_error(vtab, "Unable to perform merge_did_cid_win: %s", err);
1493 0 : return rc;
1494 : }
1495 :
1496 : // check if the incoming change wins and should be applied
1497 44439 : bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
1498 44439 : if (!does_cid_win) return SQLITE_OK;
1499 :
1500 : // perform the final column insert or update if the incoming change wins
1501 21281 : rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1502 21281 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_insert_col: %s", err);
1503 21281 : return rc;
1504 48567 : }
1505 :
1506 : // MARK: - Private -
1507 :
1508 109 : bool cloudsync_config_exists (sqlite3 *db) {
1509 109 : return dbutils_table_exists(db, CLOUDSYNC_SITEID_NAME) == true;
1510 : }
1511 :
1512 109 : void *cloudsync_context_create (void) {
1513 109 : cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
1514 : DEBUG_SETTINGS("cloudsync_context_create %p", data);
1515 :
1516 109 : data->libversion = CLOUDSYNC_VERSION;
1517 109 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1518 : #if CLOUDSYNC_DEBUG
1519 : data->debug = 1;
1520 : #endif
1521 :
1522 : // allocate space for 128 tables (it can grow if needed)
1523 109 : data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc((uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *)));
1524 109 : if (!data->tables) {
1525 0 : cloudsync_memory_free(data);
1526 0 : return NULL;
1527 : }
1528 109 : data->tables_alloc = CLOUDSYNC_INIT_NTABLES;
1529 109 : data->tables_count = 0;
1530 :
1531 109 : return data;
1532 109 : }
1533 :
1534 109 : void cloudsync_context_free (void *ptr) {
1535 : DEBUG_SETTINGS("cloudsync_context_free %p", ptr);
1536 109 : if (!ptr) return;
1537 :
1538 109 : cloudsync_context *data = (cloudsync_context*)ptr;
1539 109 : cloudsync_memory_free(data->tables);
1540 109 : cloudsync_memory_free(data);
1541 109 : }
1542 :
1543 223 : const char *cloudsync_context_init (sqlite3 *db, cloudsync_context *data, sqlite3_context *context) {
1544 223 : if (!data && context) data = (cloudsync_context *)sqlite3_user_data(context);
1545 :
1546 : // perform init just the first time, if the site_id field is not set.
1547 : // The data->site_id value could exists while settings tables don't exists if the
1548 : // cloudsync_context_init was previously called in init transaction that was rolled back
1549 : // because of an error during the init process.
1550 223 : if (data->site_id[0] == 0 || !dbutils_table_exists(db, CLOUDSYNC_SITEID_NAME)) {
1551 111 : if (dbutils_settings_init(db, data, context) != SQLITE_OK) return NULL;
1552 111 : if (stmts_add_tocontext(db, data) != SQLITE_OK) return NULL;
1553 111 : if (cloudsync_load_siteid(db, data) != SQLITE_OK) return NULL;
1554 :
1555 111 : data->sqlite_ctx = context;
1556 111 : data->schema_hash = dbutils_schema_hash(db);
1557 111 : }
1558 :
1559 223 : return (const char *)data->site_id;
1560 223 : }
1561 :
1562 1029 : void cloudsync_sync_key(cloudsync_context *data, const char *key, const char *value) {
1563 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
1564 :
1565 : // sync data
1566 1029 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
1567 111 : data->schema_version = (int)strtol(value, NULL, 0);
1568 111 : return;
1569 : }
1570 :
1571 918 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
1572 0 : data->debug = 0;
1573 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
1574 0 : return;
1575 : }
1576 1029 : }
1577 :
#if 0
// Compiled out (#if 0): placeholder for syncing per-table/per-column settings.
// The body only emits a debug trace of its arguments and returns.
void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
    DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
    // Unused in this version
    return;
}
#endif
1585 :
1586 6898 : int cloudsync_commit_hook (void *ctx) {
1587 6898 : cloudsync_context *data = (cloudsync_context *)ctx;
1588 :
1589 6898 : data->db_version = data->pending_db_version;
1590 6898 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1591 6898 : data->seq = 0;
1592 :
1593 6898 : return SQLITE_OK;
1594 : }
1595 :
1596 2 : void cloudsync_rollback_hook (void *ctx) {
1597 2 : cloudsync_context *data = (cloudsync_context *)ctx;
1598 :
1599 2 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1600 2 : data->seq = 0;
1601 2 : }
1602 :
1603 23 : int cloudsync_finalize_alter (sqlite3_context *context, cloudsync_context *data, cloudsync_table_context *table) {
1604 23 : int rc = SQLITE_OK;
1605 23 : sqlite3 *db = sqlite3_context_db_handle(context);
1606 :
1607 23 : db_version_check_uptodate(db, data);
1608 :
1609 : // If primary key columns change (in the schema)
1610 : // We need to drop, re-create and backfill
1611 : // the clock table.
1612 : // A change in pk columns means a change in all identities
1613 : // of all rows.
1614 : // We can determine this by comparing unique index on lookaside table vs
1615 : // pks on source table
1616 23 : char *errmsg = NULL;
1617 23 : char **result = NULL;
1618 : int nrows, ncols;
1619 23 : char *sql = cloudsync_memory_mprintf("SELECT name FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table->name);
1620 23 : rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, NULL);
1621 23 : cloudsync_memory_free(sql);
1622 23 : if (rc != SQLITE_OK) {
1623 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1624 0 : goto finalize;
1625 23 : } else if (errmsg || ncols != 1) {
1626 0 : rc = SQLITE_MISUSE;
1627 0 : goto finalize;
1628 : }
1629 :
1630 23 : bool pk_diff = false;
1631 23 : if (nrows != table->npks) {
1632 6 : pk_diff = true;
1633 6 : } else {
1634 51 : for (int i=0; i<nrows; ++i) {
1635 34 : if (strcmp(table->pk_name[i], result[i]) != 0) {
1636 0 : pk_diff = true;
1637 0 : break;
1638 : }
1639 34 : }
1640 : }
1641 :
1642 23 : if (pk_diff) {
1643 : // drop meta-table, it will be recreated
1644 6 : char *sql = cloudsync_memory_mprintf("DROP TABLE IF EXISTS \"%w_cloudsync\";", table->name);
1645 6 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1646 6 : cloudsync_memory_free(sql);
1647 6 : if (rc != SQLITE_OK) {
1648 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1649 0 : goto finalize;
1650 : }
1651 6 : } else {
1652 : // compact meta-table
1653 : // delete entries for removed columns
1654 17 : char *sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE \"col_name\" NOT IN ("
1655 : "SELECT name FROM pragma_table_info('%q') UNION SELECT '%s'"
1656 17 : ")", table->name, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
1657 17 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1658 17 : cloudsync_memory_free(sql);
1659 17 : if (rc != SQLITE_OK) {
1660 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1661 0 : goto finalize;
1662 : }
1663 :
1664 17 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
1665 17 : sql = cloudsync_memory_mprintf("SELECT group_concat('\"%w\".\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%s') WHERE pk>0 ORDER BY pk;", singlequote_escaped_table_name, singlequote_escaped_table_name);
1666 17 : cloudsync_memory_free(singlequote_escaped_table_name);
1667 17 : if (!sql) {
1668 0 : rc = SQLITE_NOMEM;
1669 0 : goto finalize;
1670 : }
1671 17 : char *pkclause = dbutils_text_select(db, sql);
1672 17 : char *pkvalues = (pkclause) ? pkclause : "rowid";
1673 17 : cloudsync_memory_free(sql);
1674 :
1675 : // delete entries related to rows that no longer exist in the original table, but preserve tombstone
1676 17 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE (\"col_name\" != '%s' OR (\"col_name\" = '%s' AND col_version %% 2 != 0)) AND NOT EXISTS (SELECT 1 FROM \"%w\" WHERE \"%w_cloudsync\".pk = cloudsync_pk_encode(%s) LIMIT 1);", table->name, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->name, table->name, pkvalues);
1677 17 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1678 17 : if (pkclause) cloudsync_memory_free(pkclause);
1679 17 : cloudsync_memory_free(sql);
1680 17 : if (rc != SQLITE_OK) {
1681 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1682 0 : goto finalize;
1683 : }
1684 :
1685 : }
1686 :
1687 : char buf[256];
1688 23 : snprintf(buf, sizeof(buf), "%lld", data->db_version);
1689 23 : dbutils_settings_set_key_value(db, context, "pre_alter_dbversion", buf);
1690 :
1691 : finalize:
1692 23 : sqlite3_free_table(result);
1693 23 : sqlite3_free(errmsg);
1694 :
1695 23 : return rc;
1696 : }
1697 :
1698 168 : int cloudsync_refill_metatable (sqlite3 *db, cloudsync_context *data, const char *table_name) {
1699 168 : cloudsync_table_context *table = table_lookup(data, table_name);
1700 168 : if (!table) return SQLITE_INTERNAL;
1701 :
1702 168 : sqlite3_stmt *vm = NULL;
1703 168 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
1704 :
1705 168 : char *sql = cloudsync_memory_mprintf("SELECT group_concat('\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1706 168 : char *pkclause_identifiers = dbutils_text_select(db, sql);
1707 168 : char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
1708 168 : cloudsync_memory_free(sql);
1709 :
1710 168 : sql = cloudsync_memory_mprintf("SELECT group_concat('cloudsync_pk_decode(pk, ' || pk || ') AS ' || '\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1711 168 : char *pkdecode = dbutils_text_select(db, sql);
1712 168 : char *pkdecodeval = (pkdecode) ? pkdecode : "cloudsync_pk_decode(pk, 1) AS rowid";
1713 168 : cloudsync_memory_free(sql);
1714 :
1715 168 : sql = cloudsync_memory_mprintf("SELECT cloudsync_insert('%q', %s) FROM (SELECT %s FROM \"%w\" EXCEPT SELECT %s FROM \"%w_cloudsync\");", table_name, pkvalues_identifiers, pkvalues_identifiers, table_name, pkdecodeval, table_name);
1716 168 : int rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1717 168 : cloudsync_memory_free(sql);
1718 168 : if (rc != SQLITE_OK) goto finalize;
1719 :
1720 : // fill missing colums
1721 : // for each non-pk column:
1722 : // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
1723 : // The old plan does many decodes per candidate and can’t use an index to rule out matches quickly—so it burns CPU and I/O.
1724 :
1725 168 : sql = cloudsync_memory_mprintf("WITH _cstemp1 AS (SELECT cloudsync_pk_encode(%s) AS pk FROM \"%w\") SELECT _cstemp1.pk FROM _cstemp1 WHERE NOT EXISTS (SELECT 1 FROM \"%w_cloudsync\" _cstemp2 WHERE _cstemp2.pk = _cstemp1.pk AND _cstemp2.col_name = ?);", pkvalues_identifiers, table_name, table_name);
1726 168 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &vm, NULL);
1727 168 : cloudsync_memory_free(sql);
1728 168 : if (rc != SQLITE_OK) goto finalize;
1729 :
1730 1117 : for (int i=0; i<table->ncols; ++i) {
1731 949 : char *col_name = table->col_name[i];
1732 :
1733 949 : rc = sqlite3_bind_text(vm, 1, col_name, -1, SQLITE_STATIC);
1734 949 : if (rc != SQLITE_OK) goto finalize;
1735 :
1736 987 : while (1) {
1737 987 : rc = sqlite3_step(vm);
1738 987 : if (rc == SQLITE_ROW) {
1739 38 : const char *pk = (const char *)sqlite3_column_text(vm, 0);
1740 38 : size_t pklen = strlen(pk);
1741 38 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, col_name, db_version, BUMP_SEQ(data));
1742 987 : } else if (rc == SQLITE_DONE) {
1743 949 : rc = SQLITE_OK;
1744 949 : break;
1745 : } else {
1746 0 : break;
1747 : }
1748 : }
1749 949 : if (rc != SQLITE_OK) goto finalize;
1750 :
1751 949 : sqlite3_reset(vm);
1752 1117 : }
1753 :
1754 : finalize:
1755 168 : if (rc != SQLITE_OK) DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", sqlite3_errmsg(db));
1756 168 : if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
1757 168 : if (pkdecode) cloudsync_memory_free(pkdecode);
1758 168 : if (vm) sqlite3_finalize(vm);
1759 168 : return rc;
1760 168 : }
1761 :
1762 : // MARK: - Local -
1763 :
1764 1 : int local_update_sentinel (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1765 1 : sqlite3_stmt *vm = table->meta_sentinel_update_stmt;
1766 1 : if (!vm) return -1;
1767 :
1768 1 : int rc = sqlite3_bind_int64(vm, 1, db_version);
1769 1 : if (rc != SQLITE_OK) goto cleanup;
1770 :
1771 1 : rc = sqlite3_bind_int(vm, 2, seq);
1772 1 : if (rc != SQLITE_OK) goto cleanup;
1773 :
1774 1 : rc = sqlite3_bind_blob(vm, 3, pk, (int)pklen, SQLITE_STATIC);
1775 1 : if (rc != SQLITE_OK) goto cleanup;
1776 :
1777 1 : rc = sqlite3_step(vm);
1778 1 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1779 :
1780 : cleanup:
1781 1 : DEBUG_SQLITE_ERROR(rc, "local_update_sentinel", db);
1782 1 : sqlite3_reset(vm);
1783 1 : return rc;
1784 1 : }
1785 :
1786 119 : int local_mark_insert_sentinel_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1787 119 : sqlite3_stmt *vm = table->meta_sentinel_insert_stmt;
1788 119 : if (!vm) return -1;
1789 :
1790 119 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1791 119 : if (rc != SQLITE_OK) goto cleanup;
1792 :
1793 119 : rc = sqlite3_bind_int64(vm, 2, db_version);
1794 119 : if (rc != SQLITE_OK) goto cleanup;
1795 :
1796 119 : rc = sqlite3_bind_int(vm, 3, seq);
1797 119 : if (rc != SQLITE_OK) goto cleanup;
1798 :
1799 119 : rc = sqlite3_bind_int64(vm, 4, db_version);
1800 119 : if (rc != SQLITE_OK) goto cleanup;
1801 :
1802 119 : rc = sqlite3_bind_int(vm, 5, seq);
1803 119 : if (rc != SQLITE_OK) goto cleanup;
1804 :
1805 119 : rc = sqlite3_step(vm);
1806 119 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1807 :
1808 : cleanup:
1809 119 : DEBUG_SQLITE_ERROR(rc, "local_insert_sentinel", db);
1810 119 : sqlite3_reset(vm);
1811 119 : return rc;
1812 119 : }
1813 :
1814 11288 : int local_mark_insert_or_update_meta_impl (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, int col_version, sqlite3_int64 db_version, int seq) {
1815 :
1816 11288 : sqlite3_stmt *vm = table->meta_row_insert_update_stmt;
1817 11288 : if (!vm) return -1;
1818 :
1819 11288 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1820 11288 : if (rc != SQLITE_OK) goto cleanup;
1821 :
1822 11288 : rc = sqlite3_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1, SQLITE_STATIC);
1823 11288 : if (rc != SQLITE_OK) goto cleanup;
1824 :
1825 11288 : rc = sqlite3_bind_int(vm, 3, col_version);
1826 11288 : if (rc != SQLITE_OK) goto cleanup;
1827 :
1828 11288 : rc = sqlite3_bind_int64(vm, 4, db_version);
1829 11288 : if (rc != SQLITE_OK) goto cleanup;
1830 :
1831 11288 : rc = sqlite3_bind_int(vm, 5, seq);
1832 11288 : if (rc != SQLITE_OK) goto cleanup;
1833 :
1834 11288 : rc = sqlite3_bind_int64(vm, 6, db_version);
1835 11288 : if (rc != SQLITE_OK) goto cleanup;
1836 :
1837 11288 : rc = sqlite3_bind_int(vm, 7, seq);
1838 11288 : if (rc != SQLITE_OK) goto cleanup;
1839 :
1840 11288 : rc = sqlite3_step(vm);
1841 11288 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1842 :
1843 : cleanup:
1844 11288 : DEBUG_SQLITE_ERROR(rc, "local_insert_or_update", db);
1845 11288 : sqlite3_reset(vm);
1846 11288 : return rc;
1847 11288 : }
1848 :
1849 11232 : int local_mark_insert_or_update_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, sqlite3_int64 db_version, int seq) {
1850 11232 : return local_mark_insert_or_update_meta_impl(db, table, pk, pklen, col_name, 1, db_version, seq);
1851 : }
1852 :
1853 56 : int local_mark_delete_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1854 56 : return local_mark_insert_or_update_meta_impl(db, table, pk, pklen, NULL, 2, db_version, seq);
1855 : }
1856 :
1857 28 : int local_drop_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen) {
1858 28 : sqlite3_stmt *vm = table->meta_row_drop_stmt;
1859 28 : if (!vm) return -1;
1860 :
1861 28 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1862 28 : if (rc != SQLITE_OK) goto cleanup;
1863 :
1864 28 : rc = sqlite3_step(vm);
1865 28 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1866 :
1867 : cleanup:
1868 28 : DEBUG_SQLITE_ERROR(rc, "local_drop_meta", db);
1869 28 : sqlite3_reset(vm);
1870 28 : return rc;
1871 28 : }
1872 :
1873 28 : int local_update_move_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *pk2, size_t pklen2, sqlite3_int64 db_version) {
1874 : /*
1875 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
1876 : * to a new primary key (NEW.pk) when a primary key change occurs.
1877 : *
1878 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
1879 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
1880 : *
1881 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
1882 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
1883 : * may be applied incorrectly, leading to data inconsistency.
1884 : *
1885 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
1886 : * by either incrementing the maximum sequence value in the table or using a function (e.g., `bump_seq(data)`)
1887 : * that generates a unique sequence for each row. The update query should ensure that each row moved
1888 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
1889 : */
1890 :
1891 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
1892 : // pk2 is the old pk
1893 :
1894 28 : sqlite3_stmt *vm = table->meta_update_move_stmt;
1895 28 : if (!vm) return -1;
1896 :
1897 : // new primary key
1898 28 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1899 28 : if (rc != SQLITE_OK) goto cleanup;
1900 :
1901 : // new db_version
1902 28 : rc = sqlite3_bind_int64(vm, 2, db_version);
1903 28 : if (rc != SQLITE_OK) goto cleanup;
1904 :
1905 : // old primary key
1906 28 : rc = sqlite3_bind_blob(vm, 3, pk2, (int)pklen2, SQLITE_STATIC);
1907 28 : if (rc != SQLITE_OK) goto cleanup;
1908 :
1909 28 : rc = sqlite3_step(vm);
1910 28 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1911 :
1912 : cleanup:
1913 28 : DEBUG_SQLITE_ERROR(rc, "local_update_move_meta", db);
1914 28 : sqlite3_reset(vm);
1915 28 : return rc;
1916 28 : }
1917 :
1918 : // MARK: - Payload Encode / Decode -
1919 :
1920 617 : bool cloudsync_buffer_free (cloudsync_data_payload *payload) {
1921 617 : if (payload) {
1922 617 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
1923 617 : memset(payload, 0, sizeof(cloudsync_data_payload));
1924 617 : }
1925 :
1926 617 : return false;
1927 : }
1928 :
1929 48750 : bool cloudsync_buffer_check (cloudsync_data_payload *payload, size_t needed) {
1930 48750 : if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
1931 :
1932 : // alloc/resize buffer
1933 48750 : if (payload->bused + needed > payload->balloc) {
1934 626 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
1935 626 : size_t balloc = payload->balloc + needed;
1936 :
1937 626 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
1938 626 : if (!buffer) return cloudsync_buffer_free(payload);
1939 :
1940 626 : payload->buffer = buffer;
1941 626 : payload->balloc = balloc;
1942 626 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
1943 626 : }
1944 :
1945 48750 : return true;
1946 48750 : }
1947 :
1948 617 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
1949 617 : memset(header, 0, sizeof(cloudsync_payload_header));
1950 : assert(sizeof(cloudsync_payload_header)==32);
1951 :
1952 : int major, minor, patch;
1953 617 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
1954 :
1955 617 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
1956 617 : header->version = CLOUDSYNC_PAYLOAD_VERSION;
1957 617 : header->libversion[0] = major;
1958 617 : header->libversion[1] = minor;
1959 617 : header->libversion[2] = patch;
1960 617 : header->expanded_size = htonl(expanded_size);
1961 617 : header->ncols = htons(ncols);
1962 617 : header->nrows = htonl(nrows);
1963 617 : header->schema_hash = htonll(hash);
1964 617 : }
1965 :
1966 48750 : void cloudsync_payload_encode_step (sqlite3_context *context, int argc, sqlite3_value **argv) {
1967 : DEBUG_FUNCTION("cloudsync_payload_encode_step");
1968 : // debug_values(argc, argv);
1969 :
1970 : // allocate/get the session context
1971 48750 : cloudsync_data_payload *payload = (cloudsync_data_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_data_payload));
1972 48750 : if (!payload) return;
1973 :
1974 : // check if the step function is called for the first time
1975 48750 : if (payload->nrows == 0) payload->ncols = argc;
1976 :
1977 48750 : size_t breq = pk_encode_size(argv, argc, 0);
1978 48750 : if (cloudsync_buffer_check(payload, breq) == false) return;
1979 :
1980 48750 : char *buffer = payload->buffer + payload->bused;
1981 48750 : char *ptr = pk_encode(argv, argc, buffer, false, NULL);
1982 48750 : assert(buffer == ptr);
1983 :
1984 : // update buffer
1985 48750 : payload->bused += breq;
1986 :
1987 : // increment row counter
1988 48750 : ++payload->nrows;
1989 48750 : }
1990 :
// Aggregate finalizer: turns the rows accumulated by cloudsync_payload_encode_step
// into the result BLOB (payload header followed by the row data).
// The row data is LZ4-compressed; if compression fails or does not make it smaller,
// the uncompressed data is returned and the header's expanded_size is set to 0.
// Result: NULL when no row was accumulated, SQLITE_NOMEM error if the output buffer
// cannot be allocated, the BLOB otherwise.
void cloudsync_payload_encode_final (sqlite3_context *context) {
    DEBUG_FUNCTION("cloudsync_payload_encode_final");

    // get the session context
    cloudsync_data_payload *payload = (cloudsync_data_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_data_payload));
    if (!payload) return;

    // no rows were encoded: the aggregate result is NULL
    if (payload->nrows == 0) {
        sqlite3_result_null(context);
        return;
    }

    // encode payload
    // real_buffer_size is the size of the encoded rows, excluding the space reserved for the header
    int header_size = (int)sizeof(cloudsync_payload_header);
    int real_buffer_size = (int)(payload->bused - header_size);
    int zbound = LZ4_compressBound(real_buffer_size);
    // output buffer: room for the header plus the worst-case compressed size
    char *buffer = cloudsync_memory_alloc(zbound + header_size);
    if (!buffer) {
        cloudsync_buffer_free(payload);
        sqlite3_result_error_code(context, SQLITE_NOMEM);
        return;
    }

    // adjust buffer to compress to skip the reserved header
    char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    int zused = LZ4_compress_default(src_buffer, buffer+header_size, real_buffer_size, zbound);
    bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    // NOTE(review): macro defined outside this block; presumably it can force
    // use_uncompressed_buffer on (e.g. for tests) -- confirm against its definition
    CHECK_FORCE_UNCOMPRESSED_BUFFER();

    // setup payload header
    // expanded_size is 0 for an uncompressed payload, the uncompressed size otherwise
    cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
    cloudsync_payload_header header;
    cloudsync_payload_header_init(&header, (use_uncompressed_buffer) ? 0 : real_buffer_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);

    // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    // (from here on `buffer` aliases payload->buffer, which is released by cloudsync_buffer_free below)
    if (use_uncompressed_buffer) {
        cloudsync_memory_free(buffer);
        buffer = payload->buffer;
        zused = real_buffer_size;
    }

    // copy header and data to SQLite BLOB
    // SQLITE_TRANSIENT makes SQLite take its own copy, so the buffers can be freed right after
    memcpy(buffer, &header, sizeof(cloudsync_payload_header));
    int blob_size = zused+sizeof(cloudsync_payload_header);
    sqlite3_result_blob(context, buffer, blob_size, SQLITE_TRANSIENT);

    // cleanup memory
    // the separately allocated compressed buffer is freed only when it was actually used
    cloudsync_buffer_free(payload);
    if (!use_uncompressed_buffer) cloudsync_memory_free(buffer);
}
2041 :
2042 611 : cloudsync_payload_apply_callback_t cloudsync_get_payload_apply_callback(sqlite3 *db) {
2043 611 : return (sqlite3_libversion_number() >= 3044000) ? sqlite3_get_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY) : NULL;
2044 : }
2045 :
2046 22 : void cloudsync_set_payload_apply_callback(sqlite3 *db, cloudsync_payload_apply_callback_t callback) {
2047 22 : if (sqlite3_libversion_number() >= 3044000) {
2048 22 : sqlite3_set_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY, (void*)callback, NULL);
2049 22 : }
2050 22 : }
2051 :
2052 438030 : int cloudsync_pk_decode_bind_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
2053 438030 : cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
2054 438030 : int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
2055 :
2056 438030 : if (rc == SQLITE_OK) {
2057 : // the dbversion index is smaller than seq index, so it is processed first
2058 : // when processing the dbversion column: save the value to the tmp_dbversion field
2059 : // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
2060 438030 : switch (index) {
2061 : case CLOUDSYNC_PK_INDEX_TBL:
2062 48670 : if (type == SQLITE_TEXT) {
2063 48670 : decode_context->tbl = pval;
2064 48670 : decode_context->tbl_len = ival;
2065 48670 : }
2066 48670 : break;
2067 : case CLOUDSYNC_PK_INDEX_PK:
2068 48670 : if (type == SQLITE_BLOB) {
2069 48670 : decode_context->pk = pval;
2070 48670 : decode_context->pk_len = ival;
2071 48670 : }
2072 48670 : break;
2073 : case CLOUDSYNC_PK_INDEX_COLNAME:
2074 48670 : if (type == SQLITE_TEXT) {
2075 48670 : decode_context->col_name = pval;
2076 48670 : decode_context->col_name_len = ival;
2077 48670 : }
2078 48670 : break;
2079 : case CLOUDSYNC_PK_INDEX_COLVERSION:
2080 48670 : if (type == SQLITE_INTEGER) decode_context->col_version = ival;
2081 48670 : break;
2082 : case CLOUDSYNC_PK_INDEX_DBVERSION:
2083 48670 : if (type == SQLITE_INTEGER) decode_context->db_version = ival;
2084 48670 : break;
2085 : case CLOUDSYNC_PK_INDEX_SITEID:
2086 48670 : if (type == SQLITE_BLOB) {
2087 48670 : decode_context->site_id = pval;
2088 48670 : decode_context->site_id_len = ival;
2089 48670 : }
2090 48670 : break;
2091 : case CLOUDSYNC_PK_INDEX_CL:
2092 48670 : if (type == SQLITE_INTEGER) decode_context->cl = ival;
2093 48670 : break;
2094 : case CLOUDSYNC_PK_INDEX_SEQ:
2095 48670 : if (type == SQLITE_INTEGER) decode_context->seq = ival;
2096 48670 : break;
2097 : }
2098 438030 : }
2099 :
2100 438030 : return rc;
2101 : }
2102 :
2103 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
2104 :
2105 613 : int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int blen) {
2106 : // decode header
2107 : cloudsync_payload_header header;
2108 613 : memcpy(&header, payload, sizeof(cloudsync_payload_header));
2109 :
2110 613 : header.signature = ntohl(header.signature);
2111 613 : header.expanded_size = ntohl(header.expanded_size);
2112 613 : header.ncols = ntohs(header.ncols);
2113 613 : header.nrows = ntohl(header.nrows);
2114 613 : header.schema_hash = ntohll(header.schema_hash);
2115 :
2116 613 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2117 613 : if (!data || header.schema_hash != data->schema_hash) {
2118 3 : sqlite3 *db = sqlite3_context_db_handle(context);
2119 3 : if (!dbutils_check_schema_hash(db, header.schema_hash)) {
2120 2 : dbutils_context_result_error(context, "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
2121 2 : sqlite3_result_error_code(context, SQLITE_MISMATCH);
2122 2 : return -1;
2123 : }
2124 1 : }
2125 :
2126 : // sanity check header
2127 611 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
2128 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: invalid signature or column size.");
2129 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2130 0 : return -1;
2131 : }
2132 :
2133 611 : const char *buffer = payload + sizeof(cloudsync_payload_header);
2134 611 : blen -= sizeof(cloudsync_payload_header);
2135 :
2136 : // check if payload is compressed
2137 611 : char *clone = NULL;
2138 611 : if (header.expanded_size != 0) {
2139 609 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
2140 609 : if (!clone) {sqlite3_result_error_code(context, SQLITE_NOMEM); return -1;}
2141 :
2142 609 : uint32_t rc = LZ4_decompress_safe(buffer, clone, blen, header.expanded_size);
2143 609 : if (rc <= 0 || rc != header.expanded_size) {
2144 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to decompress BLOB (%d).", rc);
2145 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2146 0 : return -1;
2147 : }
2148 :
2149 609 : buffer = (const char *)clone;
2150 609 : }
2151 :
2152 611 : sqlite3 *db = sqlite3_context_db_handle(context);
2153 :
2154 : // precompile the insert statement
2155 611 : sqlite3_stmt *vm = NULL;
2156 611 : const char *sql = "INSERT INTO cloudsync_changes(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) VALUES (?,?,?,?,?,?,?,?,?);";
2157 611 : int rc = sqlite3_prepare(db, sql, -1, &vm, NULL);
2158 611 : if (rc != SQLITE_OK) {
2159 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: error while compiling SQL statement (%s).", sqlite3_errmsg(db));
2160 0 : if (clone) cloudsync_memory_free(clone);
2161 0 : return -1;
2162 : }
2163 :
2164 : // process buffer, one row at a time
2165 611 : uint16_t ncols = header.ncols;
2166 611 : uint32_t nrows = header.nrows;
2167 611 : int64_t last_payload_db_version = -1;
2168 611 : bool in_savepoint = false;
2169 611 : int dbversion = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_DBVERSION);
2170 611 : int seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_SEQ);
2171 611 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
2172 611 : void *payload_apply_xdata = NULL;
2173 611 : cloudsync_payload_apply_callback_t payload_apply_callback = cloudsync_get_payload_apply_callback(db);
2174 :
2175 49281 : for (uint32_t i=0; i<nrows; ++i) {
2176 48670 : size_t seek = 0;
2177 48670 : pk_decode((char *)buffer, blen, ncols, &seek, cloudsync_pk_decode_bind_callback, &decoded_context);
2178 : // n is the pk_decode return value, I don't think I should assert here because in any case the next sqlite3_step would fail
2179 : // assert(n == ncols);
2180 :
2181 48670 : bool approved = true;
2182 48670 : if (payload_apply_callback) approved = payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY, SQLITE_OK);
2183 :
2184 : // Apply consecutive rows with the same db_version inside a transaction if no
2185 : // transaction has already been opened.
2186 : // The user may have already opened a transaction before applying the payload,
2187 : // and the `payload_apply_callback` may have already opened a savepoint.
2188 : // Nested savepoints work, but overlapping savepoints could alter the expected behavior.
2189 : // This savepoint ensures that the db_version value remains consistent for all
2190 : // rows with the same original db_version in the payload.
2191 :
2192 48670 : bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
2193 :
2194 : // Release existing savepoint if db_version changed
2195 48670 : if (in_savepoint && db_version_changed) {
2196 1533 : rc = sqlite3_exec(db, "RELEASE cloudsync_payload_apply;", NULL, NULL, NULL);
2197 1533 : if (rc != SQLITE_OK) {
2198 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to release a savepoint (%s).", sqlite3_errmsg(db));
2199 0 : if (clone) cloudsync_memory_free(clone);
2200 0 : return -1;
2201 : }
2202 1533 : in_savepoint = false;
2203 1533 : }
2204 :
2205 : // Start new savepoint if needed
2206 48670 : bool in_transaction = sqlite3_get_autocommit(db) != true;
2207 48670 : if (!in_transaction && db_version_changed) {
2208 1676 : rc = sqlite3_exec(db, "SAVEPOINT cloudsync_payload_apply;", NULL, NULL, NULL);
2209 1676 : if (rc != SQLITE_OK) {
2210 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to start a transaction (%s).", sqlite3_errmsg(db));
2211 0 : if (clone) cloudsync_memory_free(clone);
2212 0 : return -1;
2213 : }
2214 1676 : last_payload_db_version = decoded_context.db_version;
2215 1676 : in_savepoint = true;
2216 1676 : }
2217 :
2218 48670 : if (approved) {
2219 48549 : rc = sqlite3_step(vm);
2220 48549 : if (rc != SQLITE_DONE) {
2221 : // don't "break;", the error can be due to a RLS policy.
2222 : // in case of error we try to apply the following changes
2223 0 : printf("cloudsync_payload_apply error on db_version %lld/%lld: (%d) %s\n", decoded_context.db_version, decoded_context.seq, rc, sqlite3_errmsg(db));
2224 0 : }
2225 48549 : }
2226 :
2227 48670 : if (payload_apply_callback) payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_DID_APPLY, rc);
2228 :
2229 48670 : buffer += seek;
2230 48670 : blen -= seek;
2231 48670 : stmt_reset(vm);
2232 48670 : }
2233 :
2234 611 : if (in_savepoint) {
2235 143 : sql = "RELEASE cloudsync_payload_apply;";
2236 143 : int rc1 = sqlite3_exec(db, sql, NULL, NULL, NULL);
2237 143 : if (rc1 != SQLITE_OK) rc = rc1;
2238 143 : }
2239 :
2240 611 : char *lasterr = (rc != SQLITE_OK && rc != SQLITE_DONE) ? cloudsync_string_dup(sqlite3_errmsg(db), false) : NULL;
2241 :
2242 611 : if (payload_apply_callback) {
2243 468 : payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_CLEANUP, rc);
2244 468 : }
2245 :
2246 611 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
2247 611 : if (rc == SQLITE_OK) {
2248 : char buf[256];
2249 611 : if (decoded_context.db_version >= dbversion) {
2250 479 : snprintf(buf, sizeof(buf), "%lld", decoded_context.db_version);
2251 479 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
2252 :
2253 479 : if (decoded_context.seq != seq) {
2254 304 : snprintf(buf, sizeof(buf), "%lld", decoded_context.seq);
2255 304 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_SEQ, buf);
2256 304 : }
2257 479 : }
2258 611 : }
2259 :
2260 : // cleanup vm
2261 611 : if (vm) sqlite3_finalize(vm);
2262 :
2263 : // cleanup memory
2264 611 : if (clone) cloudsync_memory_free(clone);
2265 :
2266 611 : if (rc != SQLITE_OK) {
2267 0 : sqlite3_result_error(context, lasterr, -1);
2268 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2269 0 : cloudsync_memory_free(lasterr);
2270 0 : return -1;
2271 : }
2272 :
2273 : // return the number of processed rows
2274 611 : sqlite3_result_int(context, nrows);
2275 611 : return nrows;
2276 613 : }
2277 :
2278 613 : void cloudsync_payload_decode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2279 : DEBUG_FUNCTION("cloudsync_payload_decode");
2280 : //debug_values(argc, argv);
2281 :
2282 : // sanity check payload type
2283 613 : if (sqlite3_value_type(argv[0]) != SQLITE_BLOB) {
2284 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_decode: value must be a BLOB.");
2285 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2286 0 : return;
2287 : }
2288 :
2289 : // sanity check payload size
2290 613 : int blen = sqlite3_value_bytes(argv[0]);
2291 613 : if (blen < (int)sizeof(cloudsync_payload_header)) {
2292 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_decode: invalid input size.");
2293 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2294 0 : return;
2295 : }
2296 :
2297 : // obtain payload
2298 613 : const char *payload = (const char *)sqlite3_value_blob(argv[0]);
2299 :
2300 : // apply changes
2301 613 : cloudsync_payload_apply(context, payload, blen);
2302 613 : }
2303 :
2304 : // MARK: - Payload load/store -
2305 :
2306 0 : int cloudsync_payload_get (sqlite3_context *context, char **blob, int *blob_size, int *db_version, int *seq, sqlite3_int64 *new_db_version, sqlite3_int64 *new_seq) {
2307 0 : sqlite3 *db = sqlite3_context_db_handle(context);
2308 :
2309 0 : *db_version = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_SEND_DBVERSION);
2310 0 : if (*db_version < 0) {sqlite3_result_error(context, "Unable to retrieve db_version.", -1); return SQLITE_ERROR;}
2311 :
2312 0 : *seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_SEND_SEQ);
2313 0 : if (*seq < 0) {sqlite3_result_error(context, "Unable to retrieve seq.", -1); return SQLITE_ERROR;}
2314 :
2315 : // retrieve BLOB
2316 : char sql[1024];
2317 0 : snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes) "
2318 : "SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq), max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, NULL)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))", *db_version, *db_version, *seq);
2319 :
2320 0 : int rc = dbutils_blob_int_int_select(db, sql, blob, blob_size, new_db_version, new_seq);
2321 0 : if (rc != SQLITE_OK) {
2322 0 : sqlite3_result_error(context, "cloudsync_network_send_changes unable to get changes", -1);
2323 0 : sqlite3_result_error_code(context, rc);
2324 0 : return rc;
2325 : }
2326 :
2327 : // exit if there is no data to send
2328 0 : if (blob == NULL || blob_size == 0) return SQLITE_OK;
2329 0 : return rc;
2330 0 : }
2331 :
2332 : #ifdef CLOUDSYNC_DESKTOP_OS
2333 :
2334 0 : void cloudsync_payload_save (sqlite3_context *context, int argc, sqlite3_value **argv) {
2335 : DEBUG_FUNCTION("cloudsync_payload_save");
2336 :
2337 : // sanity check argument
2338 0 : if (sqlite3_value_type(argv[0]) != SQLITE_TEXT) {
2339 0 : sqlite3_result_error(context, "Unable to retrieve file path.", -1);
2340 0 : return;
2341 : }
2342 :
2343 : // retrieve full path to file
2344 0 : const char *path = (const char *)sqlite3_value_text(argv[0]);
2345 0 : cloudsync_file_delete(path);
2346 :
2347 : // retrieve payload
2348 0 : char *blob = NULL;
2349 0 : int blob_size = 0, db_version = 0, seq = 0;
2350 0 : sqlite3_int64 new_db_version = 0, new_seq = 0;
2351 0 : int rc = cloudsync_payload_get(context, &blob, &blob_size, &db_version, &seq, &new_db_version, &new_seq);
2352 0 : if (rc != SQLITE_OK) return;
2353 :
2354 : // exit if there is no data to send
2355 0 : if (blob == NULL || blob_size == 0) return;
2356 :
2357 : // write payload to file
2358 0 : bool res = cloudsync_file_write(path, blob, (size_t)blob_size);
2359 0 : sqlite3_free(blob);
2360 :
2361 0 : if (res == false) {
2362 0 : sqlite3_result_error(context, "Unable to write payload to file path.", -1);
2363 0 : return;
2364 : }
2365 :
2366 : // update db_version and seq
2367 : char buf[256];
2368 0 : sqlite3 *db = sqlite3_context_db_handle(context);
2369 0 : if (new_db_version != db_version) {
2370 0 : snprintf(buf, sizeof(buf), "%lld", new_db_version);
2371 0 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_SEND_DBVERSION, buf);
2372 0 : }
2373 0 : if (new_seq != seq) {
2374 0 : snprintf(buf, sizeof(buf), "%lld", new_seq);
2375 0 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_SEND_SEQ, buf);
2376 0 : }
2377 :
2378 : // returns blob size
2379 0 : sqlite3_result_int64(context, (sqlite3_int64)blob_size);
2380 0 : }
2381 :
2382 0 : void cloudsync_payload_load (sqlite3_context *context, int argc, sqlite3_value **argv) {
2383 : DEBUG_FUNCTION("cloudsync_payload_load");
2384 :
2385 : // sanity check argument
2386 0 : if (sqlite3_value_type(argv[0]) != SQLITE_TEXT) {
2387 0 : sqlite3_result_error(context, "Unable to retrieve file path.", -1);
2388 0 : return;
2389 : }
2390 :
2391 : // retrieve full path to file
2392 0 : const char *path = (const char *)sqlite3_value_text(argv[0]);
2393 :
2394 0 : sqlite3_int64 payload_size = 0;
2395 0 : char *payload = cloudsync_file_read(path, &payload_size);
2396 0 : if (!payload) {
2397 0 : if (payload_size == -1) sqlite3_result_error(context, "Unable to read payload from file path.", -1);
2398 0 : if (payload) cloudsync_memory_free(payload);
2399 0 : return;
2400 : }
2401 :
2402 0 : int nrows = (payload_size) ? cloudsync_payload_apply (context, payload, (int)payload_size) : 0;
2403 0 : if (payload) cloudsync_memory_free(payload);
2404 :
2405 : // returns number of applied rows
2406 0 : if (nrows != -1) sqlite3_result_int(context, nrows);
2407 0 : }
2408 :
2409 : #endif
2410 :
2411 : // MARK: - Public -
2412 :
2413 7 : void cloudsync_version (sqlite3_context *context, int argc, sqlite3_value **argv) {
2414 : DEBUG_FUNCTION("cloudsync_version");
2415 7 : UNUSED_PARAMETER(argc);
2416 7 : UNUSED_PARAMETER(argv);
2417 7 : sqlite3_result_text(context, CLOUDSYNC_VERSION, -1, SQLITE_STATIC);
2418 7 : }
2419 :
2420 471 : void cloudsync_siteid (sqlite3_context *context, int argc, sqlite3_value **argv) {
2421 : DEBUG_FUNCTION("cloudsync_siteid");
2422 471 : UNUSED_PARAMETER(argc);
2423 471 : UNUSED_PARAMETER(argv);
2424 :
2425 471 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2426 471 : sqlite3_result_blob(context, data->site_id, UUID_LEN, SQLITE_STATIC);
2427 471 : }
2428 :
2429 3 : void cloudsync_db_version (sqlite3_context *context, int argc, sqlite3_value **argv) {
2430 : DEBUG_FUNCTION("cloudsync_db_version");
2431 3 : UNUSED_PARAMETER(argc);
2432 3 : UNUSED_PARAMETER(argv);
2433 :
2434 : // retrieve context
2435 3 : sqlite3 *db = sqlite3_context_db_handle(context);
2436 3 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2437 :
2438 3 : int rc = db_version_check_uptodate(db, data);
2439 3 : if (rc != SQLITE_OK) {
2440 0 : dbutils_context_result_error(context, "Unable to retrieve db_version (%s).", sqlite3_errmsg(db));
2441 0 : return;
2442 : }
2443 :
2444 3 : sqlite3_result_int64(context, data->db_version);
2445 3 : }
2446 :
2447 24626 : void cloudsync_db_version_next (sqlite3_context *context, int argc, sqlite3_value **argv) {
2448 : DEBUG_FUNCTION("cloudsync_db_version_next");
2449 :
2450 : // retrieve context
2451 24626 : sqlite3 *db = sqlite3_context_db_handle(context);
2452 24626 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2453 :
2454 24626 : sqlite3_int64 merging_version = (argc == 1) ? sqlite3_value_int64(argv[0]) : CLOUDSYNC_VALUE_NOTSET;
2455 24626 : sqlite3_int64 value = db_version_next(db, data, merging_version);
2456 24626 : if (value == -1) {
2457 0 : dbutils_context_result_error(context, "Unable to retrieve next_db_version (%s).", sqlite3_errmsg(db));
2458 0 : return;
2459 : }
2460 :
2461 24626 : sqlite3_result_int64(context, value);
2462 24626 : }
2463 :
2464 20 : void cloudsync_seq (sqlite3_context *context, int argc, sqlite3_value **argv) {
2465 : DEBUG_FUNCTION("cloudsync_seq");
2466 :
2467 : // retrieve context
2468 20 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2469 20 : sqlite3_result_int(context, BUMP_SEQ(data));
2470 20 : }
2471 :
2472 1 : void cloudsync_uuid (sqlite3_context *context, int argc, sqlite3_value **argv) {
2473 : DEBUG_FUNCTION("cloudsync_uuid");
2474 :
2475 : char value[UUID_STR_MAXLEN];
2476 1 : char *uuid = cloudsync_uuid_v7_string(value, true);
2477 1 : sqlite3_result_text(context, uuid, -1, SQLITE_TRANSIENT);
2478 1 : }
2479 :
2480 : // MARK: -
2481 :
2482 1 : void cloudsync_set (sqlite3_context *context, int argc, sqlite3_value **argv) {
2483 : DEBUG_FUNCTION("cloudsync_set");
2484 :
2485 : // sanity check parameters
2486 1 : const char *key = (const char *)sqlite3_value_text(argv[0]);
2487 1 : const char *value = (const char *)sqlite3_value_text(argv[1]);
2488 :
2489 : // silently fails
2490 1 : if (key == NULL) return;
2491 :
2492 1 : sqlite3 *db = sqlite3_context_db_handle(context);
2493 1 : dbutils_settings_set_key_value(db, context, key, value);
2494 1 : }
2495 :
2496 1 : void cloudsync_set_column (sqlite3_context *context, int argc, sqlite3_value **argv) {
2497 : DEBUG_FUNCTION("cloudsync_set_column");
2498 :
2499 1 : const char *tbl = (const char *)sqlite3_value_text(argv[0]);
2500 1 : const char *col = (const char *)sqlite3_value_text(argv[1]);
2501 1 : const char *key = (const char *)sqlite3_value_text(argv[2]);
2502 1 : const char *value = (const char *)sqlite3_value_text(argv[3]);
2503 1 : dbutils_table_settings_set_key_value(NULL, context, tbl, col, key, value);
2504 1 : }
2505 :
2506 1 : void cloudsync_set_table (sqlite3_context *context, int argc, sqlite3_value **argv) {
2507 : DEBUG_FUNCTION("cloudsync_set_table");
2508 :
2509 1 : const char *tbl = (const char *)sqlite3_value_text(argv[0]);
2510 1 : const char *key = (const char *)sqlite3_value_text(argv[1]);
2511 1 : const char *value = (const char *)sqlite3_value_text(argv[2]);
2512 1 : dbutils_table_settings_set_key_value(NULL, context, tbl, "*", key, value);
2513 1 : }
2514 :
2515 25551 : void cloudsync_is_sync (sqlite3_context *context, int argc, sqlite3_value **argv) {
2516 : DEBUG_FUNCTION("cloudsync_is_sync");
2517 :
2518 25551 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2519 25551 : if (data->insync) {
2520 21748 : sqlite3_result_int(context, 1);
2521 21748 : return;
2522 : }
2523 :
2524 3803 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2525 3803 : cloudsync_table_context *table = table_lookup(data, table_name);
2526 3803 : sqlite3_result_int(context, (table) ? (table->enabled == 0) : 0);
2527 25551 : }
2528 :
2529 125669 : void cloudsync_col_value (sqlite3_context *context, int argc, sqlite3_value **argv) {
2530 : // DEBUG_FUNCTION("cloudsync_col_value");
2531 :
2532 : // argv[0] -> table name
2533 : // argv[1] -> column name
2534 : // argv[2] -> encoded pk
2535 :
2536 : // lookup table
2537 125669 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2538 125669 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2539 125669 : cloudsync_table_context *table = table_lookup(data, table_name);
2540 125669 : if (!table) {
2541 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in clousdsync_colvalue.", table_name);
2542 0 : return;
2543 : }
2544 :
2545 : // retrieve column name
2546 125669 : const char *col_name = (const char *)sqlite3_value_text(argv[1]);
2547 :
2548 : // check for special tombstone value
2549 125669 : if (strcmp(col_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0) {
2550 4001 : sqlite3_result_null(context);
2551 4001 : return;
2552 : }
2553 :
2554 : // extract the right col_value vm associated to the column name
2555 121668 : sqlite3_stmt *vm = table_column_lookup(table, col_name, false, NULL);
2556 121668 : if (!vm) {
2557 0 : sqlite3_result_error(context, "Unable to retrieve column value precompiled statement in clousdsync_colvalue.", -1);
2558 0 : return;
2559 : }
2560 :
2561 : // bind primary key values
2562 121668 : int rc = pk_decode_prikey((char *)sqlite3_value_blob(argv[2]), (size_t)sqlite3_value_bytes(argv[2]), pk_decode_bind_callback, (void *)vm);
2563 121668 : if (rc < 0) goto cleanup;
2564 :
2565 : // execute vm
2566 121668 : rc = sqlite3_step(vm);
2567 243336 : if (rc == SQLITE_DONE) {
2568 0 : rc = SQLITE_OK;
2569 0 : sqlite3_result_text(context, CLOUDSYNC_RLS_RESTRICTED_VALUE, -1, SQLITE_STATIC);
2570 121668 : } else if (rc == SQLITE_ROW) {
2571 : // store value result
2572 121668 : rc = SQLITE_OK;
2573 121668 : sqlite3_result_value(context, sqlite3_column_value(vm, 0));
2574 121668 : }
2575 :
2576 : cleanup:
2577 121668 : if (rc != SQLITE_OK) {
2578 0 : sqlite3 *db = sqlite3_context_db_handle(context);
2579 0 : sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2580 0 : }
2581 121668 : sqlite3_reset(vm);
2582 125669 : }
2583 :
2584 10365 : void cloudsync_pk_encode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2585 10365 : size_t bsize = 0;
2586 10365 : char *buffer = pk_encode_prikey(argv, argc, NULL, &bsize);
2587 10365 : if (!buffer) {
2588 0 : sqlite3_result_null(context);
2589 0 : return;
2590 : }
2591 10365 : sqlite3_result_blob(context, (const void *)buffer, (int)bsize, SQLITE_TRANSIENT);
2592 10365 : cloudsync_memory_free(buffer);
2593 10365 : }
2594 :
2595 380 : int cloudsync_pk_decode_set_result_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
2596 380 : cloudsync_pk_decode_context *decode_context = (cloudsync_pk_decode_context *)xdata;
2597 : // decode_context->index is 1 based
2598 : // index is 0 based
2599 380 : if (decode_context->index != index+1) return SQLITE_OK;
2600 :
2601 190 : int rc = 0;
2602 190 : sqlite3_context *context = decode_context->context;
2603 190 : switch (type) {
2604 : case SQLITE_INTEGER:
2605 0 : sqlite3_result_int64(context, ival);
2606 0 : break;
2607 :
2608 : case SQLITE_FLOAT:
2609 0 : sqlite3_result_double(context, dval);
2610 0 : break;
2611 :
2612 : case SQLITE_NULL:
2613 0 : sqlite3_result_null(context);
2614 0 : break;
2615 :
2616 : case SQLITE_TEXT:
2617 190 : sqlite3_result_text(context, pval, (int)ival, SQLITE_TRANSIENT);
2618 190 : break;
2619 :
2620 : case SQLITE_BLOB:
2621 0 : sqlite3_result_blob(context, pval, (int)ival, SQLITE_TRANSIENT);
2622 0 : break;
2623 : }
2624 :
2625 190 : return rc;
2626 380 : }
2627 :
2628 :
2629 190 : void cloudsync_pk_decode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2630 190 : const char *pk = (const char *)sqlite3_value_text(argv[0]);
2631 190 : int i = sqlite3_value_int(argv[1]);
2632 :
2633 190 : cloudsync_pk_decode_context xdata = {.context = context, .index = i};
2634 190 : pk_decode_prikey((char *)pk, strlen(pk), cloudsync_pk_decode_set_result_callback, &xdata);
2635 190 : }
2636 :
2637 : // MARK: -
2638 :
2639 3468 : void cloudsync_insert (sqlite3_context *context, int argc, sqlite3_value **argv) {
2640 : DEBUG_FUNCTION("cloudsync_insert %s", sqlite3_value_text(argv[0]));
2641 : // debug_values(argc-1, &argv[1]);
2642 :
2643 : // argv[0] is table name
2644 : // argv[1]..[N] is primary key(s)
2645 :
2646 : // table_cloudsync
2647 : // pk -> encode(argc-1, &argv[1])
2648 : // col_name -> name
2649 : // col_version -> 0/1 +1
2650 : // db_version -> check
2651 : // site_id 0
2652 : // seq -> sqlite_master
2653 :
2654 : // retrieve context
2655 3468 : sqlite3 *db = sqlite3_context_db_handle(context);
2656 3468 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2657 :
2658 : // lookup table
2659 3468 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2660 3468 : cloudsync_table_context *table = table_lookup(data, table_name);
2661 3468 : if (!table) {
2662 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_insert.", table_name);
2663 0 : return;
2664 : }
2665 :
2666 : // encode the primary key values into a buffer
2667 : char buffer[1024];
2668 3468 : size_t pklen = sizeof(buffer);
2669 3468 : char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
2670 3468 : if (!pk) {
2671 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2672 0 : return;
2673 : }
2674 :
2675 : // compute the next database version for tracking changes
2676 3468 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2677 :
2678 : // check if a row with the same primary key already exists
2679 : // if so, this means the row might have been previously deleted (sentinel)
2680 3468 : bool pk_exists = (bool)stmt_count(table->meta_pkexists_stmt, pk, pklen, SQLITE_BLOB);
2681 3468 : int rc = SQLITE_OK;
2682 :
2683 3468 : if (table->ncols == 0) {
2684 : // if there are no columns other than primary keys, insert a sentinel record
2685 91 : rc = local_mark_insert_sentinel_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2686 91 : if (rc != SQLITE_OK) goto cleanup;
2687 3468 : } else if (pk_exists){
2688 : // if a row with the same primary key already exists, update the sentinel record
2689 1 : rc = local_update_sentinel(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2690 1 : if (rc != SQLITE_OK) goto cleanup;
2691 1 : }
2692 :
2693 : // process each non-primary key column for insert or update
2694 14049 : for (int i=0; i<table->ncols; ++i) {
2695 : // mark the column as inserted or updated in the metadata
2696 10581 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data));
2697 10581 : if (rc != SQLITE_OK) goto cleanup;
2698 14049 : }
2699 :
2700 : cleanup:
2701 3468 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2702 : // free memory if the primary key was dynamically allocated
2703 3468 : if (pk != buffer) cloudsync_memory_free(pk);
2704 3468 : }
2705 :
2706 28 : void cloudsync_delete (sqlite3_context *context, int argc, sqlite3_value **argv) {
2707 : DEBUG_FUNCTION("cloudsync_delete %s", sqlite3_value_text(argv[0]));
2708 : // debug_values(argc-1, &argv[1]);
2709 :
2710 : // retrieve context
2711 28 : sqlite3 *db = sqlite3_context_db_handle(context);
2712 28 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2713 :
2714 : // lookup table
2715 28 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2716 28 : cloudsync_table_context *table = table_lookup(data, table_name);
2717 28 : if (!table) {
2718 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_delete.", table_name);
2719 0 : return;
2720 : }
2721 :
2722 : // compute the next database version for tracking changes
2723 28 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2724 28 : int rc = SQLITE_OK;
2725 :
2726 : // encode the primary key values into a buffer
2727 : char buffer[1024];
2728 28 : size_t pklen = sizeof(buffer);
2729 28 : char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
2730 28 : if (!pk) {
2731 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2732 0 : return;
2733 : }
2734 :
2735 : // mark the row as deleted by inserting a delete sentinel into the metadata
2736 28 : rc = local_mark_delete_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2737 28 : if (rc != SQLITE_OK) goto cleanup;
2738 :
2739 : // remove any metadata related to the old rows associated with this primary key
2740 28 : rc = local_drop_meta(db, table, pk, pklen);
2741 28 : if (rc != SQLITE_OK) goto cleanup;
2742 :
2743 : cleanup:
2744 28 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2745 : // free memory if the primary key was dynamically allocated
2746 28 : if (pk != buffer) cloudsync_memory_free(pk);
2747 28 : }
2748 :
2749 : // MARK: -
2750 :
2751 337 : void cloudsync_update_payload_free (cloudsync_update_payload *payload) {
2752 2303 : for (int i=0; i<payload->count; i++) {
2753 1966 : sqlite3_value_free(payload->new_values[i]);
2754 1966 : sqlite3_value_free(payload->old_values[i]);
2755 1966 : }
2756 337 : cloudsync_memory_free(payload->new_values);
2757 337 : cloudsync_memory_free(payload->old_values);
2758 337 : sqlite3_value_free(payload->table_name);
2759 337 : payload->new_values = NULL;
2760 337 : payload->old_values = NULL;
2761 337 : payload->table_name = NULL;
2762 337 : payload->count = 0;
2763 337 : payload->capacity = 0;
2764 337 : }
2765 :
2766 1966 : int cloudsync_update_payload_append (cloudsync_update_payload *payload, sqlite3_value *v1, sqlite3_value *v2, sqlite3_value *v3) {
2767 1966 : if (payload->count >= payload->capacity) {
2768 340 : int newcap = payload->capacity ? payload->capacity * 2 : 128;
2769 :
2770 340 : sqlite3_value **new_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->new_values, newcap * sizeof(*new_values_2));
2771 340 : if (!new_values_2) return SQLITE_NOMEM;
2772 340 : payload->new_values = new_values_2;
2773 :
2774 340 : sqlite3_value **old_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->old_values, newcap * sizeof(*old_values_2));
2775 340 : if (!old_values_2) return SQLITE_NOMEM;
2776 340 : payload->old_values = old_values_2;
2777 :
2778 340 : payload->capacity = newcap;
2779 340 : }
2780 :
2781 1966 : int index = payload->count;
2782 1966 : if (payload->table_name == NULL) payload->table_name = sqlite3_value_dup(v1);
2783 1629 : else if (dbutils_value_compare(payload->table_name, v1) != 0) return SQLITE_NOMEM;
2784 1966 : payload->new_values[index] = sqlite3_value_dup(v2);
2785 1966 : payload->old_values[index] = sqlite3_value_dup(v3);
2786 1966 : payload->count++;
2787 :
2788 : // sanity check memory allocations
2789 1966 : bool v1_can_be_null = (sqlite3_value_type(v1) == SQLITE_NULL);
2790 1966 : bool v2_can_be_null = (sqlite3_value_type(v2) == SQLITE_NULL);
2791 1966 : bool v3_can_be_null = (sqlite3_value_type(v3) == SQLITE_NULL);
2792 :
2793 1966 : if ((payload->table_name == NULL) && (!v1_can_be_null)) return SQLITE_NOMEM;
2794 1966 : if ((payload->old_values[index] == NULL) && (!v2_can_be_null)) return SQLITE_NOMEM;
2795 1966 : if ((payload->new_values[index] == NULL) && (!v3_can_be_null)) return SQLITE_NOMEM;
2796 :
2797 1966 : return SQLITE_OK;
2798 1966 : }
2799 :
2800 1966 : void cloudsync_update_step (sqlite3_context *context, int argc, sqlite3_value **argv) {
2801 : // argv[0] => table_name
2802 : // argv[1] => new_column_value
2803 : // argv[2] => old_column_value
2804 :
2805 : // allocate/get the update payload
2806 1966 : cloudsync_update_payload *payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload));
2807 1966 : if (!payload) {sqlite3_result_error_nomem(context); return;}
2808 :
2809 1966 : if (cloudsync_update_payload_append(payload, argv[0], argv[1], argv[2]) != SQLITE_OK) {
2810 0 : sqlite3_result_error_nomem(context);
2811 0 : }
2812 1966 : }
2813 :
2814 337 : void cloudsync_update_final (sqlite3_context *context) {
2815 337 : cloudsync_update_payload *payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload));
2816 337 : if (!payload || payload->count == 0) return;
2817 :
2818 : // retrieve context
2819 337 : sqlite3 *db = sqlite3_context_db_handle(context);
2820 337 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2821 :
2822 : // lookup table
2823 337 : const char *table_name = (const char *)sqlite3_value_text(payload->table_name);
2824 337 : cloudsync_table_context *table = table_lookup(data, table_name);
2825 337 : if (!table) {
2826 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_update.", table_name);
2827 0 : return;
2828 : }
2829 :
2830 : // compute the next database version for tracking changes
2831 337 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2832 337 : int rc = SQLITE_OK;
2833 :
2834 : // Check if the primary key(s) have changed
2835 337 : bool prikey_changed = false;
2836 951 : for (int i=0; i<table->npks; ++i) {
2837 642 : if (dbutils_value_compare(payload->old_values[i], payload->new_values[i]) != 0) {
2838 28 : prikey_changed = true;
2839 28 : break;
2840 : }
2841 614 : }
2842 :
2843 : // encode the NEW primary key values into a buffer (used later for indexing)
2844 : char buffer[1024];
2845 : char buffer2[1024];
2846 337 : size_t pklen = sizeof(buffer);
2847 337 : size_t oldpklen = sizeof(buffer2);
2848 337 : char *oldpk = NULL;
2849 :
2850 337 : char *pk = pk_encode_prikey(payload->new_values, table->npks, buffer, &pklen);
2851 337 : if (!pk) {
2852 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2853 0 : return;
2854 : }
2855 :
2856 337 : if (prikey_changed) {
2857 : // if the primary key has changed, we need to handle the row differently:
2858 : // 1. mark the old row (OLD primary key) as deleted
2859 : // 2. create a new row (NEW primary key)
2860 :
2861 : // encode the OLD primary key into a buffer
2862 28 : oldpk = pk_encode_prikey(payload->old_values, table->npks, buffer2, &oldpklen);
2863 28 : if (!oldpk) {
2864 0 : if (pk != buffer) cloudsync_memory_free(pk);
2865 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2866 0 : return;
2867 : }
2868 :
2869 : // mark the rows with the old primary key as deleted in the metadata (old row handling)
2870 28 : rc = local_mark_delete_meta(db, table, oldpk, oldpklen, db_version, BUMP_SEQ(data));
2871 28 : if (rc != SQLITE_OK) goto cleanup;
2872 :
2873 : // move non-sentinel metadata entries from OLD primary key to NEW primary key
2874 : // handles the case where some metadata is retained across primary key change
2875 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2876 28 : rc = local_update_move_meta(db, table, pk, pklen, oldpk, oldpklen, db_version);
2877 28 : if (rc != SQLITE_OK) goto cleanup;
2878 :
2879 : // mark a new sentinel row with the new primary key in the metadata
2880 28 : rc = local_mark_insert_sentinel_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2881 28 : if (rc != SQLITE_OK) goto cleanup;
2882 :
2883 : // free memory if the OLD primary key was dynamically allocated
2884 28 : if (oldpk != buffer2) cloudsync_memory_free(oldpk);
2885 28 : oldpk = NULL;
2886 28 : }
2887 :
2888 : // compare NEW and OLD values (excluding primary keys) to handle column updates
2889 1634 : for (int i=0; i<table->ncols; i++) {
2890 1297 : int col_index = table->npks + i; // Regular columns start after primary keys
2891 :
2892 1297 : if (dbutils_value_compare(payload->old_values[col_index], payload->new_values[col_index]) != 0) {
2893 : // if a column value has changed, mark it as updated in the metadata
2894 : // columns are in cid order
2895 613 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data));
2896 613 : if (rc != SQLITE_OK) goto cleanup;
2897 613 : }
2898 1634 : }
2899 :
2900 : cleanup:
2901 337 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2902 337 : if (pk != buffer) cloudsync_memory_free(pk);
2903 337 : if (oldpk && (oldpk != buffer2)) cloudsync_memory_free(oldpk);
2904 :
2905 337 : cloudsync_update_payload_free(payload);
2906 337 : }
2907 :
2908 : // MARK: -
2909 :
2910 6 : int cloudsync_cleanup_internal (sqlite3_context *context, const char *table_name) {
2911 6 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2912 :
2913 : // get database reference
2914 6 : sqlite3 *db = sqlite3_context_db_handle(context);
2915 :
2916 : // init cloudsync_settings
2917 6 : if (cloudsync_context_init(db, data, context) == NULL) return SQLITE_MISUSE;
2918 :
2919 6 : cloudsync_table_context *table = table_lookup(data, table_name);
2920 6 : if (!table) return SQLITE_OK;
2921 :
2922 6 : table_remove_from_context(data, table);
2923 6 : table_free(table);
2924 :
2925 : // drop meta-table
2926 6 : char *sql = cloudsync_memory_mprintf("DROP TABLE IF EXISTS \"%w_cloudsync\";", table_name);
2927 6 : int rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
2928 6 : cloudsync_memory_free(sql);
2929 6 : if (rc != SQLITE_OK) {
2930 0 : dbutils_context_result_error(context, "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup.", table_name);
2931 0 : sqlite3_result_error_code(context, rc);
2932 0 : return rc;
2933 : }
2934 :
2935 : // drop original triggers
2936 6 : dbutils_delete_triggers(db, table_name);
2937 6 : if (rc != SQLITE_OK) {
2938 0 : dbutils_context_result_error(context, "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup.", table_name);
2939 0 : sqlite3_result_error_code(context, rc);
2940 0 : return rc;
2941 : }
2942 :
2943 : // remove all table related settings
2944 6 : dbutils_table_settings_set_key_value(db, context, table_name, NULL, NULL, NULL);
2945 :
2946 6 : return SQLITE_OK;
2947 6 : }
2948 :
2949 1 : void cloudsync_cleanup_all (sqlite3_context *context) {
2950 1 : char *sql = "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'cloudsync_%' AND name NOT LIKE '%_cloudsync';";
2951 :
2952 1 : sqlite3 *db = sqlite3_context_db_handle(context);
2953 1 : char **result = NULL;
2954 : int nrows, ncols;
2955 : char *errmsg;
2956 1 : int rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, &errmsg);
2957 1 : if (errmsg || ncols != 1) {
2958 0 : printf("cloudsync_cleanup_all error: %s\n", errmsg ? errmsg : "invalid table");
2959 0 : goto cleanup;
2960 : }
2961 :
2962 1 : rc = SQLITE_OK;
2963 6 : for (int i = ncols; i < nrows+ncols; i+=ncols) {
2964 5 : int rc2 = cloudsync_cleanup_internal(context, result[i]);
2965 5 : if (rc2 != SQLITE_OK) rc = rc2;
2966 5 : }
2967 :
2968 2 : if (rc == SQLITE_OK) {
2969 1 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2970 1 : data->site_id[0] = 0;
2971 1 : dbutils_settings_cleanup(db);
2972 1 : }
2973 :
2974 : cleanup:
2975 1 : sqlite3_free_table(result);
2976 1 : sqlite3_free(errmsg);
2977 1 : }
2978 :
2979 2 : void cloudsync_cleanup (sqlite3_context *context, int argc, sqlite3_value **argv) {
2980 : DEBUG_FUNCTION("cloudsync_cleanup");
2981 :
2982 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
2983 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2984 2 : sqlite3 *db = sqlite3_context_db_handle(context);
2985 :
2986 2 : if (dbutils_is_star_table(table)) cloudsync_cleanup_all(context);
2987 1 : else cloudsync_cleanup_internal(context, table);
2988 :
2989 2 : if (dbutils_table_exists(db, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) dbutils_update_schema_hash(db, &data->schema_hash);
2990 2 : }
2991 :
2992 2 : void cloudsync_enable_disable (sqlite3_context *context, const char *table_name, bool value) {
2993 : DEBUG_FUNCTION("cloudsync_enable_disable");
2994 :
2995 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2996 2 : cloudsync_table_context *table = table_lookup(data, table_name);
2997 2 : if (!table) return;
2998 :
2999 2 : table->enabled = value;
3000 2 : }
3001 :
3002 28 : int cloudsync_enable_disable_all_callback (void *xdata, int ncols, char **values, char **names) {
3003 28 : sqlite3_context *context = (sqlite3_context *)xdata;
3004 28 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3005 28 : bool value = data->temp_bool;
3006 :
3007 56 : for (int i=0; i<ncols; i++) {
3008 28 : const char *table_name = values[i];
3009 28 : cloudsync_table_context *table = table_lookup(data, table_name);
3010 28 : if (!table) continue;
3011 10 : table->enabled = value;
3012 10 : }
3013 :
3014 28 : return SQLITE_OK;
3015 : }
3016 :
3017 2 : void cloudsync_enable_disable_all (sqlite3_context *context, bool value) {
3018 : DEBUG_FUNCTION("cloudsync_enable_disable_all");
3019 :
3020 2 : char *sql = "SELECT name FROM sqlite_master WHERE type='table';";
3021 :
3022 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3023 2 : data->temp_bool = value;
3024 2 : sqlite3 *db = sqlite3_context_db_handle(context);
3025 2 : sqlite3_exec(db, sql, cloudsync_enable_disable_all_callback, context, NULL);
3026 2 : }
3027 :
3028 2 : void cloudsync_enable (sqlite3_context *context, int argc, sqlite3_value **argv) {
3029 : DEBUG_FUNCTION("cloudsync_enable");
3030 :
3031 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3032 2 : if (dbutils_is_star_table(table)) cloudsync_enable_disable_all(context, true);
3033 1 : else cloudsync_enable_disable(context, table, true);
3034 2 : }
3035 :
3036 2 : void cloudsync_disable (sqlite3_context *context, int argc, sqlite3_value **argv) {
3037 : DEBUG_FUNCTION("cloudsync_disable");
3038 :
3039 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3040 2 : if (dbutils_is_star_table(table)) cloudsync_enable_disable_all(context, false);
3041 1 : else cloudsync_enable_disable(context, table, false);
3042 2 : }
3043 :
3044 2854 : void cloudsync_is_enabled (sqlite3_context *context, int argc, sqlite3_value **argv) {
3045 : DEBUG_FUNCTION("cloudsync_is_enabled");
3046 :
3047 2854 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3048 2854 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3049 2854 : cloudsync_table_context *table = table_lookup(data, table_name);
3050 :
3051 2854 : int result = (table && table->enabled) ? 1 : 0;
3052 2854 : sqlite3_result_int(context, result);
3053 2854 : }
3054 :
3055 111 : void cloudsync_terminate (sqlite3_context *context, int argc, sqlite3_value **argv) {
3056 : DEBUG_FUNCTION("cloudsync_terminate");
3057 :
3058 111 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3059 :
3060 251 : for (int i=0; i<data->tables_count; ++i) {
3061 140 : if (data->tables[i]) table_free(data->tables[i]);
3062 140 : data->tables[i] = NULL;
3063 140 : }
3064 :
3065 111 : if (data->schema_version_stmt) sqlite3_finalize(data->schema_version_stmt);
3066 111 : if (data->data_version_stmt) sqlite3_finalize(data->data_version_stmt);
3067 111 : if (data->db_version_stmt) sqlite3_finalize(data->db_version_stmt);
3068 111 : if (data->getset_siteid_stmt) sqlite3_finalize(data->getset_siteid_stmt);
3069 :
3070 111 : data->schema_version_stmt = NULL;
3071 111 : data->data_version_stmt = NULL;
3072 111 : data->db_version_stmt = NULL;
3073 111 : data->getset_siteid_stmt = NULL;
3074 :
3075 : // reset the site_id so the cloudsync_context_init will be executed again
3076 : // if any other cloudsync function is called after terminate
3077 111 : data->site_id[0] = 0;
3078 :
3079 111 : sqlite3_result_int(context, 1);
3080 111 : }
3081 :
3082 : // MARK: -
3083 :
3084 111 : int cloudsync_load_siteid (sqlite3 *db, cloudsync_context *data) {
3085 : // check if site_id was already loaded
3086 111 : if (data->site_id[0] != 0) return SQLITE_OK;
3087 :
3088 : // load site_id
3089 : int size, rc;
3090 110 : char *buffer = dbutils_blob_select(db, "SELECT site_id FROM cloudsync_site_id WHERE rowid=0;", &size, data->sqlite_ctx, &rc);
3091 110 : if (!buffer) return rc;
3092 110 : if (size != UUID_LEN) return SQLITE_MISUSE;
3093 :
3094 110 : memcpy(data->site_id, buffer, UUID_LEN);
3095 110 : cloudsync_memory_free(buffer);
3096 :
3097 110 : return SQLITE_OK;
3098 111 : }
3099 :
3100 170 : int cloudsync_init_internal (sqlite3_context *context, const char *table_name, const char *algo_name, bool skip_int_pk_check) {
3101 : DEBUG_FUNCTION("cloudsync_init_internal");
3102 :
3103 : // get database reference
3104 170 : sqlite3 *db = sqlite3_context_db_handle(context);
3105 :
3106 : // retrieve global context
3107 170 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3108 :
3109 : // sanity check table and its primary key(s)
3110 170 : if (dbutils_table_sanity_check(db, context, table_name, skip_int_pk_check) == false) {
3111 1 : return SQLITE_MISUSE;
3112 : }
3113 :
3114 : // init cloudsync_settings
3115 169 : if (cloudsync_context_init(db, data, context) == NULL) return SQLITE_MISUSE;
3116 :
3117 : // sanity check algo name (if exists)
3118 169 : table_algo algo_new = table_algo_none;
3119 169 : if (!algo_name) {
3120 17 : algo_name = CLOUDSYNC_DEFAULT_ALGO;
3121 17 : }
3122 :
3123 169 : algo_new = crdt_algo_from_name(algo_name);
3124 169 : if (algo_new == table_algo_none) {
3125 1 : dbutils_context_result_error(context, "algo name %s does not exist", crdt_algo_name);
3126 1 : return SQLITE_MISUSE;
3127 : }
3128 :
3129 : // check if table name was already augmented
3130 168 : table_algo algo_current = dbutils_table_settings_get_algo(db, table_name);
3131 :
3132 : // sanity check algorithm
3133 168 : if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
3134 : // if table algorithms and the same and not none, do nothing
3135 168 : } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
3136 : // nothing is written into settings because the default table_algo_crdt_cls will be used
3137 0 : algo_new = algo_current = table_algo_crdt_cls;
3138 139 : } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
3139 : // algo is already written into settins so just use it
3140 0 : algo_new = algo_current;
3141 139 : } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
3142 : // write table algo name in settings
3143 139 : dbutils_table_settings_set_key_value(NULL, context, table_name, "*", "algo", algo_name);
3144 139 : } else {
3145 : // error condition
3146 0 : dbutils_context_result_error(context, "%s", "Before changing a table algorithm you must call cloudsync_cleanup(table_name)");
3147 0 : return SQLITE_MISUSE;
3148 : }
3149 :
3150 : // Run the following function even if table was already augmented.
3151 : // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
3152 : // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
3153 :
3154 : // sync algo with table (unused in this version)
3155 : // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
3156 :
3157 : // check triggers
3158 168 : int rc = dbutils_check_triggers(db, table_name, algo_new);
3159 168 : if (rc != SQLITE_OK) {
3160 0 : dbutils_context_result_error(context, "An error occurred while creating triggers: %s (%d)", sqlite3_errmsg(db), rc);
3161 0 : return SQLITE_MISUSE;
3162 : }
3163 :
3164 : // check meta-table
3165 168 : rc = dbutils_check_metatable(db, table_name, algo_new);
3166 168 : if (rc != SQLITE_OK) {
3167 0 : dbutils_context_result_error(context, "An error occurred while creating metatable: %s (%d)", sqlite3_errmsg(db), rc);
3168 0 : return SQLITE_MISUSE;
3169 : }
3170 :
3171 : // add prepared statements
3172 168 : if (stmts_add_tocontext(db, data) != SQLITE_OK) {
3173 0 : dbutils_context_result_error(context, "%s", "An error occurred while trying to compile prepared SQL statements.");
3174 0 : return SQLITE_MISUSE;
3175 : }
3176 :
3177 : // add table to in-memory data context
3178 168 : if (table_add_to_context(db, data, algo_new, table_name) == false) {
3179 0 : dbutils_context_result_error(context, "An error occurred while adding %s table information to global context", table_name);
3180 0 : return SQLITE_MISUSE;
3181 : }
3182 :
3183 168 : if (cloudsync_refill_metatable(db, data, table_name) != SQLITE_OK) {
3184 0 : dbutils_context_result_error(context, "%s", "An error occurred while trying to fill the augmented table.");
3185 0 : return SQLITE_MISUSE;
3186 : }
3187 :
3188 168 : return SQLITE_OK;
3189 170 : }
3190 :
3191 1 : int cloudsync_init_all (sqlite3_context *context, const char *algo_name, bool skip_int_pk_check) {
3192 : char sql[1024];
3193 1 : snprintf(sql, sizeof(sql), "SELECT name, '%s' FROM sqlite_master WHERE type='table' and name NOT LIKE 'sqlite_%%' AND name NOT LIKE 'cloudsync_%%' AND name NOT LIKE '%%_cloudsync';", (algo_name) ? algo_name : CLOUDSYNC_DEFAULT_ALGO);
3194 :
3195 1 : sqlite3 *db = sqlite3_context_db_handle(context);
3196 1 : sqlite3_stmt *vm = NULL;
3197 1 : int rc = sqlite3_prepare_v2(db, sql, -1, &vm, NULL);
3198 1 : if (rc != SQLITE_OK) goto abort_init_all;
3199 :
3200 6 : while (1) {
3201 6 : rc = sqlite3_step(vm);
3202 6 : if (rc == SQLITE_DONE) break;
3203 5 : else if (rc != SQLITE_ROW) goto abort_init_all;
3204 :
3205 5 : const char *table = (const char *)sqlite3_column_text(vm, 0);
3206 5 : const char *algo = (const char *)sqlite3_column_text(vm, 1);
3207 5 : rc = cloudsync_init_internal(context, table, algo, skip_int_pk_check);
3208 5 : if (rc != SQLITE_OK) {cloudsync_cleanup_internal(context, table); goto abort_init_all;}
3209 : }
3210 1 : rc = SQLITE_OK;
3211 :
3212 : abort_init_all:
3213 1 : if (vm) sqlite3_finalize(vm);
3214 1 : return rc;
3215 : }
3216 :
3217 143 : void cloudsync_init (sqlite3_context *context, const char *table, const char *algo, bool skip_int_pk_check) {
3218 143 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3219 143 : data->sqlite_ctx = context;
3220 :
3221 143 : sqlite3 *db = sqlite3_context_db_handle(context);
3222 143 : int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_init;", NULL, NULL, NULL);
3223 143 : if (rc != SQLITE_OK) {
3224 0 : dbutils_context_result_error(context, "Unable to create cloudsync_init savepoint. %s", sqlite3_errmsg(db));
3225 0 : sqlite3_result_error_code(context, rc);
3226 0 : return;
3227 : }
3228 :
3229 143 : if (dbutils_is_star_table(table)) rc = cloudsync_init_all(context, algo, skip_int_pk_check);
3230 142 : else rc = cloudsync_init_internal(context, table, algo, skip_int_pk_check);
3231 :
3232 143 : if (rc == SQLITE_OK) {
3233 141 : rc = sqlite3_exec(db, "RELEASE cloudsync_init", NULL, NULL, NULL);
3234 141 : if (rc != SQLITE_OK) {
3235 0 : dbutils_context_result_error(context, "Unable to release cloudsync_init savepoint. %s", sqlite3_errmsg(db));
3236 0 : sqlite3_result_error_code(context, rc);
3237 0 : }
3238 141 : }
3239 :
3240 : // in case of error, rollback transaction
3241 143 : if (rc != SQLITE_OK) {
3242 2 : sqlite3_exec(db, "ROLLBACK TO cloudsync_init; RELEASE cloudsync_init", NULL, NULL, NULL);
3243 2 : return;
3244 : }
3245 :
3246 141 : dbutils_update_schema_hash(db, &data->schema_hash);
3247 :
3248 : // returns site_id as TEXT
3249 : char buffer[UUID_STR_MAXLEN];
3250 141 : cloudsync_uuid_v7_stringify(data->site_id, buffer, false);
3251 141 : sqlite3_result_text(context, buffer, -1, NULL);
3252 143 : }
3253 :
3254 41 : void cloudsync_init3 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3255 : DEBUG_FUNCTION("cloudsync_init2");
3256 :
3257 41 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3258 41 : const char *algo = (const char *)sqlite3_value_text(argv[1]);
3259 41 : bool skip_int_pk_check = (bool)sqlite3_value_int(argv[2]);
3260 :
3261 41 : cloudsync_init(context, table, algo, skip_int_pk_check);
3262 41 : }
3263 :
3264 83 : void cloudsync_init2 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3265 : DEBUG_FUNCTION("cloudsync_init2");
3266 :
3267 83 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3268 83 : const char *algo = (const char *)sqlite3_value_text(argv[1]);
3269 :
3270 83 : cloudsync_init(context, table, algo, false);
3271 83 : }
3272 :
3273 19 : void cloudsync_init1 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3274 : DEBUG_FUNCTION("cloudsync_init1");
3275 :
3276 19 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3277 :
3278 19 : cloudsync_init(context, table, NULL, false);
3279 19 : }
3280 :
3281 : // MARK: -
3282 :
3283 24 : void cloudsync_begin_alter (sqlite3_context *context, int argc, sqlite3_value **argv) {
3284 : DEBUG_FUNCTION("cloudsync_begin_alter");
3285 24 : char *errmsg = NULL;
3286 24 : char **result = NULL;
3287 :
3288 24 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3289 :
3290 : // get database reference
3291 24 : sqlite3 *db = sqlite3_context_db_handle(context);
3292 :
3293 : // retrieve global context
3294 24 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3295 :
3296 : // init cloudsync_settings
3297 24 : if (cloudsync_context_init(db, data, context) == NULL) {
3298 0 : sqlite3_result_error(context, "Unable to init the cloudsync context.", -1);
3299 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3300 0 : return;
3301 : }
3302 :
3303 : // create a savepoint to manage the alter operations as a transaction
3304 24 : int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_alter", NULL, NULL, NULL);
3305 24 : if (rc != SQLITE_OK) {
3306 0 : sqlite3_result_error(context, "Unable to create cloudsync_alter savepoint.", -1);
3307 0 : sqlite3_result_error_code(context, rc);
3308 0 : goto rollback_begin_alter;
3309 : }
3310 :
3311 24 : cloudsync_table_context *table = table_lookup(data, table_name);
3312 24 : if (!table) {
3313 1 : dbutils_context_result_error(context, "Unable to find table %s", table_name);
3314 1 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3315 1 : goto rollback_begin_alter;
3316 : }
3317 :
3318 : int nrows, ncols;
3319 23 : char *sql = cloudsync_memory_mprintf("SELECT name FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
3320 23 : rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, &errmsg);
3321 23 : cloudsync_memory_free(sql);
3322 23 : if (errmsg || ncols != 1 || nrows != table->npks) {
3323 0 : dbutils_context_result_error(context, "Unable to get primary keys for table %s (%s)", table_name, errmsg);
3324 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3325 0 : goto rollback_begin_alter;
3326 : }
3327 :
3328 : // drop original triggers
3329 23 : dbutils_delete_triggers(db, table_name);
3330 23 : if (rc != SQLITE_OK) {
3331 0 : dbutils_context_result_error(context, "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
3332 0 : sqlite3_result_error_code(context, rc);
3333 0 : goto rollback_begin_alter;
3334 : }
3335 :
3336 23 : if (table->pk_name) sqlite3_free_table(table->pk_name);
3337 23 : table->pk_name = result;
3338 23 : return;
3339 :
3340 : rollback_begin_alter:
3341 1 : sqlite3_exec(db, "ROLLBACK TO cloudsync_alter; RELEASE cloudsync_alter;", NULL, NULL, NULL);
3342 :
3343 : cleanup_begin_alter:
3344 1 : sqlite3_free_table(result);
3345 1 : sqlite3_free(errmsg);
3346 24 : }
3347 :
3348 24 : void cloudsync_commit_alter (sqlite3_context *context, int argc, sqlite3_value **argv) {
3349 : DEBUG_FUNCTION("cloudsync_commit_alter");
3350 :
3351 24 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3352 24 : cloudsync_table_context *table = NULL;
3353 :
3354 : // get database reference
3355 24 : sqlite3 *db = sqlite3_context_db_handle(context);
3356 :
3357 : // retrieve global context
3358 24 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3359 :
3360 : // init cloudsync_settings
3361 24 : if (cloudsync_context_init(db, data, context) == NULL) {
3362 0 : dbutils_context_result_error(context, "Unable to init the cloudsync context.");
3363 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3364 0 : goto rollback_finalize_alter;
3365 : }
3366 :
3367 24 : table = table_lookup(data, table_name);
3368 24 : if (!table || !table->pk_name) {
3369 1 : dbutils_context_result_error(context, "Unable to find table context.");
3370 1 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3371 1 : goto rollback_finalize_alter;
3372 : }
3373 :
3374 23 : int rc = cloudsync_finalize_alter(context, data, table);
3375 23 : if (rc != SQLITE_OK) goto rollback_finalize_alter;
3376 :
3377 : // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
3378 23 : table_remove(data, table_name);
3379 23 : table_free(table);
3380 23 : table = NULL;
3381 :
3382 : // init again cloudsync for the table
3383 23 : table_algo algo_current = dbutils_table_settings_get_algo(db, table_name);
3384 23 : if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(db, "*");
3385 23 : rc = cloudsync_init_internal(context, table_name, crdt_algo_name(algo_current), true);
3386 23 : if (rc != SQLITE_OK) goto rollback_finalize_alter;
3387 :
3388 : // release savepoint
3389 23 : rc = sqlite3_exec(db, "RELEASE cloudsync_alter", NULL, NULL, NULL);
3390 23 : if (rc != SQLITE_OK) {
3391 0 : dbutils_context_result_error(context, sqlite3_errmsg(db));
3392 0 : sqlite3_result_error_code(context, rc);
3393 0 : goto rollback_finalize_alter;
3394 : }
3395 :
3396 23 : dbutils_update_schema_hash(db, &data->schema_hash);
3397 :
3398 23 : return;
3399 :
3400 : rollback_finalize_alter:
3401 1 : sqlite3_exec(db, "ROLLBACK TO cloudsync_alter; RELEASE cloudsync_alter;", NULL, NULL, NULL);
3402 1 : if (table) {
3403 0 : sqlite3_free_table(table->pk_name);
3404 0 : table->pk_name = NULL;
3405 0 : }
3406 24 : }
3407 :
3408 : // MARK: - Main Entrypoint -
3409 :
3410 111 : int cloudsync_register (sqlite3 *db, char **pzErrMsg) {
3411 111 : int rc = SQLITE_OK;
3412 :
3413 : // there's no built-in way to verify if sqlite3_cloudsync_init has already been called
3414 : // for this specific database connection, we use a workaround: we attempt to retrieve the
3415 : // cloudsync_version and check for an error, an error indicates that initialization has not been performed
3416 111 : if (sqlite3_exec(db, "SELECT cloudsync_version();", NULL, NULL, NULL) == SQLITE_OK) return SQLITE_OK;
3417 :
3418 : // init memory debugger (NOOP in production)
3419 : cloudsync_memory_init(1);
3420 :
3421 : // init context
3422 109 : void *ctx = cloudsync_context_create();
3423 109 : if (!ctx) {
3424 0 : if (pzErrMsg) *pzErrMsg = "Not enought memory to create a database context";
3425 0 : return SQLITE_NOMEM;
3426 : }
3427 :
3428 : // register functions
3429 :
3430 : // PUBLIC functions
3431 109 : rc = dbutils_register_function(db, "cloudsync_version", cloudsync_version, 0, pzErrMsg, ctx, cloudsync_context_free);
3432 109 : if (rc != SQLITE_OK) return rc;
3433 :
3434 109 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init1, 1, pzErrMsg, ctx, NULL);
3435 109 : if (rc != SQLITE_OK) return rc;
3436 :
3437 109 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init2, 2, pzErrMsg, ctx, NULL);
3438 109 : if (rc != SQLITE_OK) return rc;
3439 :
3440 109 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init3, 3, pzErrMsg, ctx, NULL);
3441 109 : if (rc != SQLITE_OK) return rc;
3442 :
3443 :
3444 109 : rc = dbutils_register_function(db, "cloudsync_enable", cloudsync_enable, 1, pzErrMsg, ctx, NULL);
3445 109 : if (rc != SQLITE_OK) return rc;
3446 :
3447 109 : rc = dbutils_register_function(db, "cloudsync_disable", cloudsync_disable, 1, pzErrMsg, ctx, NULL);
3448 109 : if (rc != SQLITE_OK) return rc;
3449 :
3450 109 : rc = dbutils_register_function(db, "cloudsync_is_enabled", cloudsync_is_enabled, 1, pzErrMsg, ctx, NULL);
3451 109 : if (rc != SQLITE_OK) return rc;
3452 :
3453 109 : rc = dbutils_register_function(db, "cloudsync_cleanup", cloudsync_cleanup, 1, pzErrMsg, ctx, NULL);
3454 109 : if (rc != SQLITE_OK) return rc;
3455 :
3456 109 : rc = dbutils_register_function(db, "cloudsync_terminate", cloudsync_terminate, 0, pzErrMsg, ctx, NULL);
3457 109 : if (rc != SQLITE_OK) return rc;
3458 :
3459 109 : rc = dbutils_register_function(db, "cloudsync_set", cloudsync_set, 2, pzErrMsg, ctx, NULL);
3460 109 : if (rc != SQLITE_OK) return rc;
3461 :
3462 109 : rc = dbutils_register_function(db, "cloudsync_set_table", cloudsync_set_table, 3, pzErrMsg, ctx, NULL);
3463 109 : if (rc != SQLITE_OK) return rc;
3464 :
3465 109 : rc = dbutils_register_function(db, "cloudsync_set_column", cloudsync_set_column, 4, pzErrMsg, ctx, NULL);
3466 109 : if (rc != SQLITE_OK) return rc;
3467 :
3468 109 : rc = dbutils_register_function(db, "cloudsync_siteid", cloudsync_siteid, 0, pzErrMsg, ctx, NULL);
3469 109 : if (rc != SQLITE_OK) return rc;
3470 :
3471 109 : rc = dbutils_register_function(db, "cloudsync_db_version", cloudsync_db_version, 0, pzErrMsg, ctx, NULL);
3472 109 : if (rc != SQLITE_OK) return rc;
3473 :
3474 109 : rc = dbutils_register_function(db, "cloudsync_db_version_next", cloudsync_db_version_next, 0, pzErrMsg, ctx, NULL);
3475 109 : if (rc != SQLITE_OK) return rc;
3476 :
3477 109 : rc = dbutils_register_function(db, "cloudsync_db_version_next", cloudsync_db_version_next, 1, pzErrMsg, ctx, NULL);
3478 109 : if (rc != SQLITE_OK) return rc;
3479 :
3480 109 : rc = dbutils_register_function(db, "cloudsync_begin_alter", cloudsync_begin_alter, 1, pzErrMsg, ctx, NULL);
3481 109 : if (rc != SQLITE_OK) return rc;
3482 :
3483 109 : rc = dbutils_register_function(db, "cloudsync_commit_alter", cloudsync_commit_alter, 1, pzErrMsg, ctx, NULL);
3484 109 : if (rc != SQLITE_OK) return rc;
3485 :
3486 109 : rc = dbutils_register_function(db, "cloudsync_uuid", cloudsync_uuid, 0, pzErrMsg, ctx, NULL);
3487 109 : if (rc != SQLITE_OK) return rc;
3488 :
3489 : // PAYLOAD
3490 109 : rc = dbutils_register_aggregate(db, "cloudsync_payload_encode", cloudsync_payload_encode_step, cloudsync_payload_encode_final, -1, pzErrMsg, ctx, NULL);
3491 109 : if (rc != SQLITE_OK) return rc;
3492 :
3493 109 : rc = dbutils_register_function(db, "cloudsync_payload_decode", cloudsync_payload_decode, -1, pzErrMsg, ctx, NULL);
3494 109 : if (rc != SQLITE_OK) return rc;
3495 :
3496 : #ifdef CLOUDSYNC_DESKTOP_OS
3497 109 : rc = dbutils_register_function(db, "cloudsync_payload_save", cloudsync_payload_save, 1, pzErrMsg, ctx, NULL);
3498 109 : if (rc != SQLITE_OK) return rc;
3499 :
3500 109 : rc = dbutils_register_function(db, "cloudsync_payload_load", cloudsync_payload_load, 1, pzErrMsg, ctx, NULL);
3501 109 : if (rc != SQLITE_OK) return rc;
3502 : #endif
3503 :
3504 : // PRIVATE functions
3505 109 : rc = dbutils_register_function(db, "cloudsync_is_sync", cloudsync_is_sync, 1, pzErrMsg, ctx, NULL);
3506 109 : if (rc != SQLITE_OK) return rc;
3507 :
3508 109 : rc = dbutils_register_function(db, "cloudsync_insert", cloudsync_insert, -1, pzErrMsg, ctx, NULL);
3509 109 : if (rc != SQLITE_OK) return rc;
3510 :
3511 109 : rc = dbutils_register_aggregate(db, "cloudsync_update", cloudsync_update_step, cloudsync_update_final, 3, pzErrMsg, ctx, NULL);
3512 109 : if (rc != SQLITE_OK) return rc;
3513 :
3514 109 : rc = dbutils_register_function(db, "cloudsync_delete", cloudsync_delete, -1, pzErrMsg, ctx, NULL);
3515 109 : if (rc != SQLITE_OK) return rc;
3516 :
3517 109 : rc = dbutils_register_function(db, "cloudsync_col_value", cloudsync_col_value, 3, pzErrMsg, ctx, NULL);
3518 109 : if (rc != SQLITE_OK) return rc;
3519 :
3520 109 : rc = dbutils_register_function(db, "cloudsync_pk_encode", cloudsync_pk_encode, -1, pzErrMsg, ctx, NULL);
3521 109 : if (rc != SQLITE_OK) return rc;
3522 :
3523 109 : rc = dbutils_register_function(db, "cloudsync_pk_decode", cloudsync_pk_decode, 2, pzErrMsg, ctx, NULL);
3524 109 : if (rc != SQLITE_OK) return rc;
3525 :
3526 109 : rc = dbutils_register_function(db, "cloudsync_seq", cloudsync_seq, 0, pzErrMsg, ctx, NULL);
3527 109 : if (rc != SQLITE_OK) return rc;
3528 :
3529 : // NETWORK LAYER
3530 : #ifndef CLOUDSYNC_OMIT_NETWORK
3531 : rc = cloudsync_network_register(db, pzErrMsg, ctx);
3532 : if (rc != SQLITE_OK) return rc;
3533 : #endif
3534 :
3535 109 : cloudsync_context *data = (cloudsync_context *)ctx;
3536 109 : sqlite3_commit_hook(db, cloudsync_commit_hook, ctx);
3537 109 : sqlite3_rollback_hook(db, cloudsync_rollback_hook, ctx);
3538 :
3539 : // register eponymous only changes virtual table
3540 109 : rc = cloudsync_vtab_register_changes (db, data);
3541 109 : if (rc != SQLITE_OK) return rc;
3542 :
3543 : // load config, if exists
3544 109 : if (cloudsync_config_exists(db)) {
3545 0 : cloudsync_context_init(db, ctx, NULL);
3546 :
3547 : // make sure to update internal version to current version
3548 0 : dbutils_settings_set_key_value(db, NULL, CLOUDSYNC_KEY_LIBVERSION, CLOUDSYNC_VERSION);
3549 0 : }
3550 :
3551 109 : return SQLITE_OK;
3552 111 : }
3553 :
3554 111 : APIEXPORT int sqlite3_cloudsync_init (sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
3555 : DEBUG_FUNCTION("sqlite3_cloudsync_init");
3556 :
3557 : #ifndef SQLITE_CORE
3558 : SQLITE_EXTENSION_INIT2(pApi);
3559 : #endif
3560 :
3561 111 : return cloudsync_register(db, pzErrMsg);
3562 : }
|