Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "cloudsync_private.h"
21 : #include "lz4.h"
22 : #include "pk.h"
23 : #include "vtab.h"
24 : #include "utils.h"
25 : #include "dbutils.h"
26 :
27 : #ifndef CLOUDSYNC_OMIT_NETWORK
28 : #include "network.h"
29 : #endif
30 :
31 : #ifdef _WIN32
32 : #include <winsock2.h>
33 : #include <ws2tcpip.h>
34 : #else
35 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
36 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
37 : #endif
38 :
39 : #ifndef htonll
40 : #if __BIG_ENDIAN__
41 : #define htonll(x) (x)
42 : #define ntohll(x) (x)
43 : #else
44 : #ifndef htobe64
45 : #define htonll(x) ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
46 : #define ntohll(x) ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
47 : #else
48 : #define htonll(x) htobe64(x)
49 : #define ntohll(x) be64toh(x)
50 : #endif
51 : #endif
52 : #endif
53 :
54 : #ifndef SQLITE_CORE
55 : SQLITE_EXTENSION_INIT1
56 : #endif
57 :
58 : #ifndef UNUSED_PARAMETER
59 : #define UNUSED_PARAMETER(X) (void)(X)
60 : #endif
61 :
62 : #ifdef _WIN32
63 : #define APIEXPORT __declspec(dllexport)
64 : #else
65 : #define APIEXPORT
66 : #endif
67 :
68 : #define CLOUDSYNC_DEFAULT_ALGO "cls"
69 : #define CLOUDSYNC_INIT_NTABLES 128
70 : #define CLOUDSYNC_VALUE_NOTSET -1
71 : #define CLOUDSYNC_MIN_DB_VERSION 0
72 :
73 : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE 512*1024
74 : #define CLOUDSYNC_PAYLOAD_VERSION 1
75 : #define CLOUDSYNC_PAYLOAD_SIGNATURE 'CLSY'
76 : #define CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY "cloudsync_payload_apply_callback"
77 :
78 : #ifndef MAX
79 : #define MAX(a, b) (((a)>(b))?(a):(b))
80 : #endif
81 :
82 : #define DEBUG_SQLITE_ERROR(_rc, _fn, _db) do {if (_rc != SQLITE_OK) printf("Error in %s: %s\n", _fn, sqlite3_errmsg(_db));} while (0)
83 :
84 : typedef enum {
85 : CLOUDSYNC_PK_INDEX_TBL = 0,
86 : CLOUDSYNC_PK_INDEX_PK = 1,
87 : CLOUDSYNC_PK_INDEX_COLNAME = 2,
88 : CLOUDSYNC_PK_INDEX_COLVALUE = 3,
89 : CLOUDSYNC_PK_INDEX_COLVERSION = 4,
90 : CLOUDSYNC_PK_INDEX_DBVERSION = 5,
91 : CLOUDSYNC_PK_INDEX_SITEID = 6,
92 : CLOUDSYNC_PK_INDEX_CL = 7,
93 : CLOUDSYNC_PK_INDEX_SEQ = 8
94 : } CLOUDSYNC_PK_INDEX;
95 :
96 : typedef enum {
97 : CLOUDSYNC_STMT_VALUE_ERROR = -1,
98 : CLOUDSYNC_STMT_VALUE_UNCHANGED = 0,
99 : CLOUDSYNC_STMT_VALUE_CHANGED = 1,
100 : } CLOUDSYNC_STMT_VALUE;
101 :
102 : typedef struct {
103 : sqlite3_context *context;
104 : int index;
105 : } cloudsync_pk_decode_context;
106 :
107 : #define SYNCBIT_SET(_data) _data->insync = 1
108 : #define SYNCBIT_RESET(_data) _data->insync = 0
109 : #define BUMP_SEQ(_data) ((_data)->seq += 1, (_data)->seq - 1)
110 :
111 : // MARK: -
112 :
113 : typedef struct {
114 : table_algo algo; // CRDT algoritm associated to the table
115 : char *name; // table name
116 : char **col_name; // array of column names
117 : sqlite3_stmt **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
118 : sqlite3_stmt **col_value_stmt; // array of column value stmt (indexed by col_name)
119 : int *col_id; // array of column id
120 : int ncols; // number of non primary key cols
121 : int npks; // number of primary key cols
122 : bool enabled; // flag to check if a table is enabled or disabled
123 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
124 : bool rowid_only; // a table with no primary keys other than the implicit rowid
125 : #endif
126 :
127 : char **pk_name; // array of primary key names
128 :
129 : // precompiled statements
130 : sqlite3_stmt *meta_pkexists_stmt; // check if a primary key already exist in the augmented table
131 : sqlite3_stmt *meta_sentinel_update_stmt; // update a local sentinel row
132 : sqlite3_stmt *meta_sentinel_insert_stmt; // insert a local sentinel row
133 : sqlite3_stmt *meta_row_insert_update_stmt; // insert/update a local row
134 : sqlite3_stmt *meta_row_drop_stmt; // delete rows from meta
135 : sqlite3_stmt *meta_update_move_stmt; // update rows in meta when pk changes
136 : sqlite3_stmt *meta_local_cl_stmt; // compute local cl value
137 : sqlite3_stmt *meta_winner_clock_stmt; // get the rowid of the last inserted/updated row in the meta table
138 : sqlite3_stmt *meta_merge_delete_drop;
139 : sqlite3_stmt *meta_zero_clock_stmt;
140 : sqlite3_stmt *meta_col_version_stmt;
141 : sqlite3_stmt *meta_site_id_stmt;
142 :
143 : sqlite3_stmt *real_col_values_stmt; // retrieve all column values based on pk
144 : sqlite3_stmt *real_merge_delete_stmt;
145 : sqlite3_stmt *real_merge_sentinel_stmt;
146 :
147 : } cloudsync_table_context;
148 :
149 : struct cloudsync_pk_decode_bind_context {
150 : sqlite3_stmt *vm;
151 : char *tbl;
152 : int64_t tbl_len;
153 : const void *pk;
154 : int64_t pk_len;
155 : char *col_name;
156 : int64_t col_name_len;
157 : int64_t col_version;
158 : int64_t db_version;
159 : const void *site_id;
160 : int64_t site_id_len;
161 : int64_t cl;
162 : int64_t seq;
163 : };
164 :
165 : struct cloudsync_context {
166 : sqlite3_context *sqlite_ctx;
167 :
168 : char *libversion;
169 : uint8_t site_id[UUID_LEN];
170 : int insync;
171 : int debug;
172 : bool merge_equal_values;
173 : bool temp_bool; // temporary value used in callback
174 : void *aux_data;
175 :
176 : // stmts and context values
177 : bool pragma_checked; // we need to check PRAGMAs only once per transaction
178 : sqlite3_stmt *schema_version_stmt;
179 : sqlite3_stmt *data_version_stmt;
180 : sqlite3_stmt *db_version_stmt;
181 : sqlite3_stmt *getset_siteid_stmt;
182 : int data_version;
183 : int schema_version;
184 : uint64_t schema_hash;
185 :
186 : // set at the start of each transaction on the first invocation and
187 : // re-set on transaction commit or rollback
188 : sqlite3_int64 db_version;
189 : // the version that the db will be set to at the end of the transaction
190 : // if that transaction were to commit at the time this value is checked
191 : sqlite3_int64 pending_db_version;
192 : // used to set an order inside each transaction
193 : int seq;
194 :
195 : // augmented tables are stored in-memory so we do not need to retrieve information about col names and cid
196 : // from the disk each time a write statement is performed
197 : // we do also not need to use an hash map here because for few tables the direct in-memory comparison with table name is faster
198 : cloudsync_table_context **tables;
199 : int tables_count;
200 : int tables_alloc;
201 : };
202 :
203 : typedef struct {
204 : char *buffer;
205 : size_t balloc;
206 : size_t bused;
207 : uint64_t nrows;
208 : uint16_t ncols;
209 : } cloudsync_network_payload;
210 :
211 : #ifdef _MSC_VER
212 : #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
213 : #define PACKED
214 : #else
215 : #define PACKED __attribute__((__packed__))
216 : #endif
217 :
218 : typedef struct PACKED {
219 : uint32_t signature; // 'CLSY'
220 : uint8_t version; // protocol version
221 : uint8_t libversion[3]; // major.minor.patch
222 : uint32_t expanded_size;
223 : uint16_t ncols;
224 : uint32_t nrows;
225 : uint64_t schema_hash;
226 : uint8_t unused[6]; // padding to ensure the struct is exactly 32 bytes
227 : } cloudsync_network_header;
228 :
229 : #ifdef _MSC_VER
230 : #pragma pack(pop)
231 : #endif
232 :
233 : #if CLOUDSYNC_UNITTEST
234 : bool force_uncompressed_blob = false;
235 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER() if (force_uncompressed_blob) use_uncompressed_buffer = true
236 : #else
237 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
238 : #endif
239 :
240 : int db_version_rebuild_stmt (sqlite3 *db, cloudsync_context *data);
241 : int cloudsync_load_siteid (sqlite3 *db, cloudsync_context *data);
242 : int local_mark_insert_or_update_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, sqlite3_int64 db_version, int seq);
243 :
244 : // MARK: - STMT Utils -
245 :
246 4313 : CLOUDSYNC_STMT_VALUE stmt_execute (sqlite3_stmt *stmt, cloudsync_context *data) {
247 4313 : int rc = sqlite3_step(stmt);
248 4313 : if (rc != SQLITE_ROW && rc != SQLITE_DONE) {
249 2 : if (data) DEBUG_SQLITE_ERROR(rc, "stmt_execute", sqlite3_db_handle(stmt));
250 2 : sqlite3_reset(stmt);
251 2 : return CLOUDSYNC_STMT_VALUE_ERROR;
252 : }
253 :
254 4311 : CLOUDSYNC_STMT_VALUE result = CLOUDSYNC_STMT_VALUE_CHANGED;
255 4311 : if (stmt == data->data_version_stmt) {
256 3939 : int version = sqlite3_column_int(stmt, 0);
257 3939 : if (version != data->data_version) {
258 45 : data->data_version = version;
259 45 : } else {
260 3894 : result = CLOUDSYNC_STMT_VALUE_UNCHANGED;
261 : }
262 4311 : } else if (stmt == data->schema_version_stmt) {
263 186 : int version = sqlite3_column_int(stmt, 0);
264 186 : if (version > data->schema_version) {
265 96 : data->schema_version = version;
266 96 : } else {
267 90 : result = CLOUDSYNC_STMT_VALUE_UNCHANGED;
268 : }
269 :
270 372 : } else if (stmt == data->db_version_stmt) {
271 186 : data->db_version = (rc == SQLITE_DONE) ? CLOUDSYNC_MIN_DB_VERSION : sqlite3_column_int64(stmt, 0);
272 186 : }
273 :
274 4311 : sqlite3_reset(stmt);
275 4311 : return result;
276 4313 : }
277 :
278 300 : int stmt_count (sqlite3_stmt *stmt, const char *value, size_t len, int type) {
279 300 : int result = -1;
280 300 : int rc = SQLITE_OK;
281 :
282 300 : if (value) {
283 299 : rc = (type == SQLITE_TEXT) ? sqlite3_bind_text(stmt, 1, value, (int)len, SQLITE_STATIC) : sqlite3_bind_blob(stmt, 1, value, (int)len, SQLITE_STATIC);
284 299 : if (rc != SQLITE_OK) goto cleanup;
285 299 : }
286 :
287 300 : rc = sqlite3_step(stmt);
288 600 : if (rc == SQLITE_DONE) {
289 1 : result = 0;
290 1 : rc = SQLITE_OK;
291 300 : } else if (rc == SQLITE_ROW) {
292 299 : result = sqlite3_column_int(stmt, 0);
293 299 : rc = SQLITE_OK;
294 299 : }
295 :
296 : cleanup:
297 300 : DEBUG_SQLITE_ERROR(rc, "stmt_count", sqlite3_db_handle(stmt));
298 300 : sqlite3_reset(stmt);
299 300 : return result;
300 : }
301 :
302 16963 : sqlite3_stmt *stmt_reset (sqlite3_stmt *stmt) {
303 16963 : sqlite3_clear_bindings(stmt);
304 16963 : sqlite3_reset(stmt);
305 16963 : return NULL;
306 : }
307 :
308 146 : int stmts_add_tocontext (sqlite3 *db, cloudsync_context *data) {
309 : DEBUG_DBFUNCTION("cloudsync_add_stmts");
310 :
311 146 : if (data->data_version_stmt == NULL) {
312 47 : const char *sql = "PRAGMA data_version;";
313 47 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->data_version_stmt, NULL);
314 : DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
315 47 : if (rc != SQLITE_OK) return rc;
316 : DEBUG_SQL("data_version_stmt: %s", sql);
317 47 : }
318 :
319 146 : if (data->schema_version_stmt == NULL) {
320 47 : const char *sql = "PRAGMA schema_version;";
321 47 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->schema_version_stmt, NULL);
322 : DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
323 47 : if (rc != SQLITE_OK) return rc;
324 : DEBUG_SQL("schema_version_stmt: %s", sql);
325 47 : }
326 :
327 146 : if (data->getset_siteid_stmt == NULL) {
328 : // get and set index of the site_id
329 : // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
330 : // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
331 47 : const char *sql = "INSERT INTO cloudsync_site_id (site_id) VALUES (?) ON CONFLICT(site_id) DO UPDATE SET site_id = site_id RETURNING rowid;";
332 47 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->getset_siteid_stmt, NULL);
333 : DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
334 47 : if (rc != SQLITE_OK) return rc;
335 : DEBUG_SQL("getset_siteid_stmt: %s", sql);
336 47 : }
337 :
338 146 : return db_version_rebuild_stmt(db, data);
339 146 : }
340 :
341 : // MARK: - Database Version -
342 :
343 196 : char *db_version_build_query (sqlite3 *db) {
344 : // this function must be manually called each time tables changes
345 : // because the query plan changes too and it must be re-prepared
346 : // unfortunately there is no other way
347 :
348 : // we need to execute a query like:
349 : /*
350 : SELECT max(version) as version FROM (
351 : SELECT max(db_version) as version FROM "table1_cloudsync"
352 : UNION ALL
353 : SELECT max(db_version) as version FROM "table2_cloudsync"
354 : UNION ALL
355 : SELECT max(db_version) as version FROM "table3_cloudsync"
356 : UNION
357 : SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
358 : )
359 : */
360 :
361 : // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
362 196 : const char *sql = "WITH table_names AS ("
363 : "SELECT format('%w', name) as tbl_name "
364 : "FROM sqlite_master "
365 : "WHERE type='table' "
366 : "AND name LIKE '%_cloudsync'"
367 : "), "
368 : "query_parts AS ("
369 : "SELECT 'SELECT max(db_version) as version FROM \"' || tbl_name || '\"' as part FROM table_names"
370 : "), "
371 : "combined_query AS ("
372 : "SELECT GROUP_CONCAT(part, ' UNION ALL ') || ' UNION SELECT value as version FROM cloudsync_settings WHERE key = ''pre_alter_dbversion''' as full_query FROM query_parts"
373 : ") "
374 : "SELECT CONCAT('SELECT max(version) as version FROM (', full_query, ');') FROM combined_query;";
375 196 : return dbutils_text_select(db, sql);
376 : }
377 :
378 242 : int db_version_rebuild_stmt (sqlite3 *db, cloudsync_context *data) {
379 242 : if (data->db_version_stmt) {
380 149 : sqlite3_finalize(data->db_version_stmt);
381 149 : data->db_version_stmt = NULL;
382 149 : }
383 :
384 242 : sqlite3_int64 count = dbutils_table_settings_count_tables(db);
385 242 : if (count == 0) return SQLITE_OK;
386 196 : else if (count == -1) {
387 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
388 0 : return SQLITE_ERROR;
389 : }
390 :
391 196 : char *sql = db_version_build_query(db);
392 196 : if (!sql) return SQLITE_NOMEM;
393 : DEBUG_SQL("db_version_stmt: %s", sql);
394 :
395 196 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &data->db_version_stmt, NULL);
396 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
397 196 : cloudsync_memory_free(sql);
398 196 : return rc;
399 242 : }
400 :
401 186 : int db_version_rerun (sqlite3 *db, cloudsync_context *data) {
402 186 : CLOUDSYNC_STMT_VALUE schema_changed = stmt_execute(data->schema_version_stmt, data);
403 186 : if (schema_changed == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
404 :
405 186 : if (schema_changed == CLOUDSYNC_STMT_VALUE_CHANGED) {
406 96 : int rc = db_version_rebuild_stmt(db, data);
407 96 : if (rc != SQLITE_OK) return -1;
408 96 : }
409 :
410 186 : CLOUDSYNC_STMT_VALUE rc = stmt_execute(data->db_version_stmt, data);
411 186 : if (rc == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
412 186 : return 0;
413 186 : }
414 :
415 3939 : int db_version_check_uptodate (sqlite3 *db, cloudsync_context *data) {
416 : // perform a PRAGMA data_version to check if some other process write any data
417 3939 : CLOUDSYNC_STMT_VALUE rc = stmt_execute(data->data_version_stmt, data);
418 3939 : if (rc == CLOUDSYNC_STMT_VALUE_ERROR) return -1;
419 :
420 : // db_version is already set and there is no need to update it
421 3939 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == CLOUDSYNC_STMT_VALUE_UNCHANGED) return 0;
422 :
423 186 : return db_version_rerun(db, data);
424 3939 : }
425 :
426 3913 : sqlite3_int64 db_version_next (sqlite3 *db, cloudsync_context *data, sqlite3_int64 merging_version) {
427 3913 : int rc = db_version_check_uptodate(db, data);
428 3913 : if (rc != SQLITE_OK) return -1;
429 :
430 3913 : sqlite3_int64 result = data->db_version + 1;
431 3913 : if (result < data->pending_db_version) result = data->pending_db_version;
432 3913 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
433 3913 : data->pending_db_version = result;
434 :
435 3913 : return result;
436 3913 : }
437 :
438 : // MARK: -
439 :
440 0 : void *cloudsync_get_auxdata (sqlite3_context *context) {
441 0 : cloudsync_context *data = (context) ? (cloudsync_context *)sqlite3_user_data(context) : NULL;
442 0 : return (data) ? data->aux_data : NULL;
443 : }
444 :
445 0 : void cloudsync_set_auxdata (sqlite3_context *context, void *xdata) {
446 0 : cloudsync_context *data = (context) ? (cloudsync_context *)sqlite3_user_data(context) : NULL;
447 0 : if (data) data->aux_data = xdata;
448 0 : }
449 :
450 : // MARK: - PK Context -
451 :
452 8449 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
453 8449 : *tbl_len = ctx->tbl_len;
454 8449 : return ctx->tbl;
455 : }
456 :
457 8449 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
458 8449 : *pk_len = ctx->pk_len;
459 8449 : return (void *)ctx->pk;
460 : }
461 :
462 8449 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
463 8449 : *colname_len = ctx->col_name_len;
464 8449 : return ctx->col_name;
465 : }
466 :
467 8449 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
468 8449 : return ctx->cl;
469 : }
470 :
471 8449 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
472 8449 : return ctx->db_version;
473 : }
474 :
475 : // MARK: - Table Utils -
476 :
477 63 : char *table_build_values_sql (sqlite3 *db, cloudsync_table_context *table) {
478 63 : char *sql = NULL;
479 :
480 : /*
481 : This SQL statement dynamically generates a SELECT query for a specified table.
482 : It uses Common Table Expressions (CTEs) to construct the column names and
483 : primary key conditions based on the table schema, which is obtained through
484 : the `pragma_table_info` function.
485 :
486 : 1. `col_names` CTE:
487 : - Retrieves a comma-separated list of non-primary key column names from
488 : the specified table's schema.
489 :
490 : 2. `pk_where` CTE:
491 : - Retrieves a condition string representing the primary key columns in the
492 : format: "column1=? AND column2=? AND ...", used to create the WHERE clause
493 : for selecting rows based on primary key values.
494 :
495 : 3. Final SELECT:
496 : - Constructs the complete SELECT statement as a string, combining:
497 : - Column names from `col_names`.
498 : - The target table name.
499 : - The WHERE clause conditions from `pk_where`.
500 :
501 : The resulting query can be used to select rows from the table based on primary
502 : key values, and can be executed within the application to retrieve data dynamically.
503 : */
504 :
505 : // Unfortunately in SQLite column names (or table names) cannot be bound parameters in a SELECT statement
506 : // otherwise we should have used something like SELECT 'SELECT ? FROM %w WHERE rowid=?';
507 :
508 63 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
509 :
510 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
511 : if (table->rowid_only) {
512 : sql = memory_mprintf("WITH col_names AS (SELECT group_concat('\"' || format('%%w', name) || '\"', ',') AS cols FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid) SELECT 'SELECT ' || (SELECT cols FROM col_names) || ' FROM \"%w\" WHERE rowid=?;'", table->name, table->name);
513 : goto process_process;
514 : }
515 : #endif
516 :
517 63 : sql = cloudsync_memory_mprintf("WITH col_names AS (SELECT group_concat('\"' || format('%%w', name) || '\"', ',') AS cols FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid), pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'SELECT ' || (SELECT cols FROM col_names) || ' FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, table->name, singlequote_escaped_table_name);
518 :
519 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
520 : process_process:
521 : #endif
522 63 : cloudsync_memory_free(singlequote_escaped_table_name);
523 63 : if (!sql) return NULL;
524 63 : char *query = dbutils_text_select(db, sql);
525 63 : cloudsync_memory_free(sql);
526 :
527 63 : return query;
528 63 : }
529 :
530 94 : char *table_build_mergedelete_sql (sqlite3 *db, cloudsync_table_context *table) {
531 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
532 : if (table->rowid_only) {
533 : char *sql = memory_mprintf("DELETE FROM \"%w\" WHERE rowid=?;", table->name);
534 : return sql;
535 : }
536 : #endif
537 :
538 94 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
539 94 : char *sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'DELETE FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, singlequote_escaped_table_name);
540 94 : cloudsync_memory_free(singlequote_escaped_table_name);
541 94 : if (!sql) return NULL;
542 :
543 94 : char *query = dbutils_text_select(db, sql);
544 94 : cloudsync_memory_free(sql);
545 :
546 94 : return query;
547 94 : }
548 :
549 280 : char *table_build_mergeinsert_sql (sqlite3 *db, cloudsync_table_context *table, const char *colname) {
550 280 : char *sql = NULL;
551 :
552 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
553 : if (table->rowid_only) {
554 : if (colname == NULL) {
555 : // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
556 : sql = memory_mprintf("INSERT OR IGNORE INTO \"%w\" (rowid) VALUES (?);", table->name);
557 : } else {
558 : // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
559 : sql = memory_mprintf("INSERT INTO \"%w\" (rowid, \"%w\") VALUES (?, ?) ON CONFLICT DO UPDATE SET \"%w\"=?;", table->name, colname, colname);
560 : }
561 : return sql;
562 : }
563 : #endif
564 :
565 280 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
566 :
567 280 : if (colname == NULL) {
568 : // is sentinel insert
569 94 : sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"') AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk), pk_bind AS (SELECT group_concat('?') AS pk_binding FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'INSERT OR IGNORE INTO \"%w\" (' || (SELECT pk_clause FROM pk_where) || ') VALUES (' || (SELECT pk_binding FROM pk_bind) || ');'", table->name, table->name, singlequote_escaped_table_name);
570 94 : } else {
571 186 : char *singlequote_escaped_col_name = cloudsync_memory_mprintf("%q", colname);
572 186 : sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"') AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk), pk_bind AS (SELECT group_concat('?') AS pk_binding FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'INSERT INTO \"%w\" (' || (SELECT pk_clause FROM pk_where) || ',\"%w\") VALUES (' || (SELECT pk_binding FROM pk_bind) || ',?) ON CONFLICT DO UPDATE SET \"%w\"=?;'", table->name, table->name, singlequote_escaped_table_name, singlequote_escaped_col_name, singlequote_escaped_col_name);
573 186 : cloudsync_memory_free(singlequote_escaped_col_name);
574 :
575 : }
576 280 : cloudsync_memory_free(singlequote_escaped_table_name);
577 280 : if (!sql) return NULL;
578 :
579 280 : char *query = dbutils_text_select(db, sql);
580 280 : cloudsync_memory_free(sql);
581 :
582 280 : return query;
583 280 : }
584 :
585 301 : char *table_build_value_sql (sqlite3 *db, cloudsync_table_context *table, const char *colname) {
586 301 : char *colnamequote = dbutils_is_star_table(colname) ? "" : "\"";
587 :
588 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
589 : if (table->rowid_only) {
590 : char *sql = memory_mprintf("SELECT %s%w%s FROM \"%w\" WHERE rowid=?;", colnamequote, colname, colnamequote, table->name);
591 : return sql;
592 : }
593 : #endif
594 :
595 : // SELECT age FROM customers WHERE first_name=? AND last_name=?;
596 301 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
597 301 : char *singlequote_escaped_col_name = cloudsync_memory_mprintf("%q", colname);
598 301 : char *sql = cloudsync_memory_mprintf("WITH pk_where AS (SELECT group_concat('\"' || format('%%w', name) || '\"', '=? AND ') || '=?' AS pk_clause FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk) SELECT 'SELECT %s%w%s FROM \"%w\" WHERE ' || (SELECT pk_clause FROM pk_where) || ';'", table->name, colnamequote, singlequote_escaped_col_name, colnamequote, singlequote_escaped_table_name);
599 301 : cloudsync_memory_free(singlequote_escaped_col_name);
600 301 : cloudsync_memory_free(singlequote_escaped_table_name);
601 301 : if (!sql) return NULL;
602 :
603 301 : char *query = dbutils_text_select(db, sql);
604 301 : cloudsync_memory_free(sql);
605 :
606 301 : return query;
607 301 : }
608 :
609 94 : cloudsync_table_context *table_create (const char *name, table_algo algo) {
610 : DEBUG_DBFUNCTION("table_create %s", name);
611 :
612 94 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
613 94 : if (!table) return NULL;
614 :
615 94 : table->algo = algo;
616 94 : table->name = cloudsync_string_dup(name, true);
617 94 : if (!table->name) {
618 0 : cloudsync_memory_free(table);
619 0 : return NULL;
620 : }
621 94 : table->enabled = true;
622 :
623 94 : return table;
624 94 : }
625 :
626 94 : void table_free (cloudsync_table_context *table) {
627 : DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
628 94 : if (!table) return;
629 :
630 94 : if (table->ncols > 0) {
631 63 : if (table->col_name) {
632 249 : for (int i=0; i<table->ncols; ++i) {
633 186 : cloudsync_memory_free(table->col_name[i]);
634 186 : }
635 63 : cloudsync_memory_free(table->col_name);
636 63 : }
637 63 : if (table->col_merge_stmt) {
638 249 : for (int i=0; i<table->ncols; ++i) {
639 186 : sqlite3_finalize(table->col_merge_stmt[i]);
640 186 : }
641 63 : cloudsync_memory_free(table->col_merge_stmt);
642 63 : }
643 63 : if (table->col_value_stmt) {
644 249 : for (int i=0; i<table->ncols; ++i) {
645 186 : sqlite3_finalize(table->col_value_stmt[i]);
646 186 : }
647 63 : cloudsync_memory_free(table->col_value_stmt);
648 63 : }
649 63 : if (table->col_id) {
650 63 : cloudsync_memory_free(table->col_id);
651 63 : }
652 63 : }
653 :
654 94 : if (table->pk_name) sqlite3_free_table(table->pk_name);
655 94 : if (table->name) cloudsync_memory_free(table->name);
656 94 : if (table->meta_pkexists_stmt) sqlite3_finalize(table->meta_pkexists_stmt);
657 94 : if (table->meta_sentinel_update_stmt) sqlite3_finalize(table->meta_sentinel_update_stmt);
658 94 : if (table->meta_sentinel_insert_stmt) sqlite3_finalize(table->meta_sentinel_insert_stmt);
659 94 : if (table->meta_row_insert_update_stmt) sqlite3_finalize(table->meta_row_insert_update_stmt);
660 94 : if (table->meta_row_drop_stmt) sqlite3_finalize(table->meta_row_drop_stmt);
661 94 : if (table->meta_update_move_stmt) sqlite3_finalize(table->meta_update_move_stmt);
662 94 : if (table->meta_local_cl_stmt) sqlite3_finalize(table->meta_local_cl_stmt);
663 94 : if (table->meta_winner_clock_stmt) sqlite3_finalize(table->meta_winner_clock_stmt);
664 94 : if (table->meta_merge_delete_drop) sqlite3_finalize(table->meta_merge_delete_drop);
665 94 : if (table->meta_zero_clock_stmt) sqlite3_finalize(table->meta_zero_clock_stmt);
666 94 : if (table->meta_col_version_stmt) sqlite3_finalize(table->meta_col_version_stmt);
667 94 : if (table->meta_site_id_stmt) sqlite3_finalize(table->meta_site_id_stmt);
668 :
669 94 : if (table->real_col_values_stmt) sqlite3_finalize(table->real_col_values_stmt);
670 94 : if (table->real_merge_delete_stmt) sqlite3_finalize(table->real_merge_delete_stmt);
671 94 : if (table->real_merge_sentinel_stmt) sqlite3_finalize(table->real_merge_sentinel_stmt);
672 :
673 94 : cloudsync_memory_free(table);
674 94 : }
675 :
676 94 : int table_add_stmts (sqlite3 *db, cloudsync_table_context *table, int ncols) {
677 94 : int rc = SQLITE_OK;
678 94 : char *sql = NULL;
679 :
680 : // META TABLE statements
681 :
682 : // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
683 :
684 : // precompile the pk exists statement
685 : // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
686 : // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
687 94 : sql = cloudsync_memory_mprintf("SELECT EXISTS(SELECT 1 FROM \"%w_cloudsync\" WHERE pk = ? LIMIT 1);", table->name);
688 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
689 : DEBUG_SQL("meta_pkexists_stmt: %s", sql);
690 :
691 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_pkexists_stmt, NULL);
692 :
693 94 : cloudsync_memory_free(sql);
694 94 : if (rc != SQLITE_OK) goto cleanup;
695 :
696 : // precompile the update local sentinel statement
697 94 : sql = cloudsync_memory_mprintf("UPDATE \"%w_cloudsync\" SET col_version = CASE col_version %% 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = ?, seq = ?, site_id = 0 WHERE pk = ? AND col_name = '%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
698 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
699 : DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
700 :
701 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_sentinel_update_stmt, NULL);
702 94 : cloudsync_memory_free(sql);
703 94 : if (rc != SQLITE_OK) goto cleanup;
704 :
705 : // precompile the insert local sentinel statement
706 94 : sql = cloudsync_memory_mprintf("INSERT INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id) SELECT ?, '%s', 1, ?, ?, 0 WHERE 1 ON CONFLICT DO UPDATE SET col_version = CASE col_version %% 2 WHEN 0 THEN col_version + 1 ELSE col_version + 2 END, db_version = ?, seq = ?, site_id = 0;", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
707 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
708 : DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
709 :
710 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_sentinel_insert_stmt, NULL);
711 94 : cloudsync_memory_free(sql);
712 94 : if (rc != SQLITE_OK) goto cleanup;
713 :
714 : // precompile the insert/update local row statement
715 94 : sql = cloudsync_memory_mprintf("INSERT INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id ) SELECT ?, ?, ?, ?, ?, 0 WHERE 1 ON CONFLICT DO UPDATE SET col_version = col_version + 1, db_version = ?, seq = ?, site_id = 0;", table->name);
716 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
717 : DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
718 :
719 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_row_insert_update_stmt, NULL);
720 94 : cloudsync_memory_free(sql);
721 94 : if (rc != SQLITE_OK) goto cleanup;
722 :
723 : // precompile the delete rows from meta
724 94 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
725 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
726 : DEBUG_SQL("meta_row_drop_stmt: %s", sql);
727 :
728 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_row_drop_stmt, NULL);
729 94 : cloudsync_memory_free(sql);
730 94 : if (rc != SQLITE_OK) goto cleanup;
731 :
732 : // precompile the update rows from meta when pk changes
733 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
734 94 : sql = cloudsync_memory_mprintf("UPDATE OR REPLACE \"%w_cloudsync\" SET pk=?, db_version=?, col_version=1, seq=cloudsync_seq(), site_id=0 WHERE (pk=? AND col_name!='%s');", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
735 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
736 : DEBUG_SQL("meta_update_move_stmt: %s", sql);
737 :
738 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_update_move_stmt, NULL);
739 94 : cloudsync_memory_free(sql);
740 94 : if (rc != SQLITE_OK) goto cleanup;
741 :
742 : // local cl
743 94 : sql = cloudsync_memory_mprintf("SELECT COALESCE((SELECT col_version FROM \"%w_cloudsync\" WHERE pk=? AND col_name='%s'), (SELECT 1 FROM \"%w_cloudsync\" WHERE pk=?));", table->name, CLOUDSYNC_TOMBSTONE_VALUE, table->name);
744 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
745 : DEBUG_SQL("meta_local_cl_stmt: %s", sql);
746 :
747 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_local_cl_stmt, NULL);
748 94 : cloudsync_memory_free(sql);
749 94 : if (rc != SQLITE_OK) goto cleanup;
750 :
751 : // rowid of the last inserted/updated row in the meta table
752 94 : sql = cloudsync_memory_mprintf("INSERT OR REPLACE INTO \"%w_cloudsync\" (pk, col_name, col_version, db_version, seq, site_id) VALUES (?, ?, ?, cloudsync_db_version_next(?), ?, ?) RETURNING ((db_version << 30) | seq);", table->name);
753 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
754 : DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
755 :
756 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_winner_clock_stmt, NULL);
757 94 : cloudsync_memory_free(sql);
758 94 : if (rc != SQLITE_OK) goto cleanup;
759 :
760 94 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
761 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
762 : DEBUG_SQL("meta_merge_delete_drop: %s", sql);
763 :
764 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_merge_delete_drop, NULL);
765 94 : cloudsync_memory_free(sql);
766 94 : if (rc != SQLITE_OK) goto cleanup;
767 :
768 : // zero clock
769 94 : sql = cloudsync_memory_mprintf("UPDATE \"%w_cloudsync\" SET col_version = 0, db_version = cloudsync_db_version_next(?) WHERE pk=? AND col_name!='%s';", table->name, CLOUDSYNC_TOMBSTONE_VALUE);
770 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
771 : DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
772 :
773 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_zero_clock_stmt, NULL);
774 94 : cloudsync_memory_free(sql);
775 94 : if (rc != SQLITE_OK) goto cleanup;
776 :
777 : // col_version
778 94 : sql = cloudsync_memory_mprintf("SELECT col_version FROM \"%w_cloudsync\" WHERE pk=? AND col_name=?;", table->name);
779 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
780 : DEBUG_SQL("meta_col_version_stmt: %s", sql);
781 :
782 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_col_version_stmt, NULL);
783 94 : cloudsync_memory_free(sql);
784 94 : if (rc != SQLITE_OK) goto cleanup;
785 :
786 : // site_id
787 94 : sql = cloudsync_memory_mprintf("SELECT site_id FROM \"%w_cloudsync\" WHERE pk=? AND col_name=?;", table->name);
788 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
789 : DEBUG_SQL("meta_site_id_stmt: %s", sql);
790 :
791 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->meta_site_id_stmt, NULL);
792 94 : cloudsync_memory_free(sql);
793 94 : if (rc != SQLITE_OK) goto cleanup;
794 :
795 : // REAL TABLE statements
796 :
797 : // precompile the get column value statement
798 94 : if (ncols > 0) {
799 63 : sql = table_build_values_sql(db, table);
800 63 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
801 : DEBUG_SQL("real_col_values_stmt: %s", sql);
802 :
803 63 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_col_values_stmt, NULL);
804 63 : cloudsync_memory_free(sql);
805 63 : if (rc != SQLITE_OK) goto cleanup;
806 63 : }
807 :
808 94 : sql = table_build_mergedelete_sql(db, table);
809 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
810 : DEBUG_SQL("real_merge_delete: %s", sql);
811 :
812 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_merge_delete_stmt, NULL);
813 94 : cloudsync_memory_free(sql);
814 94 : if (rc != SQLITE_OK) goto cleanup;
815 :
816 94 : sql = table_build_mergeinsert_sql(db, table, NULL);
817 94 : if (!sql) {rc = SQLITE_NOMEM; goto cleanup;}
818 : DEBUG_SQL("real_merge_sentinel: %s", sql);
819 :
820 94 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->real_merge_sentinel_stmt, NULL);
821 94 : cloudsync_memory_free(sql);
822 94 : if (rc != SQLITE_OK) goto cleanup;
823 :
824 : cleanup:
825 94 : if (rc != SQLITE_OK) printf("table_add_stmts error: %s\n", sqlite3_errmsg(db));
826 94 : return rc;
827 : }
828 :
829 18952 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
830 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
831 :
832 19881 : for (int i=0; i<data->tables_count; ++i) {
833 19767 : const char *name = (data->tables[i]) ? data->tables[i]->name : NULL;
834 19767 : if ((name) && (strcasecmp(name, table_name) == 0)) {
835 18838 : return data->tables[i];
836 : }
837 929 : }
838 :
839 114 : return NULL;
840 18952 : }
841 :
842 13765 : sqlite3_stmt *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
843 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
844 :
845 26576 : for (int i=0; i<table->ncols; ++i) {
846 26576 : if (strcasecmp(table->col_name[i], col_name) == 0) {
847 13765 : if (index) *index = i;
848 13765 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
849 : }
850 12811 : }
851 :
852 0 : if (index) *index = -1;
853 0 : return NULL;
854 13765 : }
855 :
856 29 : int table_remove (cloudsync_context *data, const char *table_name) {
857 : DEBUG_DBFUNCTION("table_remove %s", table_name);
858 :
859 52 : for (int i=0; i<data->tables_count; ++i) {
860 52 : const char *name = (data->tables[i]) ? data->tables[i]->name : NULL;
861 52 : if ((name) && (strcasecmp(name, table_name) == 0)) {
862 29 : data->tables[i] = NULL;
863 29 : return i;
864 : }
865 23 : }
866 0 : return -1;
867 29 : }
868 :
869 186 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
870 186 : cloudsync_table_context *table = (cloudsync_table_context *)xdata;
871 :
872 186 : sqlite3 *db = sqlite3_db_handle(table->meta_pkexists_stmt);
873 186 : if (!db) return SQLITE_ERROR;
874 :
875 186 : int index = table->ncols;
876 372 : for (int i=0; i<ncols; i+=2) {
877 186 : const char *name = values[i];
878 186 : int cid = (int)strtol(values[i+1], NULL, 0);
879 :
880 186 : table->col_id[index] = cid;
881 186 : table->col_name[index] = cloudsync_string_dup(name, true);
882 186 : if (!table->col_name[index]) return 1;
883 :
884 186 : char *sql = table_build_mergeinsert_sql(db, table, name);
885 186 : if (!sql) return SQLITE_NOMEM;
886 : DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
887 :
888 186 : int rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->col_merge_stmt[index], NULL);
889 186 : cloudsync_memory_free(sql);
890 186 : if (rc != SQLITE_OK) return rc;
891 186 : if (!table->col_merge_stmt[index]) return SQLITE_MISUSE;
892 :
893 186 : sql = table_build_value_sql(db, table, name);
894 186 : if (!sql) return SQLITE_NOMEM;
895 : DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
896 :
897 186 : rc = sqlite3_prepare_v3(db, sql, -1, SQLITE_PREPARE_PERSISTENT, &table->col_value_stmt[index], NULL);
898 186 : cloudsync_memory_free(sql);
899 186 : if (rc != SQLITE_OK) return rc;
900 186 : if (!table->col_value_stmt[index]) return SQLITE_MISUSE;
901 186 : }
902 186 : table->ncols += 1;
903 :
904 186 : return 0;
905 186 : }
906 :
907 100 : bool table_add_to_context (sqlite3 *db, cloudsync_context *data, table_algo algo, const char *table_name) {
908 : DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
909 :
910 : // check if table is already in the global context and in that case just return
911 100 : cloudsync_table_context *table = table_lookup(data, table_name);
912 100 : if (table) return true;
913 :
914 : // is there any space available?
915 94 : if (data->tables_alloc <= data->tables_count + 1) {
916 : // realloc tables
917 0 : cloudsync_table_context **clone = (cloudsync_table_context **)cloudsync_memory_realloc(data->tables, sizeof(cloudsync_table_context) * data->tables_alloc + CLOUDSYNC_INIT_NTABLES);
918 0 : if (!clone) goto abort_add_table;
919 :
920 : // reset new entries
921 0 : for (int i=data->tables_alloc; i<data->tables_alloc + CLOUDSYNC_INIT_NTABLES; ++i) {
922 0 : clone[i] = NULL;
923 0 : }
924 :
925 : // replace old ptr
926 0 : data->tables = clone;
927 0 : data->tables_alloc += CLOUDSYNC_INIT_NTABLES;
928 0 : }
929 :
930 : // setup a new table context
931 94 : table = table_create(table_name, algo);
932 94 : if (!table) return false;
933 :
934 : // fill remaining metadata in the table
935 94 : char *sql = cloudsync_memory_mprintf("SELECT count(*) FROM pragma_table_info('%q') WHERE pk>0;", table_name);
936 94 : if (!sql) goto abort_add_table;
937 94 : table->npks = (int)dbutils_int_select(db, sql);
938 94 : cloudsync_memory_free(sql);
939 94 : if (table->npks == -1) {
940 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
941 0 : goto abort_add_table;
942 : }
943 :
944 94 : if (table->npks == 0) {
945 : #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
946 0 : return false;
947 : #else
948 : table->rowid_only = true;
949 : table->npks = 1; // rowid
950 : #endif
951 : }
952 :
953 94 : sql = cloudsync_memory_mprintf("SELECT count(*) FROM pragma_table_info('%q') WHERE pk=0;", table_name);
954 94 : if (!sql) goto abort_add_table;
955 94 : int64_t ncols = (int64_t)dbutils_int_select(db, sql);
956 94 : cloudsync_memory_free(sql);
957 94 : if (ncols == -1) {
958 0 : dbutils_context_result_error(data->sqlite_ctx, "%s", sqlite3_errmsg(db));
959 0 : goto abort_add_table;
960 : }
961 :
962 94 : int rc = table_add_stmts(db, table, (int)ncols);
963 94 : if (rc != SQLITE_OK) goto abort_add_table;
964 :
965 : // a table with only pk(s) is totally legal
966 94 : if (ncols > 0) {
967 63 : table->col_name = (char **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(char *) * ncols));
968 63 : if (!table->col_name) goto abort_add_table;
969 :
970 63 : table->col_id = (int *)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(int) * ncols));
971 63 : if (!table->col_id) goto abort_add_table;
972 :
973 63 : table->col_merge_stmt = (sqlite3_stmt **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(sqlite3_stmt *) * ncols));
974 63 : if (!table->col_merge_stmt) goto abort_add_table;
975 :
976 63 : table->col_value_stmt = (sqlite3_stmt **)cloudsync_memory_alloc((sqlite3_uint64)(sizeof(sqlite3_stmt *) * ncols));
977 63 : if (!table->col_value_stmt) goto abort_add_table;
978 :
979 63 : sql = cloudsync_memory_mprintf("SELECT name, cid FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid;", table_name);
980 63 : if (!sql) goto abort_add_table;
981 63 : int rc = sqlite3_exec(db, sql, table_add_to_context_cb, (void *)table, NULL);
982 63 : cloudsync_memory_free(sql);
983 63 : if (rc == SQLITE_ABORT) goto abort_add_table;
984 63 : }
985 :
986 : // lookup the first free slot
987 137 : for (int i=0; i<data->tables_alloc; ++i) {
988 137 : if (data->tables[i] == NULL) {
989 94 : data->tables[i] = table;
990 94 : if (i > data->tables_count - 1) ++data->tables_count;
991 94 : break;
992 : }
993 43 : }
994 :
995 94 : return true;
996 :
997 : abort_add_table:
998 0 : table_free(table);
999 0 : return false;
1000 100 : }
1001 :
1002 6 : bool table_remove_from_context (cloudsync_context *data, cloudsync_table_context *table) {
1003 6 : return (table_remove(data, table->name) != -1);
1004 : }
1005 :
1006 1505 : sqlite3_stmt *cloudsync_colvalue_stmt (sqlite3 *db, cloudsync_context *data, const char *tbl_name, bool *persistent) {
1007 1505 : sqlite3_stmt *vm = NULL;
1008 :
1009 1505 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1010 1505 : if (table) {
1011 1505 : char *col_name = NULL;
1012 1505 : if (table->ncols > 0) {
1013 1390 : col_name = table->col_name[0];
1014 : // retrieve col_value precompiled statement
1015 1390 : vm = table_column_lookup(table, col_name, false, NULL);
1016 1390 : *persistent = true;
1017 1390 : } else {
1018 115 : char *sql = table_build_value_sql(db, table, "*");
1019 115 : sqlite3_prepare_v2(db, sql, -1, &vm, NULL);
1020 115 : cloudsync_memory_free(sql);
1021 115 : *persistent = false;
1022 : }
1023 1505 : }
1024 :
1025 1505 : return vm;
1026 : }
1027 :
1028 : // MARK: - Merge Insert -
1029 :
1030 1040 : sqlite3_int64 merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen, const char **err) {
1031 1040 : sqlite3_stmt *vm = table->meta_local_cl_stmt;
1032 1040 : sqlite3_int64 result = -1;
1033 :
1034 1040 : int rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1035 1040 : if (rc != SQLITE_OK) goto cleanup;
1036 :
1037 1040 : rc = sqlite3_bind_blob(vm, 2, (const void *)pk, pklen, SQLITE_STATIC);
1038 1040 : if (rc != SQLITE_OK) goto cleanup;
1039 :
1040 1040 : rc = sqlite3_step(vm);
1041 1040 : if (rc == SQLITE_ROW) result = sqlite3_column_int64(vm, 0);
1042 0 : else if (rc == SQLITE_DONE) result = 0;
1043 :
1044 : cleanup:
1045 1040 : if (result == -1) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1046 1040 : stmt_reset(vm);
1047 1040 : return result;
1048 : }
1049 :
1050 746 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, sqlite3_int64 *version, const char **err) {
1051 746 : sqlite3_stmt *vm = table->meta_col_version_stmt;
1052 :
1053 746 : int rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1054 746 : if (rc != SQLITE_OK) goto cleanup;
1055 :
1056 746 : rc = sqlite3_bind_text(vm, 2, col_name, -1, SQLITE_STATIC);
1057 746 : if (rc != SQLITE_OK) goto cleanup;
1058 :
1059 746 : rc = sqlite3_step(vm);
1060 1281 : if (rc == SQLITE_ROW) {
1061 535 : *version = sqlite3_column_int64(vm, 0);
1062 535 : rc = SQLITE_OK;
1063 535 : }
1064 :
1065 : cleanup:
1066 746 : if ((rc != SQLITE_OK) && (rc != SQLITE_DONE)) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1067 746 : stmt_reset(vm);
1068 746 : return rc;
1069 : }
1070 :
1071 3451 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, sqlite3_int64 col_version, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1072 :
1073 : // get/set site_id
1074 3451 : sqlite3_stmt *vm = data->getset_siteid_stmt;
1075 3451 : int rc = sqlite3_bind_blob(vm, 1, (const void *)site_id, site_len, SQLITE_STATIC);
1076 3451 : if (rc != SQLITE_OK) goto cleanup_merge;
1077 :
1078 3451 : rc = sqlite3_step(vm);
1079 3451 : if (rc != SQLITE_ROW) goto cleanup_merge;
1080 :
1081 3451 : int64_t ord = sqlite3_column_int64(vm, 0);
1082 3451 : stmt_reset(vm);
1083 :
1084 3451 : vm = table->meta_winner_clock_stmt;
1085 3451 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pk_len, SQLITE_STATIC);
1086 3451 : if (rc != SQLITE_OK) goto cleanup_merge;
1087 :
1088 3451 : rc = sqlite3_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1, SQLITE_STATIC);
1089 3451 : if (rc != SQLITE_OK) goto cleanup_merge;
1090 :
1091 3451 : rc = sqlite3_bind_int64(vm, 3, col_version);
1092 3451 : if (rc != SQLITE_OK) goto cleanup_merge;
1093 :
1094 3451 : rc = sqlite3_bind_int64(vm, 4, db_version);
1095 3451 : if (rc != SQLITE_OK) goto cleanup_merge;
1096 :
1097 3451 : rc = sqlite3_bind_int64(vm, 5, seq);
1098 3451 : if (rc != SQLITE_OK) goto cleanup_merge;
1099 :
1100 3451 : rc = sqlite3_bind_int64(vm, 6, ord);
1101 3451 : if (rc != SQLITE_OK) goto cleanup_merge;
1102 :
1103 3451 : rc = sqlite3_step(vm);
1104 6902 : if (rc == SQLITE_ROW) {
1105 3451 : *rowid = sqlite3_column_int64(vm, 0);
1106 3451 : rc = SQLITE_OK;
1107 3451 : }
1108 :
1109 : cleanup_merge:
1110 3451 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1111 3451 : stmt_reset(vm);
1112 3451 : return rc;
1113 : }
1114 :
1115 3393 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, sqlite3_value *col_value, sqlite3_int64 col_version, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1116 : int index;
1117 3393 : sqlite3_stmt *vm = table_column_lookup(table, col_name, true, &index);
1118 3393 : if (vm == NULL) {
1119 0 : *err = "Unable to retrieve column merge precompiled statement in merge_insert_col.";
1120 0 : return SQLITE_MISUSE;
1121 : }
1122 :
1123 : // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1124 :
1125 : // bind primary key(s)
1126 3393 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1127 3393 : if (rc < 0) {
1128 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1129 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1130 0 : stmt_reset(vm);
1131 0 : return rc;
1132 : }
1133 :
1134 : // bind value
1135 3393 : if (col_value) {
1136 3393 : rc = sqlite3_bind_value(vm, table->npks+1, col_value);
1137 3393 : if (rc == SQLITE_OK) rc = sqlite3_bind_value(vm, table->npks+2, col_value);
1138 3393 : if (rc != SQLITE_OK) {
1139 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1140 0 : stmt_reset(vm);
1141 0 : return rc;
1142 : }
1143 :
1144 3393 : }
1145 :
1146 : // perform real operation and disable triggers
1147 :
1148 : // in case of GOS we reused the table->col_merge_stmt statement
1149 : // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1150 : // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
1151 : // the trick is to disable that trigger before executing the statement
1152 3393 : if (table->algo == table_algo_crdt_gos) table->enabled = 0;
1153 3393 : SYNCBIT_SET(data);
1154 3393 : rc = sqlite3_step(vm);
1155 : DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], sqlite3_expanded_sql(vm), rc);
1156 3393 : stmt_reset(vm);
1157 3393 : SYNCBIT_RESET(data);
1158 3393 : if (table->algo == table_algo_crdt_gos) table->enabled = 1;
1159 :
1160 3393 : if (rc != SQLITE_DONE) {
1161 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1162 0 : return rc;
1163 : }
1164 :
1165 3393 : return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid, err);
1166 3393 : }
1167 :
1168 34 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, sqlite3_int64 cl, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1169 34 : int rc = SQLITE_OK;
1170 :
1171 : // reset return value
1172 34 : *rowid = 0;
1173 :
1174 : // bind pk
1175 34 : sqlite3_stmt *vm = table->real_merge_delete_stmt;
1176 34 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1177 34 : if (rc < 0) {
1178 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1179 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1180 0 : stmt_reset(vm);
1181 0 : return rc;
1182 : }
1183 :
1184 : // perform real operation and disable triggers
1185 34 : SYNCBIT_SET(data);
1186 34 : rc = sqlite3_step(vm);
1187 : DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], sqlite3_expanded_sql(vm), rc);
1188 34 : stmt_reset(vm);
1189 34 : SYNCBIT_RESET(data);
1190 34 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1191 34 : if (rc != SQLITE_OK) {
1192 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1193 0 : return rc;
1194 : }
1195 :
1196 34 : rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid, err);
1197 34 : if (rc != SQLITE_OK) return rc;
1198 :
1199 : // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
1200 : // this must never come before `set_winner_clock`
1201 34 : vm = table->meta_merge_delete_drop;
1202 34 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1203 34 : if (rc == SQLITE_OK) rc = sqlite3_step(vm);
1204 34 : stmt_reset(vm);
1205 34 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1206 34 : if (rc != SQLITE_OK) {
1207 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1208 0 : }
1209 :
1210 34 : return rc;
1211 34 : }
1212 :
1213 24 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, sqlite3_int64 db_version, const char *pk, int pklen, const char **err) {
1214 24 : sqlite3_stmt *vm = table->meta_zero_clock_stmt;
1215 :
1216 24 : int rc = sqlite3_bind_int64(vm, 1, db_version);
1217 24 : if (rc != SQLITE_OK) goto cleanup;
1218 :
1219 24 : rc = sqlite3_bind_blob(vm, 2, (const void *)pk, pklen, SQLITE_STATIC);
1220 24 : if (rc != SQLITE_OK) goto cleanup;
1221 :
1222 24 : rc = sqlite3_step(vm);
1223 24 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1224 :
1225 : cleanup:
1226 24 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1227 24 : stmt_reset(vm);
1228 24 : return rc;
1229 : }
1230 :
1231 : // executed only if insert_cl == local_cl
1232 746 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, sqlite3_value *insert_value, const char *site_id, int site_len, const char *col_name, sqlite3_int64 col_version, bool *didwin_flag, const char **err) {
1233 :
1234 746 : if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
1235 :
1236 : sqlite3_int64 local_version;
1237 746 : int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version, err);
1238 746 : if (rc == SQLITE_DONE) {
1239 : // no rows returned, the incoming change wins if there's nothing there locally
1240 211 : *didwin_flag = true;
1241 211 : return SQLITE_OK;
1242 : }
1243 535 : if (rc != SQLITE_OK) return rc;
1244 :
1245 : // rc == SQLITE_OK, means that a row with a version exists
1246 535 : if (local_version != col_version) {
1247 38 : if (col_version > local_version) {*didwin_flag = true; return SQLITE_OK;}
1248 13 : if (col_version < local_version) {*didwin_flag = false; return SQLITE_OK;}
1249 0 : }
1250 :
1251 : // rc == SQLITE_ROW and col_version == local_version, need to compare values
1252 :
1253 : // retrieve col_value precompiled statement
1254 497 : sqlite3_stmt *vm = table_column_lookup(table, col_name, false, NULL);
1255 497 : if (!vm) {
1256 0 : *err = "Unable to retrieve column value precompiled statement in merge_did_cid_win.";
1257 0 : return SQLITE_ERROR;
1258 : }
1259 :
1260 : // bind primary key values
1261 497 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
1262 497 : if (rc < 0) {
1263 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1264 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1265 0 : stmt_reset(vm);
1266 0 : return rc;
1267 : }
1268 :
1269 : // execute vm
1270 : sqlite3_value *local_value;
1271 497 : rc = sqlite3_step(vm);
1272 497 : if (rc == SQLITE_DONE) {
1273 : // meta entry exists but the actual value is missing
1274 : // we should allow the value_compare function to make a decision
1275 : // value_compare has been modified to handle the case where lvalue is NULL
1276 0 : local_value = NULL;
1277 0 : rc = SQLITE_OK;
1278 497 : } else if (rc == SQLITE_ROW) {
1279 497 : local_value = sqlite3_column_value(vm, 0);
1280 497 : rc = SQLITE_OK;
1281 497 : } else {
1282 0 : goto cleanup;
1283 : }
1284 :
1285 : // compare values
1286 497 : int ret = dbutils_value_compare(insert_value, local_value);
1287 : // reset after compare, otherwise local value would be deallocated
1288 497 : vm = stmt_reset(vm);
1289 :
1290 497 : bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
1291 497 : if (!compare_site_id) {
1292 497 : *didwin_flag = (ret > 0);
1293 497 : goto cleanup;
1294 : }
1295 :
1296 : // values are the same and merge_equal_values is true
1297 0 : vm = table->meta_site_id_stmt;
1298 0 : rc = sqlite3_bind_blob(vm, 1, (const void *)pk, pklen, SQLITE_STATIC);
1299 0 : if (rc != SQLITE_OK) goto cleanup;
1300 :
1301 0 : rc = sqlite3_bind_text(vm, 2, col_name, -1, SQLITE_STATIC);
1302 0 : if (rc != SQLITE_OK) goto cleanup;
1303 :
1304 0 : rc = sqlite3_step(vm);
1305 0 : if (rc == SQLITE_ROW) {
1306 0 : const void *local_site_id = sqlite3_column_blob(vm, 0);
1307 0 : ret = memcmp(site_id, local_site_id, site_len);
1308 0 : *didwin_flag = (ret > 0);
1309 0 : stmt_reset(vm);
1310 0 : return SQLITE_OK;
1311 : }
1312 :
1313 : // handle error condition here
1314 0 : stmt_reset(vm);
1315 0 : *err = "Unable to find site_id for previous change. The cloudsync table is probably corrupted.";
1316 0 : return SQLITE_ERROR;
1317 :
1318 : cleanup:
1319 497 : if (rc != SQLITE_OK) *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1320 497 : if (vm) stmt_reset(vm);
1321 497 : return rc;
1322 746 : }
1323 :
1324 24 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, sqlite3_int64 cl, sqlite3_int64 db_version, const char *site_id, int site_len, sqlite3_int64 seq, sqlite3_int64 *rowid, const char **err) {
1325 :
1326 : // reset return value
1327 24 : *rowid = 0;
1328 :
1329 : // bind pk
1330 24 : sqlite3_stmt *vm = table->real_merge_sentinel_stmt;
1331 24 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1332 24 : if (rc < 0) {
1333 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1334 0 : rc = sqlite3_errcode(sqlite3_db_handle(vm));
1335 0 : stmt_reset(vm);
1336 0 : return rc;
1337 : }
1338 :
1339 : // perform real operation and disable triggers
1340 24 : SYNCBIT_SET(data);
1341 24 : rc = sqlite3_step(vm);
1342 24 : stmt_reset(vm);
1343 24 : SYNCBIT_RESET(data);
1344 24 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1345 24 : if (rc != SQLITE_OK) {
1346 0 : *err = sqlite3_errmsg(sqlite3_db_handle(vm));
1347 0 : return rc;
1348 : }
1349 :
1350 24 : rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen, err);
1351 24 : if (rc != SQLITE_OK) return rc;
1352 :
1353 24 : return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid, err);
1354 24 : }
1355 :
1356 3150 : int cloudsync_merge_insert_gos (sqlite3_vtab *vtab, cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, const char *insert_name, sqlite3_value *insert_value, sqlite3_int64 insert_col_version, sqlite3_int64 insert_db_version, const char *insert_site_id, int insert_site_id_len, sqlite3_int64 insert_seq, sqlite3_int64 *rowid) {
1357 : // Grow-Only Set (GOS) Algorithm: Only insertions are allowed, deletions and updates are prevented from a trigger.
1358 :
1359 3150 : const char *err = NULL;
1360 6300 : int rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version,
1361 3150 : insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1362 3150 : if (rc != SQLITE_OK) {
1363 0 : cloudsync_vtab_set_error(vtab, "Unable to perform GOS merge_insert_col: %s", err);
1364 0 : }
1365 :
1366 3150 : return rc;
1367 : }
1368 :
1369 4190 : int cloudsync_merge_insert (sqlite3_vtab *vtab, int argc, sqlite3_value **argv, sqlite3_int64 *rowid) {
1370 : // this function performs the merging logic for an insert in a cloud-synchronized table. It handles
1371 : // different scenarios including conflicts, causal lengths, delete operations, and resurrecting rows
1372 : // based on the incoming data (from remote nodes or clients) and the local database state
1373 :
1374 : // this function handles different CRDT algorithms (GOS, DWS, AWS, and CLS).
1375 : // the merging strategy is determined based on the table->algo value.
1376 :
1377 : // meta table declaration:
1378 : // tbl TEXT NOT NULL, pk BLOB NOT NULL, col_name TEXT NOT NULL,"
1379 : // "col_value ANY, col_version INTEGER NOT NULL, db_version INTEGER NOT NULL,"
1380 : // "site_id BLOB NOT NULL, cl INTEGER NOT NULL, seq INTEGER NOT NULL
1381 :
1382 : // meta information to retrieve from arguments:
1383 : // argv[0] -> table name (TEXT)
1384 : // argv[1] -> primary key (BLOB)
1385 : // argv[2] -> column name (TEXT or NULL if sentinel)
1386 : // argv[3] -> column value (ANY)
1387 : // argv[4] -> column version (INTEGER)
1388 : // argv[5] -> database version (INTEGER)
1389 : // argv[6] -> site ID (BLOB, identifies the origin of the update)
1390 : // argv[7] -> causal length (INTEGER, tracks the order of operations)
1391 : // argv[8] -> sequence number (INTEGER, unique per operation)
1392 :
1393 : // extract table name
1394 4190 : const char *insert_tbl = (const char *)sqlite3_value_text(argv[0]);
1395 :
1396 : // lookup table
1397 4190 : cloudsync_context *data = cloudsync_vtab_get_context(vtab);
1398 4190 : cloudsync_table_context *table = table_lookup(data, insert_tbl);
1399 4190 : if (!table) return cloudsync_vtab_set_error(vtab, "Unable to find table %s,", insert_tbl);
1400 :
1401 : // extract the remaining fields from the input values
1402 4190 : const char *insert_pk = (const char *)sqlite3_value_blob(argv[1]);
1403 4190 : int insert_pk_len = sqlite3_value_bytes(argv[1]);
1404 4190 : const char *insert_name = (sqlite3_value_type(argv[2]) == SQLITE_NULL) ? CLOUDSYNC_TOMBSTONE_VALUE : (const char *)sqlite3_value_text(argv[2]);
1405 4190 : sqlite3_value *insert_value = argv[3];
1406 4190 : sqlite3_int64 insert_col_version = sqlite3_value_int64(argv[4]);
1407 4190 : sqlite3_int64 insert_db_version = sqlite3_value_int64(argv[5]);
1408 4190 : const char *insert_site_id = (const char *)sqlite3_value_blob(argv[6]);
1409 4190 : int insert_site_id_len = sqlite3_value_bytes(argv[6]);
1410 4190 : sqlite3_int64 insert_cl = sqlite3_value_int64(argv[7]);
1411 4190 : sqlite3_int64 insert_seq = sqlite3_value_int64(argv[8]);
1412 4190 : const char *err = NULL;
1413 :
1414 : // perform different logic for each different table algorithm
1415 4190 : if (table->algo == table_algo_crdt_gos) return cloudsync_merge_insert_gos(vtab, data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
1416 :
1417 : // Handle DWS and AWS algorithms here
1418 : // Delete-Wins Set (DWS): table_algo_crdt_dws
1419 : // Add-Wins Set (AWS): table_algo_crdt_aws
1420 :
1421 : // Causal-Length Set (CLS) Algorithm (default)
1422 :
1423 : // compute the local causal length for the row based on the primary key
1424 : // the causal length is used to determine the order of operations and resolve conflicts.
1425 1040 : sqlite3_int64 local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len, &err);
1426 1040 : if (local_cl < 0) {
1427 0 : return cloudsync_vtab_set_error(vtab, "Unable to compute local causal length: %s", err);
1428 : }
1429 :
1430 : // if the incoming causal length is older than the local causal length, we can safely ignore it
1431 : // because the local changes are more recent
1432 1040 : if (insert_cl < local_cl) return SQLITE_OK;
1433 :
1434 : // check if the operation is a delete by examining the causal length
1435 : // even causal lengths typically signify delete operations
1436 1020 : bool is_delete = (insert_cl % 2 == 0);
1437 1020 : if (is_delete) {
1438 : // if it's a delete, check if the local state is at the same causal length
1439 : // if it is, no further action is needed
1440 125 : if (local_cl == insert_cl) return SQLITE_OK;
1441 :
1442 : // perform a delete merge if the causal length is newer than the local one
1443 68 : int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
1444 34 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1445 34 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_delete: %s", err);
1446 34 : return rc;
1447 : }
1448 :
1449 : // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
1450 895 : bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
1451 895 : if (is_sentinel_only) {
1452 149 : if (local_cl == insert_cl) return SQLITE_OK;
1453 :
1454 : // perform a sentinel-only insert to track the existence of the row
1455 48 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
1456 24 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1457 24 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_sentinel_only_insert: %s", err);
1458 24 : return rc;
1459 : }
1460 :
1461 : // from this point I can be sure that insert_name is not sentinel
1462 :
1463 : // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
1464 : // odd causal lengths can "resurrect" rows
1465 746 : bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
1466 746 : bool row_exists_locally = local_cl != 0;
1467 :
1468 : // if a resurrection is needed, insert a sentinel to mark the row as alive
1469 : // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
1470 746 : if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
1471 0 : int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
1472 0 : insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1473 0 : if (rc != SQLITE_OK) {
1474 0 : cloudsync_vtab_set_error(vtab, "Unable to perform merge_sentinel_only_insert: %s", err);
1475 0 : return rc;
1476 : }
1477 0 : }
1478 :
1479 : // at this point, we determine whether the incoming change wins based on causal length
1480 : // this can be due to a resurrection, a non-existent local row, or a conflict resolution
1481 746 : bool flag = false;
1482 746 : int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag, &err);
1483 746 : if (rc != SQLITE_OK) {
1484 0 : cloudsync_vtab_set_error(vtab, "Unable to perform merge_did_cid_win: %s", err);
1485 0 : return rc;
1486 : }
1487 :
1488 : // check if the incoming change wins and should be applied
1489 746 : bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
1490 746 : if (!does_cid_win) return SQLITE_OK;
1491 :
1492 : // perform the final column insert or update if the incoming change wins
1493 243 : rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid, &err);
1494 243 : if (rc != SQLITE_OK) cloudsync_vtab_set_error(vtab, "Unable to perform merge_insert_col: %s", err);
1495 243 : return rc;
1496 4190 : }
1497 :
1498 : // MARK: - Private -
1499 :
1500 46 : bool cloudsync_config_exists (sqlite3 *db) {
1501 46 : return dbutils_table_exists(db, CLOUDSYNC_SITEID_NAME) == true;
1502 : }
1503 :
1504 46 : void *cloudsync_context_create (void) {
1505 46 : cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
1506 : DEBUG_SETTINGS("cloudsync_context_create %p", data);
1507 :
1508 46 : data->libversion = CLOUDSYNC_VERSION;
1509 : #if CLOUDSYNC_DEBUG
1510 : data->debug = 1;
1511 : #endif
1512 :
1513 : // allocate space for 128 tables (it can grow if needed)
1514 46 : data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc((uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *)));
1515 46 : if (!data->tables) {
1516 0 : cloudsync_memory_free(data);
1517 0 : return NULL;
1518 : }
1519 46 : data->tables_alloc = CLOUDSYNC_INIT_NTABLES;
1520 46 : data->tables_count = 0;
1521 :
1522 46 : return data;
1523 46 : }
1524 :
1525 46 : void cloudsync_context_free (void *ptr) {
1526 : DEBUG_SETTINGS("cloudsync_context_free %p", ptr);
1527 46 : if (!ptr) return;
1528 :
1529 46 : cloudsync_context *data = (cloudsync_context*)ptr;
1530 46 : cloudsync_memory_free(data->tables);
1531 46 : cloudsync_memory_free(data);
1532 46 : }
1533 :
1534 154 : const char *cloudsync_context_init (sqlite3 *db, cloudsync_context *data, sqlite3_context *context) {
1535 154 : if (!data && context) data = (cloudsync_context *)sqlite3_user_data(context);
1536 :
1537 : // perform init just the first time, if the site_id field is not set.
1538 : // The data->site_id value could exists while settings tables don't exists if the
1539 : // cloudsync_context_init was previously called in init transaction that was rolled back
1540 : // because of an error during the init process.
1541 154 : if (data->site_id[0] == 0 || !dbutils_table_exists(db, CLOUDSYNC_SITEID_NAME)) {
1542 48 : if (dbutils_settings_init(db, data, context) != SQLITE_OK) return NULL;
1543 48 : if (stmts_add_tocontext(db, data) != SQLITE_OK) return NULL;
1544 48 : if (cloudsync_load_siteid(db, data) != SQLITE_OK) return NULL;
1545 :
1546 48 : data->sqlite_ctx = context;
1547 48 : data->schema_hash = dbutils_schema_hash(db);
1548 48 : }
1549 :
1550 154 : return (const char *)data->site_id;
1551 154 : }
1552 :
1553 242 : void cloudsync_sync_key(cloudsync_context *data, const char *key, const char *value) {
1554 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
1555 :
1556 : // sync data
1557 242 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
1558 48 : data->schema_version = (int)strtol(value, NULL, 0);
1559 48 : return;
1560 : }
1561 :
1562 194 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
1563 0 : data->debug = 0;
1564 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
1565 0 : return;
1566 : }
1567 242 : }
1568 :
1569 : #if 0
1570 : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
1571 : DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
1572 : // Unused in this version
1573 : return;
1574 : }
1575 : #endif
1576 :
1577 2280 : int cloudsync_commit_hook (void *ctx) {
1578 2280 : cloudsync_context *data = (cloudsync_context *)ctx;
1579 :
1580 2280 : data->db_version = data->pending_db_version;
1581 2280 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1582 2280 : data->seq = 0;
1583 :
1584 2280 : return SQLITE_OK;
1585 : }
1586 :
1587 1 : void cloudsync_rollback_hook (void *ctx) {
1588 1 : cloudsync_context *data = (cloudsync_context *)ctx;
1589 :
1590 1 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
1591 1 : data->seq = 0;
1592 1 : }
1593 :
1594 23 : int cloudsync_finalize_alter (sqlite3_context *context, cloudsync_context *data, cloudsync_table_context *table) {
1595 23 : int rc = SQLITE_OK;
1596 23 : sqlite3 *db = sqlite3_context_db_handle(context);
1597 :
1598 23 : db_version_check_uptodate(db, data);
1599 :
1600 : // If primary key columns change (in the schema)
1601 : // We need to drop, re-create and backfill
1602 : // the clock table.
1603 : // A change in pk columns means a change in all identities
1604 : // of all rows.
1605 : // We can determine this by comparing unique index on lookaside table vs
1606 : // pks on source table
1607 23 : char *errmsg = NULL;
1608 23 : char **result = NULL;
1609 : int nrows, ncols;
1610 23 : char *sql = cloudsync_memory_mprintf("SELECT name FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table->name);
1611 23 : rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, NULL);
1612 23 : cloudsync_memory_free(sql);
1613 23 : if (rc != SQLITE_OK) {
1614 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1615 0 : goto finalize;
1616 23 : } else if (errmsg || ncols != 1) {
1617 0 : rc = SQLITE_MISUSE;
1618 0 : goto finalize;
1619 : }
1620 :
1621 23 : bool pk_diff = false;
1622 23 : if (nrows != table->npks) {
1623 6 : pk_diff = true;
1624 6 : } else {
1625 51 : for (int i=0; i<nrows; ++i) {
1626 34 : if (strcmp(table->pk_name[i], result[i]) != 0) {
1627 0 : pk_diff = true;
1628 0 : break;
1629 : }
1630 34 : }
1631 : }
1632 :
1633 23 : if (pk_diff) {
1634 : // drop meta-table, it will be recreated
1635 6 : char *sql = cloudsync_memory_mprintf("DROP TABLE IF EXISTS \"%w_cloudsync\";", table->name);
1636 6 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1637 6 : cloudsync_memory_free(sql);
1638 6 : if (rc != SQLITE_OK) {
1639 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1640 0 : goto finalize;
1641 : }
1642 6 : } else {
1643 : // compact meta-table
1644 : // delete entries for removed columns
1645 17 : char *sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE \"col_name\" NOT IN ("
1646 : "SELECT name FROM pragma_table_info('%q') UNION SELECT '%s'"
1647 17 : ")", table->name, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
1648 17 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1649 17 : cloudsync_memory_free(sql);
1650 17 : if (rc != SQLITE_OK) {
1651 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1652 0 : goto finalize;
1653 : }
1654 :
1655 17 : char *singlequote_escaped_table_name = cloudsync_memory_mprintf("%q", table->name);
1656 17 : sql = cloudsync_memory_mprintf("SELECT group_concat('\"%w\".\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%s') WHERE pk>0 ORDER BY pk;", singlequote_escaped_table_name, singlequote_escaped_table_name);
1657 17 : cloudsync_memory_free(singlequote_escaped_table_name);
1658 17 : if (!sql) {
1659 0 : rc = SQLITE_NOMEM;
1660 0 : goto finalize;
1661 : }
1662 17 : char *pkclause = dbutils_text_select(db, sql);
1663 17 : char *pkvalues = (pkclause) ? pkclause : "rowid";
1664 17 : cloudsync_memory_free(sql);
1665 :
1666 : // delete entries related to rows that no longer exist in the original table, but preserve tombstone
1667 17 : sql = cloudsync_memory_mprintf("DELETE FROM \"%w_cloudsync\" WHERE (\"col_name\" != '%s' OR (\"col_name\" = '%s' AND col_version %% 2 != 0)) AND NOT EXISTS (SELECT 1 FROM \"%w\" WHERE \"%w_cloudsync\".pk = cloudsync_pk_encode(%s) LIMIT 1);", table->name, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->name, table->name, pkvalues);
1668 17 : rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1669 17 : if (pkclause) cloudsync_memory_free(pkclause);
1670 17 : cloudsync_memory_free(sql);
1671 17 : if (rc != SQLITE_OK) {
1672 0 : DEBUG_SQLITE_ERROR(rc, "cloudsync_finalize_alter", db);
1673 0 : goto finalize;
1674 : }
1675 :
1676 : }
1677 :
1678 : char buf[256];
1679 23 : snprintf(buf, sizeof(buf), "%lld", data->db_version);
1680 23 : dbutils_settings_set_key_value(db, context, "pre_alter_dbversion", buf);
1681 :
1682 : finalize:
1683 23 : sqlite3_free_table(result);
1684 23 : sqlite3_free(errmsg);
1685 :
1686 23 : return rc;
1687 : }
1688 :
1689 98 : int cloudsync_refill_metatable (sqlite3 *db, cloudsync_context *data, const char *table_name) {
1690 98 : cloudsync_table_context *table = table_lookup(data, table_name);
1691 98 : if (!table) return SQLITE_INTERNAL;
1692 :
1693 98 : sqlite3_stmt *vm = NULL;
1694 98 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
1695 :
1696 98 : char *sql = cloudsync_memory_mprintf("SELECT group_concat('\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1697 98 : char *pkclause_identifiers = dbutils_text_select(db, sql);
1698 98 : char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
1699 98 : cloudsync_memory_free(sql);
1700 :
1701 98 : sql = cloudsync_memory_mprintf("SELECT group_concat('cloudsync_pk_decode(pk, ' || pk || ') AS ' || '\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1702 98 : char *pkdecode = dbutils_text_select(db, sql);
1703 98 : char *pkdecodeval = (pkdecode) ? pkdecode : "cloudsync_pk_decode(pk, 1) AS rowid";
1704 98 : cloudsync_memory_free(sql);
1705 :
1706 98 : sql = cloudsync_memory_mprintf("SELECT group_concat('\"' || format('%%w', name) || '\"' || ' = cloudsync_pk_decode(pk, ' || pk || ')', ' AND ') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
1707 98 : char *pkonclause = dbutils_text_select(db, sql);
1708 98 : char *pkonclauseval = (pkonclause) ? pkonclause : "rowid = cloudsync_pk_decode(pk, 1) AS rowid";
1709 98 : cloudsync_memory_free(sql);
1710 :
1711 98 : sql = cloudsync_memory_mprintf("SELECT cloudsync_insert('%q', %s) FROM (SELECT %s FROM \"%w\" EXCEPT SELECT %s FROM \"%w_cloudsync\");", table_name, pkvalues_identifiers, pkvalues_identifiers, table_name, pkdecodeval, table_name);
1712 98 : int rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
1713 98 : cloudsync_memory_free(sql);
1714 98 : if (rc != SQLITE_OK) goto finalize;
1715 :
1716 : // fill missing colums
1717 : // for each non-pk column:
1718 :
1719 98 : sql = cloudsync_memory_mprintf("SELECT cloudsync_pk_encode(%s) FROM \"%w\" LEFT JOIN \"%w_cloudsync\" ON %s AND \"%w_cloudsync\".col_name = ? WHERE \"%w_cloudsync\".db_version IS NULL", pkvalues_identifiers, table_name, table_name, pkonclauseval, table_name, table_name);
1720 98 : rc = sqlite3_prepare(db, sql, -1, &vm, NULL);
1721 98 : cloudsync_memory_free(sql);
1722 98 : if (rc != SQLITE_OK) goto finalize;
1723 :
1724 293 : for (int i=0; i<table->ncols; ++i) {
1725 195 : char *col_name = table->col_name[i];
1726 :
1727 195 : rc = sqlite3_bind_text(vm, 1, col_name, -1, SQLITE_STATIC);
1728 195 : if (rc != SQLITE_OK) goto finalize;
1729 :
1730 233 : while (1) {
1731 233 : rc = sqlite3_step(vm);
1732 233 : if (rc == SQLITE_ROW) {
1733 38 : const char *pk = (const char *)sqlite3_column_text(vm, 0);
1734 38 : size_t pklen = strlen(pk);
1735 38 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, col_name, db_version, BUMP_SEQ(data));
1736 233 : } else if (rc == SQLITE_DONE) {
1737 195 : rc = SQLITE_OK;
1738 195 : break;
1739 : } else {
1740 0 : break;
1741 : }
1742 : }
1743 195 : if (rc != SQLITE_OK) goto finalize;
1744 :
1745 195 : sqlite3_reset(vm);
1746 293 : }
1747 :
1748 : finalize:
1749 98 : if (rc != SQLITE_OK) DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", sqlite3_errmsg(db));
1750 98 : if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
1751 98 : if (pkdecode) cloudsync_memory_free(pkdecode);
1752 98 : if (pkonclause) cloudsync_memory_free(pkonclause);
1753 98 : if (vm) sqlite3_finalize(vm);
1754 98 : return rc;
1755 98 : }
1756 :
1757 : // MARK: - Local -
1758 :
1759 1 : int local_update_sentinel (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1760 1 : sqlite3_stmt *vm = table->meta_sentinel_update_stmt;
1761 1 : if (!vm) return -1;
1762 :
1763 1 : int rc = sqlite3_bind_int64(vm, 1, db_version);
1764 1 : if (rc != SQLITE_OK) goto cleanup;
1765 :
1766 1 : rc = sqlite3_bind_int(vm, 2, seq);
1767 1 : if (rc != SQLITE_OK) goto cleanup;
1768 :
1769 1 : rc = sqlite3_bind_blob(vm, 3, pk, (int)pklen, SQLITE_STATIC);
1770 1 : if (rc != SQLITE_OK) goto cleanup;
1771 :
1772 1 : rc = sqlite3_step(vm);
1773 1 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1774 :
1775 : cleanup:
1776 1 : DEBUG_SQLITE_ERROR(rc, "local_update_sentinel", db);
1777 1 : sqlite3_reset(vm);
1778 1 : return rc;
1779 1 : }
1780 :
1781 118 : int local_mark_insert_sentinel_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1782 118 : sqlite3_stmt *vm = table->meta_sentinel_insert_stmt;
1783 118 : if (!vm) return -1;
1784 :
1785 118 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1786 118 : if (rc != SQLITE_OK) goto cleanup;
1787 :
1788 118 : rc = sqlite3_bind_int64(vm, 2, db_version);
1789 118 : if (rc != SQLITE_OK) goto cleanup;
1790 :
1791 118 : rc = sqlite3_bind_int(vm, 3, seq);
1792 118 : if (rc != SQLITE_OK) goto cleanup;
1793 :
1794 118 : rc = sqlite3_bind_int64(vm, 4, db_version);
1795 118 : if (rc != SQLITE_OK) goto cleanup;
1796 :
1797 118 : rc = sqlite3_bind_int(vm, 5, seq);
1798 118 : if (rc != SQLITE_OK) goto cleanup;
1799 :
1800 118 : rc = sqlite3_step(vm);
1801 118 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1802 :
1803 : cleanup:
1804 118 : DEBUG_SQLITE_ERROR(rc, "local_insert_sentinel", db);
1805 118 : sqlite3_reset(vm);
1806 118 : return rc;
1807 118 : }
1808 :
1809 738 : int local_mark_insert_or_update_meta_impl (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, int col_version, sqlite3_int64 db_version, int seq) {
1810 :
1811 738 : sqlite3_stmt *vm = table->meta_row_insert_update_stmt;
1812 738 : if (!vm) return -1;
1813 :
1814 738 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1815 738 : if (rc != SQLITE_OK) goto cleanup;
1816 :
1817 738 : rc = sqlite3_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1, SQLITE_STATIC);
1818 738 : if (rc != SQLITE_OK) goto cleanup;
1819 :
1820 738 : rc = sqlite3_bind_int(vm, 3, col_version);
1821 738 : if (rc != SQLITE_OK) goto cleanup;
1822 :
1823 738 : rc = sqlite3_bind_int64(vm, 4, db_version);
1824 738 : if (rc != SQLITE_OK) goto cleanup;
1825 :
1826 738 : rc = sqlite3_bind_int(vm, 5, seq);
1827 738 : if (rc != SQLITE_OK) goto cleanup;
1828 :
1829 738 : rc = sqlite3_bind_int64(vm, 6, db_version);
1830 738 : if (rc != SQLITE_OK) goto cleanup;
1831 :
1832 738 : rc = sqlite3_bind_int(vm, 7, seq);
1833 738 : if (rc != SQLITE_OK) goto cleanup;
1834 :
1835 738 : rc = sqlite3_step(vm);
1836 738 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1837 :
1838 : cleanup:
1839 738 : DEBUG_SQLITE_ERROR(rc, "local_insert_or_update", db);
1840 738 : sqlite3_reset(vm);
1841 738 : return rc;
1842 738 : }
1843 :
1844 695 : int local_mark_insert_or_update_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *col_name, sqlite3_int64 db_version, int seq) {
1845 695 : return local_mark_insert_or_update_meta_impl(db, table, pk, pklen, col_name, 1, db_version, seq);
1846 : }
1847 :
1848 43 : int local_mark_delete_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, sqlite3_int64 db_version, int seq) {
1849 43 : return local_mark_insert_or_update_meta_impl(db, table, pk, pklen, NULL, 2, db_version, seq);
1850 : }
1851 :
1852 12 : int local_drop_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen) {
1853 12 : sqlite3_stmt *vm = table->meta_row_drop_stmt;
1854 12 : if (!vm) return -1;
1855 :
1856 12 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1857 12 : if (rc != SQLITE_OK) goto cleanup;
1858 :
1859 12 : rc = sqlite3_step(vm);
1860 12 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1861 :
1862 : cleanup:
1863 12 : DEBUG_SQLITE_ERROR(rc, "local_drop_meta", db);
1864 12 : sqlite3_reset(vm);
1865 12 : return rc;
1866 12 : }
1867 :
1868 31 : int local_update_move_meta (sqlite3 *db, cloudsync_table_context *table, const char *pk, size_t pklen, const char *pk2, size_t pklen2, sqlite3_int64 db_version) {
1869 : /*
1870 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
1871 : * to a new primary key (NEW.pk) when a primary key change occurs.
1872 : *
1873 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
1874 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
1875 : *
1876 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
1877 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
1878 : * may be applied incorrectly, leading to data inconsistency.
1879 : *
1880 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
1881 : * by either incrementing the maximum sequence value in the table or using a function (e.g., `bump_seq(data)`)
1882 : * that generates a unique sequence for each row. The update query should ensure that each row moved
1883 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
1884 : */
1885 :
1886 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
1887 : // pk2 is the old pk
1888 :
1889 31 : sqlite3_stmt *vm = table->meta_update_move_stmt;
1890 31 : if (!vm) return -1;
1891 :
1892 : // new primary key
1893 31 : int rc = sqlite3_bind_blob(vm, 1, pk, (int)pklen, SQLITE_STATIC);
1894 31 : if (rc != SQLITE_OK) goto cleanup;
1895 :
1896 : // new db_version
1897 31 : rc = sqlite3_bind_int64(vm, 2, db_version);
1898 31 : if (rc != SQLITE_OK) goto cleanup;
1899 :
1900 : // old primary key
1901 31 : rc = sqlite3_bind_blob(vm, 3, pk2, (int)pklen2, SQLITE_STATIC);
1902 31 : if (rc != SQLITE_OK) goto cleanup;
1903 :
1904 31 : rc = sqlite3_step(vm);
1905 31 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
1906 :
1907 : cleanup:
1908 31 : DEBUG_SQLITE_ERROR(rc, "local_update_move_meta", db);
1909 31 : sqlite3_reset(vm);
1910 31 : return rc;
1911 31 : }
1912 :
1913 : // MARK: - Payload Encode / Decode -
1914 :
1915 85 : bool cloudsync_buffer_free (cloudsync_network_payload *payload) {
1916 85 : if (payload) {
1917 85 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
1918 85 : memset(payload, 0, sizeof(cloudsync_network_payload));
1919 85 : }
1920 :
1921 85 : return false;
1922 : }
1923 :
1924 4255 : bool cloudsync_buffer_check (cloudsync_network_payload *payload, size_t needed) {
1925 : // alloc/resize buffer
1926 4255 : if (payload->bused + needed > payload->balloc) {
1927 85 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
1928 85 : size_t balloc = payload->balloc + needed;
1929 :
1930 85 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
1931 85 : if (!buffer) return cloudsync_buffer_free(payload);
1932 :
1933 85 : payload->buffer = buffer;
1934 85 : payload->balloc = balloc;
1935 85 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_network_header);
1936 85 : }
1937 :
1938 4255 : return true;
1939 4255 : }
1940 :
1941 85 : void cloudsync_network_header_init (cloudsync_network_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
1942 85 : memset(header, 0, sizeof(cloudsync_network_header));
1943 : assert(sizeof(cloudsync_network_header)==32);
1944 :
1945 : int major, minor, patch;
1946 85 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
1947 :
1948 85 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
1949 85 : header->version = CLOUDSYNC_PAYLOAD_VERSION;
1950 85 : header->libversion[0] = major;
1951 85 : header->libversion[1] = minor;
1952 85 : header->libversion[2] = patch;
1953 85 : header->expanded_size = htonl(expanded_size);
1954 85 : header->ncols = htons(ncols);
1955 85 : header->nrows = htonl(nrows);
1956 85 : header->schema_hash = htonll(hash);
1957 85 : }
1958 :
1959 4255 : void cloudsync_payload_encode_step (sqlite3_context *context, int argc, sqlite3_value **argv) {
1960 : DEBUG_FUNCTION("cloudsync_payload_encode_step");
1961 : // debug_values(argc, argv);
1962 :
1963 : // allocate/get the session context
1964 4255 : cloudsync_network_payload *payload = (cloudsync_network_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_network_payload));
1965 4255 : if (!payload) return;
1966 :
1967 : // check if the step function is called for the first time
1968 4255 : if (payload->nrows == 0) payload->ncols = argc;
1969 :
1970 4255 : size_t breq = pk_encode_size(argv, argc, 0);
1971 4255 : if (cloudsync_buffer_check(payload, breq) == false) return;
1972 :
1973 4255 : char *buffer = payload->buffer + payload->bused;
1974 4255 : char *ptr = pk_encode(argv, argc, buffer, false, NULL);
1975 4255 : assert(buffer == ptr);
1976 :
1977 : // update buffer
1978 4255 : payload->bused += breq;
1979 :
1980 : // increment row counter
1981 4255 : ++payload->nrows;
1982 4255 : }
1983 :
1984 93 : void cloudsync_payload_encode_final (sqlite3_context *context) {
1985 : DEBUG_FUNCTION("cloudsync_payload_encode_final");
1986 :
1987 : // get the session context
1988 93 : cloudsync_network_payload *payload = (cloudsync_network_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_network_payload));
1989 93 : if (!payload) return;
1990 :
1991 93 : if (payload->nrows == 0) {
1992 8 : sqlite3_result_null(context);
1993 8 : return;
1994 : }
1995 :
1996 : // encode payload
1997 85 : int header_size = (int)sizeof(cloudsync_network_header);
1998 85 : int real_buffer_size = (int)(payload->bused - header_size);
1999 85 : int zbound = LZ4_compressBound(real_buffer_size);
2000 85 : char *buffer = cloudsync_memory_alloc(zbound + header_size);
2001 85 : if (!buffer) {
2002 0 : cloudsync_buffer_free(payload);
2003 0 : sqlite3_result_error_code(context, SQLITE_NOMEM);
2004 0 : return;
2005 : }
2006 :
2007 : // adjust buffer to compress to skip the reserved header
2008 85 : char *src_buffer = payload->buffer + sizeof(cloudsync_network_header);
2009 85 : int zused = LZ4_compress_default(src_buffer, buffer+header_size, real_buffer_size, zbound);
2010 85 : bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
2011 85 : CHECK_FORCE_UNCOMPRESSED_BUFFER();
2012 :
2013 : // setup payload network header
2014 85 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2015 : cloudsync_network_header header;
2016 85 : cloudsync_network_header_init(&header, (use_uncompressed_buffer) ? 0 : real_buffer_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);
2017 :
2018 : // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
2019 85 : if (use_uncompressed_buffer) {
2020 2 : cloudsync_memory_free(buffer);
2021 2 : buffer = payload->buffer;
2022 2 : zused = real_buffer_size;
2023 2 : }
2024 :
2025 : // copy header and data to SQLite BLOB
2026 85 : memcpy(buffer, &header, sizeof(cloudsync_network_header));
2027 85 : int blob_size = zused+sizeof(cloudsync_network_header);
2028 85 : sqlite3_result_blob(context, buffer, blob_size, SQLITE_TRANSIENT);
2029 :
2030 : // cleanup memory
2031 85 : cloudsync_buffer_free(payload);
2032 85 : if (!use_uncompressed_buffer) cloudsync_memory_free(buffer);
2033 93 : }
2034 :
2035 83 : cloudsync_payload_apply_callback_t cloudsync_get_payload_apply_callback(sqlite3 *db) {
2036 83 : return (sqlite3_libversion_number() >= 3044000) ? sqlite3_get_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY) : NULL;
2037 : }
2038 :
2039 46 : void cloudsync_set_payload_apply_callback(sqlite3 *db, cloudsync_payload_apply_callback_t callback) {
2040 46 : if (sqlite3_libversion_number() >= 3044000) {
2041 46 : sqlite3_set_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY, (void*)callback, NULL);
2042 46 : }
2043 46 : }
2044 :
2045 37647 : int cloudsync_pk_decode_bind_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
2046 37647 : cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
2047 37647 : int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
2048 :
2049 37647 : if (rc == SQLITE_OK) {
2050 : // the dbversion index is smaller than seq index, so it is processed first
2051 : // when processing the dbversion column: save the value to the tmp_dbversion field
2052 : // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
2053 37647 : switch (index) {
2054 : case CLOUDSYNC_PK_INDEX_TBL:
2055 4183 : if (type == SQLITE_TEXT) {
2056 4183 : decode_context->tbl = pval;
2057 4183 : decode_context->tbl_len = ival;
2058 4183 : }
2059 4183 : break;
2060 : case CLOUDSYNC_PK_INDEX_PK:
2061 4183 : if (type == SQLITE_BLOB) {
2062 4183 : decode_context->pk = pval;
2063 4183 : decode_context->pk_len = ival;
2064 4183 : }
2065 4183 : break;
2066 : case CLOUDSYNC_PK_INDEX_COLNAME:
2067 4183 : if (type == SQLITE_TEXT) {
2068 4183 : decode_context->col_name = pval;
2069 4183 : decode_context->col_name_len = ival;
2070 4183 : }
2071 4183 : break;
2072 : case CLOUDSYNC_PK_INDEX_COLVERSION:
2073 4183 : if (type == SQLITE_INTEGER) decode_context->col_version = ival;
2074 4183 : break;
2075 : case CLOUDSYNC_PK_INDEX_DBVERSION:
2076 4183 : if (type == SQLITE_INTEGER) decode_context->db_version = ival;
2077 4183 : break;
2078 : case CLOUDSYNC_PK_INDEX_SITEID:
2079 4183 : if (type == SQLITE_BLOB) {
2080 4183 : decode_context->site_id = pval;
2081 4183 : decode_context->site_id_len = ival;
2082 4183 : }
2083 4183 : break;
2084 : case CLOUDSYNC_PK_INDEX_CL:
2085 4183 : if (type == SQLITE_INTEGER) decode_context->cl = ival;
2086 4183 : break;
2087 : case CLOUDSYNC_PK_INDEX_SEQ:
2088 4183 : if (type == SQLITE_INTEGER) decode_context->seq = ival;
2089 4183 : break;
2090 : }
2091 37647 : }
2092 :
2093 37647 : return rc;
2094 : }
2095 :
2096 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
2097 :
2098 85 : int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int blen) {
2099 : // decode header
2100 : cloudsync_network_header header;
2101 85 : memcpy(&header, payload, sizeof(cloudsync_network_header));
2102 :
2103 85 : header.signature = ntohl(header.signature);
2104 85 : header.expanded_size = ntohl(header.expanded_size);
2105 85 : header.ncols = ntohs(header.ncols);
2106 85 : header.nrows = ntohl(header.nrows);
2107 85 : header.schema_hash = ntohll(header.schema_hash);
2108 :
2109 85 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2110 85 : if (!data || header.schema_hash != data->schema_hash) {
2111 3 : sqlite3 *db = sqlite3_context_db_handle(context);
2112 3 : if (!dbutils_check_schema_hash(db, header.schema_hash)) {
2113 2 : dbutils_context_result_error(context, "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
2114 2 : sqlite3_result_error_code(context, SQLITE_MISMATCH);
2115 2 : return -1;
2116 : }
2117 1 : }
2118 :
2119 : // sanity check header
2120 83 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
2121 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: invalid signature or column size.");
2122 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2123 0 : return -1;
2124 : }
2125 :
2126 83 : const char *buffer = payload + sizeof(cloudsync_network_header);
2127 83 : blen -= sizeof(cloudsync_network_header);
2128 :
2129 : // check if payload is compressed
2130 83 : char *clone = NULL;
2131 83 : if (header.expanded_size != 0) {
2132 81 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
2133 81 : if (!clone) {sqlite3_result_error_code(context, SQLITE_NOMEM); return -1;}
2134 :
2135 81 : uint32_t rc = LZ4_decompress_safe(buffer, clone, blen, header.expanded_size);
2136 81 : if (rc <= 0 || rc != header.expanded_size) {
2137 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to decompress BLOB (%d).", rc);
2138 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2139 0 : return -1;
2140 : }
2141 :
2142 81 : buffer = (const char *)clone;
2143 81 : }
2144 :
2145 : // precompile the insert statement
2146 83 : sqlite3_stmt *vm = NULL;
2147 83 : sqlite3 *db = sqlite3_context_db_handle(context);
2148 83 : const char *sql = "INSERT INTO cloudsync_changes(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) VALUES (?,?,?,?,?,?,?,?,?);";
2149 83 : int rc = sqlite3_prepare(db, sql, -1, &vm, NULL);
2150 83 : if (rc != SQLITE_OK) {
2151 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_apply: error while compiling SQL statement (%s).", sqlite3_errmsg(db));
2152 0 : if (clone) cloudsync_memory_free(clone);
2153 0 : return -1;
2154 : }
2155 :
2156 : // process buffer, one row at a time
2157 83 : uint16_t ncols = header.ncols;
2158 83 : uint32_t nrows = header.nrows;
2159 83 : int dbversion = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_DBVERSION);
2160 83 : int seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_SEQ);
2161 83 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
2162 83 : void *payload_apply_xdata = NULL;
2163 83 : cloudsync_payload_apply_callback_t payload_apply_callback = cloudsync_get_payload_apply_callback(db);
2164 :
2165 4266 : for (uint32_t i=0; i<nrows; ++i) {
2166 4183 : size_t seek = 0;
2167 4183 : pk_decode((char *)buffer, blen, ncols, &seek, cloudsync_pk_decode_bind_callback, &decoded_context);
2168 : // n is the pk_decode return value, I don't think I should assert here because in any case the next sqlite3_step would fail
2169 : // assert(n == ncols);
2170 :
2171 4183 : bool approved = true;
2172 4183 : if (payload_apply_callback) approved = payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY, SQLITE_OK);
2173 :
2174 4183 : if (approved) {
2175 4183 : rc = sqlite3_step(vm);
2176 4183 : if (rc != SQLITE_DONE) {
2177 : // don't "break;", the error can be due to a RLS policy.
2178 : // in case of error we try to apply the following changes
2179 0 : printf("cloudsync_payload_apply error on db_version %lld/%lld: (%d) %s\n", decoded_context.db_version, decoded_context.seq, rc, sqlite3_errmsg(db));
2180 0 : }
2181 4183 : }
2182 :
2183 4183 : if (payload_apply_callback) payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_DID_APPLY, rc);
2184 :
2185 4183 : buffer += seek;
2186 4183 : blen -= seek;
2187 4183 : stmt_reset(vm);
2188 4183 : }
2189 :
2190 83 : char *lasterr = NULL;
2191 83 : if (rc != SQLITE_OK && rc != SQLITE_DONE) lasterr = cloudsync_string_dup(sqlite3_errmsg(db), false);
2192 :
2193 83 : if (payload_apply_callback) payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_CLEANUP, rc);
2194 :
2195 83 : if (rc == SQLITE_DONE) rc = SQLITE_OK;
2196 83 : if (rc == SQLITE_OK) {
2197 : char buf[256];
2198 83 : if (decoded_context.db_version >= dbversion) {
2199 82 : snprintf(buf, sizeof(buf), "%lld", decoded_context.db_version);
2200 82 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
2201 :
2202 82 : if (decoded_context.seq != seq) {
2203 40 : snprintf(buf, sizeof(buf), "%lld", decoded_context.seq);
2204 40 : dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_SEQ, buf);
2205 40 : }
2206 82 : }
2207 83 : }
2208 :
2209 : // cleanup vm
2210 83 : if (vm) sqlite3_finalize(vm);
2211 :
2212 : // cleanup memory
2213 83 : if (clone) cloudsync_memory_free(clone);
2214 :
2215 83 : if (rc != SQLITE_OK) {
2216 0 : sqlite3_result_error(context, lasterr, -1);
2217 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2218 0 : cloudsync_memory_free(lasterr);
2219 0 : return -1;
2220 : }
2221 :
2222 : // return the number of processed rows
2223 83 : sqlite3_result_int(context, nrows);
2224 83 : return nrows;
2225 85 : }
2226 :
2227 85 : void cloudsync_payload_decode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2228 : DEBUG_FUNCTION("cloudsync_payload_decode");
2229 : //debug_values(argc, argv);
2230 :
2231 : // sanity check payload type
2232 85 : if (sqlite3_value_type(argv[0]) != SQLITE_BLOB) {
2233 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_decode: value must be a BLOB.");
2234 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2235 0 : return;
2236 : }
2237 :
2238 : // sanity check payload size
2239 85 : int blen = sqlite3_value_bytes(argv[0]);
2240 85 : if (blen < (int)sizeof(cloudsync_network_header)) {
2241 0 : dbutils_context_result_error(context, "Error on cloudsync_payload_decode: invalid input size.");
2242 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
2243 0 : return;
2244 : }
2245 :
2246 : // obtain payload
2247 85 : const char *payload = (const char *)sqlite3_value_blob(argv[0]);
2248 :
2249 : // apply changes
2250 85 : cloudsync_payload_apply(context, payload, blen);
2251 85 : }
2252 :
2253 : // MARK: - Public -
2254 :
2255 3 : void cloudsync_version (sqlite3_context *context, int argc, sqlite3_value **argv) {
2256 : DEBUG_FUNCTION("cloudsync_version");
2257 3 : UNUSED_PARAMETER(argc);
2258 3 : UNUSED_PARAMETER(argv);
2259 3 : sqlite3_result_text(context, CLOUDSYNC_VERSION, -1, SQLITE_STATIC);
2260 3 : }
2261 :
2262 16 : void cloudsync_siteid (sqlite3_context *context, int argc, sqlite3_value **argv) {
2263 : DEBUG_FUNCTION("cloudsync_siteid");
2264 16 : UNUSED_PARAMETER(argc);
2265 16 : UNUSED_PARAMETER(argv);
2266 :
2267 16 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2268 16 : sqlite3_result_blob(context, data->site_id, UUID_LEN, SQLITE_STATIC);
2269 16 : }
2270 :
2271 3 : void cloudsync_db_version (sqlite3_context *context, int argc, sqlite3_value **argv) {
2272 : DEBUG_FUNCTION("cloudsync_db_version");
2273 3 : UNUSED_PARAMETER(argc);
2274 3 : UNUSED_PARAMETER(argv);
2275 :
2276 : // retrieve context
2277 3 : sqlite3 *db = sqlite3_context_db_handle(context);
2278 3 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2279 :
2280 3 : int rc = db_version_check_uptodate(db, data);
2281 3 : if (rc != SQLITE_OK) {
2282 0 : dbutils_context_result_error(context, "Unable to retrieve db_version (%s).", sqlite3_errmsg(db));
2283 0 : return;
2284 : }
2285 :
2286 3 : sqlite3_result_int64(context, data->db_version);
2287 3 : }
2288 :
2289 3452 : void cloudsync_db_version_next (sqlite3_context *context, int argc, sqlite3_value **argv) {
2290 : DEBUG_FUNCTION("cloudsync_db_version_next");
2291 :
2292 : // retrieve context
2293 3452 : sqlite3 *db = sqlite3_context_db_handle(context);
2294 3452 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2295 :
2296 3452 : sqlite3_int64 merging_version = (argc == 1) ? sqlite3_value_int64(argv[0]) : CLOUDSYNC_VALUE_NOTSET;
2297 3452 : sqlite3_int64 value = db_version_next(db, data, merging_version);
2298 3452 : if (value == -1) {
2299 0 : dbutils_context_result_error(context, "Unable to retrieve next_db_version (%s).", sqlite3_errmsg(db));
2300 0 : return;
2301 : }
2302 :
2303 3452 : sqlite3_result_int64(context, value);
2304 3452 : }
2305 :
2306 23 : void cloudsync_seq (sqlite3_context *context, int argc, sqlite3_value **argv) {
2307 : DEBUG_FUNCTION("cloudsync_seq");
2308 :
2309 : // retrieve context
2310 23 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2311 23 : sqlite3_result_int(context, BUMP_SEQ(data));
2312 23 : }
2313 :
2314 1 : void cloudsync_uuid (sqlite3_context *context, int argc, sqlite3_value **argv) {
2315 : DEBUG_FUNCTION("cloudsync_uuid");
2316 :
2317 : char value[UUID_STR_MAXLEN];
2318 1 : char *uuid = cloudsync_uuid_v7_string(value, true);
2319 1 : sqlite3_result_text(context, uuid, -1, SQLITE_TRANSIENT);
2320 1 : }
2321 :
2322 : // MARK: -
2323 :
2324 1 : void cloudsync_set (sqlite3_context *context, int argc, sqlite3_value **argv) {
2325 : DEBUG_FUNCTION("cloudsync_set");
2326 :
2327 : // sanity check parameters
2328 1 : const char *key = (const char *)sqlite3_value_text(argv[0]);
2329 1 : const char *value = (const char *)sqlite3_value_text(argv[1]);
2330 :
2331 : // silently fails
2332 1 : if (key == NULL) return;
2333 :
2334 1 : sqlite3 *db = sqlite3_context_db_handle(context);
2335 1 : dbutils_settings_set_key_value(db, context, key, value);
2336 1 : }
2337 :
2338 1 : void cloudsync_set_column (sqlite3_context *context, int argc, sqlite3_value **argv) {
2339 : DEBUG_FUNCTION("cloudsync_set_column");
2340 :
2341 1 : const char *tbl = (const char *)sqlite3_value_text(argv[0]);
2342 1 : const char *col = (const char *)sqlite3_value_text(argv[1]);
2343 1 : const char *key = (const char *)sqlite3_value_text(argv[2]);
2344 1 : const char *value = (const char *)sqlite3_value_text(argv[3]);
2345 1 : dbutils_table_settings_set_key_value(NULL, context, tbl, col, key, value);
2346 1 : }
2347 :
2348 1 : void cloudsync_set_table (sqlite3_context *context, int argc, sqlite3_value **argv) {
2349 : DEBUG_FUNCTION("cloudsync_set_table");
2350 :
2351 1 : const char *tbl = (const char *)sqlite3_value_text(argv[0]);
2352 1 : const char *key = (const char *)sqlite3_value_text(argv[1]);
2353 1 : const char *value = (const char *)sqlite3_value_text(argv[2]);
2354 1 : dbutils_table_settings_set_key_value(NULL, context, tbl, "*", key, value);
2355 1 : }
2356 :
2357 929 : void cloudsync_is_sync (sqlite3_context *context, int argc, sqlite3_value **argv) {
2358 : DEBUG_FUNCTION("cloudsync_is_sync");
2359 :
2360 929 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2361 929 : if (data->insync) {
2362 596 : sqlite3_result_int(context, 1);
2363 596 : return;
2364 : }
2365 :
2366 333 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2367 333 : cloudsync_table_context *table = table_lookup(data, table_name);
2368 333 : sqlite3_result_int(context, (table) ? (table->enabled == 0) : 0);
2369 929 : }
2370 :
2371 9425 : void cloudsync_col_value (sqlite3_context *context, int argc, sqlite3_value **argv) {
2372 : // DEBUG_FUNCTION("cloudsync_col_value");
2373 :
2374 : // argv[0] -> table name
2375 : // argv[1] -> column name
2376 : // argv[2] -> encoded pk
2377 :
2378 : // lookup table
2379 9425 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2380 9425 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2381 9425 : cloudsync_table_context *table = table_lookup(data, table_name);
2382 9425 : if (!table) {
2383 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in clousdsync_colvalue.", table_name);
2384 0 : return;
2385 : }
2386 :
2387 : // retrieve column name
2388 9425 : const char *col_name = (const char *)sqlite3_value_text(argv[1]);
2389 :
2390 : // check for special tombstone value
2391 9425 : if (strcmp(col_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0) {
2392 940 : sqlite3_result_null(context);
2393 940 : return;
2394 : }
2395 :
2396 : // extract the right col_value vm associated to the column name
2397 8485 : sqlite3_stmt *vm = table_column_lookup(table, col_name, false, NULL);
2398 8485 : if (!vm) {
2399 0 : sqlite3_result_error(context, "Unable to retrieve column value precompiled statement in clousdsync_colvalue.", -1);
2400 0 : return;
2401 : }
2402 :
2403 : // bind primary key values
2404 8485 : int rc = pk_decode_prikey((char *)sqlite3_value_blob(argv[2]), (size_t)sqlite3_value_bytes(argv[2]), pk_decode_bind_callback, (void *)vm);
2405 8485 : if (rc < 0) goto cleanup;
2406 :
2407 : // execute vm
2408 8485 : rc = sqlite3_step(vm);
2409 16970 : if (rc == SQLITE_DONE) {
2410 0 : rc = SQLITE_OK;
2411 0 : sqlite3_result_text(context, CLOUDSYNC_RLS_RESTRICTED_VALUE, -1, SQLITE_STATIC);
2412 8485 : } else if (rc == SQLITE_ROW) {
2413 : // store value result
2414 8485 : rc = SQLITE_OK;
2415 8485 : sqlite3_result_value(context, sqlite3_column_value(vm, 0));
2416 8485 : }
2417 :
2418 : cleanup:
2419 8485 : if (rc != SQLITE_OK) {
2420 0 : sqlite3 *db = sqlite3_context_db_handle(context);
2421 0 : sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2422 0 : }
2423 8485 : sqlite3_reset(vm);
2424 9425 : }
2425 :
2426 10213 : void cloudsync_pk_encode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2427 10213 : size_t bsize = 0;
2428 10213 : char *buffer = pk_encode_prikey(argv, argc, NULL, &bsize);
2429 10213 : if (!buffer) {
2430 0 : sqlite3_result_null(context);
2431 0 : return;
2432 : }
2433 10213 : sqlite3_result_blob(context, (const void *)buffer, (int)bsize, SQLITE_TRANSIENT);
2434 10213 : cloudsync_memory_free(buffer);
2435 10213 : }
2436 :
2437 6208 : int cloudsync_pk_decode_set_result_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
2438 6208 : cloudsync_pk_decode_context *decode_context = (cloudsync_pk_decode_context *)xdata;
2439 : // decode_context->index is 1 based
2440 : // index is 0 based
2441 6208 : if (decode_context->index != index+1) return SQLITE_OK;
2442 :
2443 3144 : int rc = 0;
2444 3144 : sqlite3_context *context = decode_context->context;
2445 3144 : switch (type) {
2446 : case SQLITE_INTEGER:
2447 0 : sqlite3_result_int64(context, ival);
2448 0 : break;
2449 :
2450 : case SQLITE_FLOAT:
2451 0 : sqlite3_result_double(context, dval);
2452 0 : break;
2453 :
2454 : case SQLITE_NULL:
2455 0 : sqlite3_result_null(context);
2456 0 : break;
2457 :
2458 : case SQLITE_TEXT:
2459 3144 : sqlite3_result_text(context, pval, (int)ival, SQLITE_TRANSIENT);
2460 3144 : break;
2461 :
2462 : case SQLITE_BLOB:
2463 0 : sqlite3_result_blob(context, pval, (int)ival, SQLITE_TRANSIENT);
2464 0 : break;
2465 : }
2466 :
2467 3144 : return rc;
2468 6208 : }
2469 :
2470 :
2471 3144 : void cloudsync_pk_decode (sqlite3_context *context, int argc, sqlite3_value **argv) {
2472 3144 : const char *pk = (const char *)sqlite3_value_text(argv[0]);
2473 3144 : int i = sqlite3_value_int(argv[1]);
2474 :
2475 3144 : cloudsync_pk_decode_context xdata = {.context = context, .index = i};
2476 3144 : pk_decode_prikey((char *)pk, strlen(pk), cloudsync_pk_decode_set_result_callback, &xdata);
2477 3144 : }
2478 :
2479 : // MARK: -
2480 :
2481 299 : void cloudsync_insert (sqlite3_context *context, int argc, sqlite3_value **argv) {
2482 : DEBUG_FUNCTION("cloudsync_insert %s", sqlite3_value_text(argv[0]));
2483 : // debug_values(argc-1, &argv[1]);
2484 :
2485 : // argv[0] is table name
2486 : // argv[1]..[N] is primary key(s)
2487 :
2488 : // table_cloudsync
2489 : // pk -> encode(argc-1, &argv[1])
2490 : // col_name -> name
2491 : // col_version -> 0/1 +1
2492 : // db_version -> check
2493 : // site_id 0
2494 : // seq -> sqlite_master
2495 :
2496 : // retrieve context
2497 299 : sqlite3 *db = sqlite3_context_db_handle(context);
2498 299 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2499 :
2500 : // lookup table
2501 299 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2502 299 : cloudsync_table_context *table = table_lookup(data, table_name);
2503 299 : if (!table) {
2504 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_insert.", table_name);
2505 0 : return;
2506 : }
2507 :
2508 : // encode the primary key values into a buffer
2509 : char buffer[1024];
2510 299 : size_t pklen = sizeof(buffer);
2511 299 : char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
2512 299 : if (!pk) {
2513 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2514 0 : return;
2515 : }
2516 :
2517 : // compute the next database version for tracking changes
2518 299 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2519 :
2520 : // check if a row with the same primary key already exists
2521 : // if so, this means the row might have been previously deleted (sentinel)
2522 299 : bool pk_exists = (bool)stmt_count(table->meta_pkexists_stmt, pk, pklen, SQLITE_BLOB);
2523 299 : int rc = SQLITE_OK;
2524 :
2525 299 : if (table->ncols == 0) {
2526 : // if there are no columns other than primary keys, insert a sentinel record
2527 87 : rc = local_mark_insert_sentinel_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2528 87 : if (rc != SQLITE_OK) goto cleanup;
2529 299 : } else if (pk_exists){
2530 : // if a row with the same primary key already exists, update the sentinel record
2531 1 : rc = local_update_sentinel(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2532 1 : if (rc != SQLITE_OK) goto cleanup;
2533 1 : }
2534 :
2535 : // process each non-primary key column for insert or update
2536 915 : for (int i=0; i<table->ncols; ++i) {
2537 : // mark the column as inserted or updated in the metadata
2538 616 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data));
2539 616 : if (rc != SQLITE_OK) goto cleanup;
2540 915 : }
2541 :
2542 : cleanup:
2543 299 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2544 : // free memory if the primary key was dynamically allocated
2545 299 : if (pk != buffer) cloudsync_memory_free(pk);
2546 299 : }
2547 :
2548 52 : void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) {
2549 : DEBUG_FUNCTION("cloudsync_update %s", sqlite3_value_text(argv[0]));
2550 : // debug_values(argc-1, &argv[1]);
2551 :
2552 : // arguments are:
2553 : // [0] table name
2554 : // [1..table->npks] NEW.prikeys
2555 : // [1+table->npks ..] OLD.prikeys
2556 : // then NEW.value,OLD.value
2557 :
2558 : // retrieve context
2559 52 : sqlite3 *db = sqlite3_context_db_handle(context);
2560 52 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2561 :
2562 : // lookup table
2563 52 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2564 52 : cloudsync_table_context *table = table_lookup(data, table_name);
2565 52 : if (!table) {
2566 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_update.", table_name);
2567 0 : return;
2568 : }
2569 :
2570 : // compute the next database version for tracking changes
2571 52 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2572 52 : int rc = SQLITE_OK;
2573 :
2574 : // check if the primary key(s) have changed by comparing the NEW and OLD primary key values
2575 52 : bool prikey_changed = false;
2576 94 : for (int i=1; i<=table->npks; ++i) {
2577 73 : if (dbutils_value_compare(argv[i], argv[i+table->npks]) != 0) {
2578 31 : prikey_changed = true;
2579 31 : break;
2580 : }
2581 42 : }
2582 :
2583 : // encode the NEW primary key values into a buffer (used later for indexing)
2584 : char buffer[1024];
2585 : char buffer2[1024];
2586 52 : size_t pklen = sizeof(buffer);
2587 52 : size_t oldpklen = sizeof(buffer2);
2588 52 : char *oldpk = NULL;
2589 :
2590 52 : char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
2591 52 : if (!pk) {
2592 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2593 0 : return;
2594 : }
2595 :
2596 52 : if (prikey_changed) {
2597 : // if the primary key has changed, we need to handle the row differently:
2598 : // 1. mark the old row (OLD primary key) as deleted
2599 : // 2. create a new row (NEW primary key)
2600 :
2601 : // encode the OLD primary key into a buffer
2602 31 : oldpk = pk_encode_prikey(&argv[1+table->npks], table->npks, buffer2, &oldpklen);
2603 31 : if (!oldpk) {
2604 0 : if (pk != buffer) cloudsync_memory_free(pk);
2605 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2606 0 : return;
2607 : }
2608 :
2609 : // mark the rows with the old primary key as deleted in the metadata (old row handling)
2610 31 : rc = local_mark_delete_meta(db, table, oldpk, oldpklen, db_version, BUMP_SEQ(data));
2611 31 : if (rc != SQLITE_OK) goto cleanup;
2612 :
2613 : // move non-sentinel metadata entries from OLD primary key to NEW primary key
2614 : // handles the case where some metadata is retained across primary key change
2615 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2616 31 : rc = local_update_move_meta(db, table, pk, pklen, oldpk, oldpklen, db_version);
2617 31 : if (rc != SQLITE_OK) goto cleanup;
2618 :
2619 : // mark a new sentinel row with the new primary key in the metadata
2620 31 : rc = local_mark_insert_sentinel_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2621 31 : if (rc != SQLITE_OK) goto cleanup;
2622 :
2623 : // free memory if the OLD primary key was dynamically allocated
2624 31 : if (oldpk != buffer2) cloudsync_memory_free(oldpk);
2625 31 : oldpk = NULL;
2626 31 : }
2627 :
2628 : // compare NEW and OLD values (excluding primary keys) to handle column updates
2629 : // starting index for column values
2630 52 : int index = 1 + (table->npks * 2);
2631 179 : for (int i=0; i<table->ncols; i++) {
2632 127 : if (dbutils_value_compare(argv[i+index], argv[i+index+1]) != 0) {
2633 : // if a column value has changed, mark it as updated in the metadata
2634 : // columns are in cid order
2635 41 : rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data));
2636 41 : if (rc != SQLITE_OK) goto cleanup;
2637 41 : }
2638 127 : ++index;
2639 179 : }
2640 :
2641 : cleanup:
2642 52 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2643 52 : if (pk != buffer) cloudsync_memory_free(pk);
2644 52 : if (oldpk && (oldpk != buffer2)) cloudsync_memory_free(oldpk);
2645 52 : }
2646 :
2647 12 : void cloudsync_delete (sqlite3_context *context, int argc, sqlite3_value **argv) {
2648 : DEBUG_FUNCTION("cloudsync_delete %s", sqlite3_value_text(argv[0]));
2649 : // debug_values(argc-1, &argv[1]);
2650 :
2651 : // retrieve context
2652 12 : sqlite3 *db = sqlite3_context_db_handle(context);
2653 12 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2654 :
2655 : // lookup table
2656 12 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2657 12 : cloudsync_table_context *table = table_lookup(data, table_name);
2658 12 : if (!table) {
2659 0 : dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_delete.", table_name);
2660 0 : return;
2661 : }
2662 :
2663 : // compute the next database version for tracking changes
2664 12 : sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
2665 12 : int rc = SQLITE_OK;
2666 :
2667 : // encode the primary key values into a buffer
2668 : char buffer[1024];
2669 12 : size_t pklen = sizeof(buffer);
2670 12 : char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
2671 12 : if (!pk) {
2672 0 : sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
2673 0 : return;
2674 : }
2675 :
2676 : // mark the row as deleted by inserting a delete sentinel into the metadata
2677 12 : rc = local_mark_delete_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
2678 12 : if (rc != SQLITE_OK) goto cleanup;
2679 :
2680 : // remove any metadata related to the old rows associated with this primary key
2681 12 : rc = local_drop_meta(db, table, pk, pklen);
2682 12 : if (rc != SQLITE_OK) goto cleanup;
2683 :
2684 : cleanup:
2685 12 : if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
2686 : // free memory if the primary key was dynamically allocated
2687 12 : if (pk != buffer) cloudsync_memory_free(pk);
2688 12 : }
2689 :
2690 : // MARK: -
2691 :
2692 6 : int cloudsync_cleanup_internal (sqlite3_context *context, const char *table_name) {
2693 6 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2694 :
2695 : // get database reference
2696 6 : sqlite3 *db = sqlite3_context_db_handle(context);
2697 :
2698 : // init cloudsync_settings
2699 6 : if (cloudsync_context_init(db, data, context) == NULL) return SQLITE_MISUSE;
2700 :
2701 6 : cloudsync_table_context *table = table_lookup(data, table_name);
2702 6 : if (!table) return SQLITE_OK;
2703 :
2704 6 : table_remove_from_context(data, table);
2705 6 : table_free(table);
2706 :
2707 : // drop meta-table
2708 6 : char *sql = cloudsync_memory_mprintf("DROP TABLE IF EXISTS \"%w_cloudsync\";", table_name);
2709 6 : int rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
2710 6 : cloudsync_memory_free(sql);
2711 6 : if (rc != SQLITE_OK) {
2712 0 : dbutils_context_result_error(context, "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup.", table_name);
2713 0 : sqlite3_result_error_code(context, rc);
2714 0 : return rc;
2715 : }
2716 :
2717 : // drop original triggers
2718 6 : dbutils_delete_triggers(db, table_name);
2719 6 : if (rc != SQLITE_OK) {
2720 0 : dbutils_context_result_error(context, "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup.", table_name);
2721 0 : sqlite3_result_error_code(context, rc);
2722 0 : return rc;
2723 : }
2724 :
2725 : // remove all table related settings
2726 6 : dbutils_table_settings_set_key_value(db, context, table_name, NULL, NULL, NULL);
2727 :
2728 6 : return SQLITE_OK;
2729 6 : }
2730 :
2731 1 : void cloudsync_cleanup_all (sqlite3_context *context) {
2732 1 : char *sql = "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'cloudsync_%' AND name NOT LIKE '%_cloudsync';";
2733 :
2734 1 : sqlite3 *db = sqlite3_context_db_handle(context);
2735 1 : char **result = NULL;
2736 : int nrows, ncols;
2737 : char *errmsg;
2738 1 : int rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, &errmsg);
2739 1 : if (errmsg || ncols != 1) {
2740 0 : printf("cloudsync_cleanup_all error: %s\n", errmsg ? errmsg : "invalid table");
2741 0 : goto cleanup;
2742 : }
2743 :
2744 1 : rc = SQLITE_OK;
2745 6 : for (int i = ncols; i < nrows+ncols; i+=ncols) {
2746 5 : int rc2 = cloudsync_cleanup_internal(context, result[i]);
2747 5 : if (rc2 != SQLITE_OK) rc = rc2;
2748 5 : }
2749 :
2750 2 : if (rc == SQLITE_OK) {
2751 1 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2752 1 : data->site_id[0] = 0;
2753 1 : dbutils_settings_cleanup(db);
2754 1 : }
2755 :
2756 : cleanup:
2757 1 : sqlite3_free_table(result);
2758 1 : sqlite3_free(errmsg);
2759 1 : }
2760 :
2761 2 : void cloudsync_cleanup (sqlite3_context *context, int argc, sqlite3_value **argv) {
2762 : DEBUG_FUNCTION("cloudsync_cleanup");
2763 :
2764 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
2765 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2766 2 : sqlite3 *db = sqlite3_context_db_handle(context);
2767 :
2768 2 : if (dbutils_is_star_table(table)) cloudsync_cleanup_all(context);
2769 1 : else cloudsync_cleanup_internal(context, table);
2770 :
2771 2 : if (dbutils_table_exists(db, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) dbutils_update_schema_hash(db, &data->schema_hash);
2772 2 : }
2773 :
2774 2 : void cloudsync_enable_disable (sqlite3_context *context, const char *table_name, bool value) {
2775 : DEBUG_FUNCTION("cloudsync_enable_disable");
2776 :
2777 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2778 2 : cloudsync_table_context *table = table_lookup(data, table_name);
2779 2 : if (!table) return;
2780 :
2781 2 : table->enabled = value;
2782 2 : }
2783 :
2784 28 : int cloudsync_enable_disable_all_callback (void *xdata, int ncols, char **values, char **names) {
2785 28 : sqlite3_context *context = (sqlite3_context *)xdata;
2786 28 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2787 28 : bool value = data->temp_bool;
2788 :
2789 56 : for (int i=0; i<ncols; i++) {
2790 28 : const char *table_name = values[i];
2791 28 : cloudsync_table_context *table = table_lookup(data, table_name);
2792 28 : if (!table) continue;
2793 10 : table->enabled = value;
2794 10 : }
2795 :
2796 28 : return SQLITE_OK;
2797 : }
2798 :
2799 2 : void cloudsync_enable_disable_all (sqlite3_context *context, bool value) {
2800 : DEBUG_FUNCTION("cloudsync_enable_disable_all");
2801 :
2802 2 : char *sql = "SELECT name FROM sqlite_master WHERE type='table';";
2803 :
2804 2 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2805 2 : data->temp_bool = value;
2806 2 : sqlite3 *db = sqlite3_context_db_handle(context);
2807 2 : sqlite3_exec(db, sql, cloudsync_enable_disable_all_callback, context, NULL);
2808 2 : }
2809 :
2810 2 : void cloudsync_enable (sqlite3_context *context, int argc, sqlite3_value **argv) {
2811 : DEBUG_FUNCTION("cloudsync_enable");
2812 :
2813 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
2814 2 : if (dbutils_is_star_table(table)) cloudsync_enable_disable_all(context, true);
2815 1 : else cloudsync_enable_disable(context, table, true);
2816 2 : }
2817 :
2818 2 : void cloudsync_disable (sqlite3_context *context, int argc, sqlite3_value **argv) {
2819 : DEBUG_FUNCTION("cloudsync_disable");
2820 :
2821 2 : const char *table = (const char *)sqlite3_value_text(argv[0]);
2822 2 : if (dbutils_is_star_table(table)) cloudsync_enable_disable_all(context, false);
2823 1 : else cloudsync_enable_disable(context, table, false);
2824 2 : }
2825 :
2826 2854 : void cloudsync_is_enabled (sqlite3_context *context, int argc, sqlite3_value **argv) {
2827 : DEBUG_FUNCTION("cloudsync_is_enabled");
2828 :
2829 2854 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2830 2854 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
2831 2854 : cloudsync_table_context *table = table_lookup(data, table_name);
2832 :
2833 2854 : int result = (table && table->enabled) ? 1 : 0;
2834 2854 : sqlite3_result_int(context, result);
2835 2854 : }
2836 :
2837 48 : void cloudsync_terminate (sqlite3_context *context, int argc, sqlite3_value **argv) {
2838 : DEBUG_FUNCTION("cloudsync_terminate");
2839 :
2840 48 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2841 :
2842 120 : for (int i=0; i<data->tables_count; ++i) {
2843 72 : if (data->tables[i]) table_free(data->tables[i]);
2844 72 : data->tables[i] = NULL;
2845 72 : }
2846 :
2847 48 : if (data->schema_version_stmt) sqlite3_finalize(data->schema_version_stmt);
2848 48 : if (data->data_version_stmt) sqlite3_finalize(data->data_version_stmt);
2849 48 : if (data->db_version_stmt) sqlite3_finalize(data->db_version_stmt);
2850 48 : if (data->getset_siteid_stmt) sqlite3_finalize(data->getset_siteid_stmt);
2851 :
2852 48 : data->schema_version_stmt = NULL;
2853 48 : data->data_version_stmt = NULL;
2854 48 : data->db_version_stmt = NULL;
2855 48 : data->getset_siteid_stmt = NULL;
2856 :
2857 : // reset the site_id so the cloudsync_context_init will be executed again
2858 : // if any other cloudsync function is called after terminate
2859 48 : data->site_id[0] = 0;
2860 48 : }
2861 :
2862 : // MARK: -
2863 :
2864 48 : int cloudsync_load_siteid (sqlite3 *db, cloudsync_context *data) {
2865 : // check if site_id was already loaded
2866 48 : if (data->site_id[0] != 0) return SQLITE_OK;
2867 :
2868 : // load site_id
2869 : int size, rc;
2870 47 : char *buffer = dbutils_blob_select(db, "SELECT site_id FROM cloudsync_site_id WHERE rowid=0;", &size, data->sqlite_ctx, &rc);
2871 47 : if (!buffer) return rc;
2872 47 : if (size != UUID_LEN) return SQLITE_MISUSE;
2873 :
2874 47 : memcpy(data->site_id, buffer, UUID_LEN);
2875 47 : cloudsync_memory_free(buffer);
2876 :
2877 47 : return SQLITE_OK;
2878 48 : }
2879 :
2880 100 : int cloudsync_init_internal (sqlite3_context *context, const char *table_name, const char *algo_name, bool skip_int_pk_check) {
2881 : DEBUG_FUNCTION("cloudsync_init_internal");
2882 :
2883 : // get database reference
2884 100 : sqlite3 *db = sqlite3_context_db_handle(context);
2885 :
2886 : // retrieve global context
2887 100 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2888 :
2889 : // sanity check table and its primary key(s)
2890 100 : if (dbutils_table_sanity_check(db, context, table_name, skip_int_pk_check) == false) {
2891 1 : return SQLITE_MISUSE;
2892 : }
2893 :
2894 : // init cloudsync_settings
2895 99 : if (cloudsync_context_init(db, data, context) == NULL) return SQLITE_MISUSE;
2896 :
2897 : // sanity check algo name (if exists)
2898 99 : table_algo algo_new = table_algo_none;
2899 99 : if (!algo_name) {
2900 4 : algo_name = CLOUDSYNC_DEFAULT_ALGO;
2901 4 : }
2902 :
2903 99 : algo_new = crdt_algo_from_name(algo_name);
2904 99 : if (algo_new == table_algo_none) {
2905 1 : dbutils_context_result_error(context, "algo name %s does not exist", crdt_algo_name);
2906 1 : return SQLITE_MISUSE;
2907 : }
2908 :
2909 : // check if table name was already augmented
2910 98 : table_algo algo_current = dbutils_table_settings_get_algo(db, table_name);
2911 :
2912 : // sanity check algorithm
2913 98 : if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
2914 : // if table algorithms and the same and not none, do nothing
2915 98 : } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
2916 : // nothing is written into settings because the default table_algo_crdt_cls will be used
2917 0 : algo_new = algo_current = table_algo_crdt_cls;
2918 69 : } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
2919 : // algo is already written into settins so just use it
2920 0 : algo_new = algo_current;
2921 69 : } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
2922 : // write table algo name in settings
2923 69 : dbutils_table_settings_set_key_value(NULL, context, table_name, "*", "algo", algo_name);
2924 69 : } else {
2925 : // error condition
2926 0 : dbutils_context_result_error(context, "%s", "Before changing a table algorithm you must call cloudsync_cleanup(table_name)");
2927 0 : return SQLITE_MISUSE;
2928 : }
2929 :
2930 : // Run the following function even if table was already augmented.
2931 : // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
2932 : // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
2933 :
2934 : // sync algo with table (unused in this version)
2935 : // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
2936 :
2937 : // check triggers
2938 98 : dbutils_check_triggers(db, table_name, algo_new);
2939 :
2940 : // check meta-table
2941 98 : dbutils_check_metatable(db, table_name, algo_new);
2942 :
2943 : // add prepared statements
2944 98 : if (stmts_add_tocontext(db, data) != SQLITE_OK) {
2945 0 : dbutils_context_result_error(context, "%s", "An error occurred while trying to compile prepared SQL statements.");
2946 0 : return SQLITE_MISUSE;
2947 : }
2948 :
2949 : // add table to in-memory data context
2950 98 : if (table_add_to_context(db, data, algo_new, table_name) == false) {
2951 0 : dbutils_context_result_error(context, "An error occurred while adding %s table information to global context", table_name);
2952 0 : return SQLITE_MISUSE;
2953 : }
2954 :
2955 98 : if (cloudsync_refill_metatable(db, data, table_name) != SQLITE_OK) {
2956 0 : dbutils_context_result_error(context, "%s", "An error occurred while trying to fill the augmented table.");
2957 0 : return SQLITE_MISUSE;
2958 : }
2959 :
2960 98 : return SQLITE_OK;
2961 100 : }
2962 :
2963 1 : int cloudsync_init_all (sqlite3_context *context, const char *algo_name, bool skip_int_pk_check) {
2964 : char sql[1024];
2965 1 : snprintf(sql, sizeof(sql), "SELECT name, '%s' FROM sqlite_master WHERE type='table' and name NOT LIKE 'sqlite_%%' AND name NOT LIKE 'cloudsync_%%' AND name NOT LIKE '%%_cloudsync';", (algo_name) ? algo_name : CLOUDSYNC_DEFAULT_ALGO);
2966 :
2967 1 : sqlite3 *db = sqlite3_context_db_handle(context);
2968 1 : sqlite3_stmt *vm = NULL;
2969 1 : int rc = sqlite3_prepare_v2(db, sql, -1, &vm, NULL);
2970 1 : if (rc != SQLITE_OK) goto abort_init_all;
2971 :
2972 6 : while (1) {
2973 6 : rc = sqlite3_step(vm);
2974 6 : if (rc == SQLITE_DONE) break;
2975 5 : else if (rc != SQLITE_ROW) goto abort_init_all;
2976 :
2977 5 : const char *table = (const char *)sqlite3_column_text(vm, 0);
2978 5 : const char *algo = (const char *)sqlite3_column_text(vm, 1);
2979 5 : rc = cloudsync_init_internal(context, table, algo, skip_int_pk_check);
2980 5 : if (rc != SQLITE_OK) {cloudsync_cleanup_internal(context, table); goto abort_init_all;}
2981 : }
2982 1 : rc = SQLITE_OK;
2983 :
2984 : abort_init_all:
2985 1 : if (vm) sqlite3_finalize(vm);
2986 1 : return rc;
2987 : }
2988 :
2989 73 : void cloudsync_init (sqlite3_context *context, const char *table, const char *algo, bool skip_int_pk_check) {
2990 73 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
2991 73 : data->sqlite_ctx = context;
2992 :
2993 73 : sqlite3 *db = sqlite3_context_db_handle(context);
2994 73 : int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_init;", NULL, NULL, NULL);
2995 73 : if (rc != SQLITE_OK) {
2996 0 : dbutils_context_result_error(context, "Unable to create cloudsync_init savepoint. %s", sqlite3_errmsg(db));
2997 0 : sqlite3_result_error_code(context, rc);
2998 0 : return;
2999 : }
3000 :
3001 73 : if (dbutils_is_star_table(table)) rc = cloudsync_init_all(context, algo, skip_int_pk_check);
3002 72 : else rc = cloudsync_init_internal(context, table, algo, skip_int_pk_check);
3003 :
3004 73 : if (rc == SQLITE_OK) {
3005 71 : rc = sqlite3_exec(db, "RELEASE cloudsync_init", NULL, NULL, NULL);
3006 71 : if (rc != SQLITE_OK) {
3007 0 : dbutils_context_result_error(context, "Unable to release cloudsync_init savepoint. %s", sqlite3_errmsg(db));
3008 0 : sqlite3_result_error_code(context, rc);
3009 0 : }
3010 71 : }
3011 :
3012 73 : if (rc == SQLITE_OK) dbutils_update_schema_hash(db, &data->schema_hash);
3013 2 : else sqlite3_exec(db, "ROLLBACK TO cloudsync_init; RELEASE cloudsync_init", NULL, NULL, NULL);
3014 73 : }
3015 :
3016 8 : void cloudsync_init3 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3017 : DEBUG_FUNCTION("cloudsync_init2");
3018 :
3019 8 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3020 8 : const char *algo = (const char *)sqlite3_value_text(argv[1]);
3021 8 : bool skip_int_pk_check = (bool)sqlite3_value_int(argv[2]);
3022 :
3023 8 : cloudsync_init(context, table, algo, skip_int_pk_check);
3024 8 : }
3025 :
3026 59 : void cloudsync_init2 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3027 : DEBUG_FUNCTION("cloudsync_init2");
3028 :
3029 59 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3030 59 : const char *algo = (const char *)sqlite3_value_text(argv[1]);
3031 :
3032 59 : cloudsync_init(context, table, algo, false);
3033 59 : }
3034 :
3035 6 : void cloudsync_init1 (sqlite3_context *context, int argc, sqlite3_value **argv) {
3036 : DEBUG_FUNCTION("cloudsync_init1");
3037 :
3038 6 : const char *table = (const char *)sqlite3_value_text(argv[0]);
3039 :
3040 6 : cloudsync_init(context, table, NULL, false);
3041 6 : }
3042 :
3043 : // MARK: -
3044 :
3045 24 : void cloudsync_begin_alter (sqlite3_context *context, int argc, sqlite3_value **argv) {
3046 : DEBUG_FUNCTION("cloudsync_begin_alter");
3047 24 : char *errmsg = NULL;
3048 24 : char **result = NULL;
3049 :
3050 24 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3051 :
3052 : // get database reference
3053 24 : sqlite3 *db = sqlite3_context_db_handle(context);
3054 :
3055 : // retrieve global context
3056 24 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3057 :
3058 : // init cloudsync_settings
3059 24 : if (cloudsync_context_init(db, data, context) == NULL) {
3060 0 : sqlite3_result_error(context, "Unable to init the cloudsync context.", -1);
3061 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3062 0 : return;
3063 : }
3064 :
3065 : // create a savepoint to manage the alter operations as a transaction
3066 24 : int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_alter", NULL, NULL, NULL);
3067 24 : if (rc != SQLITE_OK) {
3068 0 : sqlite3_result_error(context, "Unable to create cloudsync_alter savepoint.", -1);
3069 0 : sqlite3_result_error_code(context, rc);
3070 0 : goto rollback_begin_alter;
3071 : }
3072 :
3073 24 : cloudsync_table_context *table = table_lookup(data, table_name);
3074 24 : if (!table) {
3075 1 : dbutils_context_result_error(context, "Unable to find table %s", table_name);
3076 1 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3077 1 : goto rollback_begin_alter;
3078 : }
3079 :
3080 : int nrows, ncols;
3081 23 : char *sql = cloudsync_memory_mprintf("SELECT name FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table_name);
3082 23 : rc = sqlite3_get_table(db, sql, &result, &nrows, &ncols, &errmsg);
3083 23 : cloudsync_memory_free(sql);
3084 23 : if (errmsg || ncols != 1 || nrows != table->npks) {
3085 0 : dbutils_context_result_error(context, "Unable to get primary keys for table %s (%s)", table_name, errmsg);
3086 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3087 0 : goto rollback_begin_alter;
3088 : }
3089 :
3090 : // drop original triggers
3091 23 : dbutils_delete_triggers(db, table_name);
3092 23 : if (rc != SQLITE_OK) {
3093 0 : dbutils_context_result_error(context, "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
3094 0 : sqlite3_result_error_code(context, rc);
3095 0 : goto rollback_begin_alter;
3096 : }
3097 :
3098 23 : if (table->pk_name) sqlite3_free_table(table->pk_name);
3099 23 : table->pk_name = result;
3100 23 : return;
3101 :
3102 : rollback_begin_alter:
3103 1 : sqlite3_exec(db, "ROLLBACK TO cloudsync_alter; RELEASE cloudsync_alter;", NULL, NULL, NULL);
3104 :
3105 : cleanup_begin_alter:
3106 1 : sqlite3_free_table(result);
3107 1 : sqlite3_free(errmsg);
3108 24 : }
3109 :
3110 24 : void cloudsync_commit_alter (sqlite3_context *context, int argc, sqlite3_value **argv) {
3111 : DEBUG_FUNCTION("cloudsync_commit_alter");
3112 :
3113 24 : const char *table_name = (const char *)sqlite3_value_text(argv[0]);
3114 24 : cloudsync_table_context *table = NULL;
3115 :
3116 : // get database reference
3117 24 : sqlite3 *db = sqlite3_context_db_handle(context);
3118 :
3119 : // retrieve global context
3120 24 : cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
3121 :
3122 : // init cloudsync_settings
3123 24 : if (cloudsync_context_init(db, data, context) == NULL) {
3124 0 : dbutils_context_result_error(context, "Unable to init the cloudsync context.");
3125 0 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3126 0 : goto rollback_finalize_alter;
3127 : }
3128 :
3129 24 : table = table_lookup(data, table_name);
3130 24 : if (!table || !table->pk_name) {
3131 1 : dbutils_context_result_error(context, "Unable to find table context.");
3132 1 : sqlite3_result_error_code(context, SQLITE_MISUSE);
3133 1 : goto rollback_finalize_alter;
3134 : }
3135 :
3136 23 : int rc = cloudsync_finalize_alter(context, data, table);
3137 23 : if (rc != SQLITE_OK) goto rollback_finalize_alter;
3138 :
3139 : // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
3140 23 : table_remove(data, table_name);
3141 23 : table_free(table);
3142 23 : table = NULL;
3143 :
3144 : // init again cloudsync for the table
3145 23 : table_algo algo_current = dbutils_table_settings_get_algo(db, table_name);
3146 23 : if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(db, "*");
3147 23 : rc = cloudsync_init_internal(context, table_name, crdt_algo_name(algo_current), true);
3148 23 : if (rc != SQLITE_OK) goto rollback_finalize_alter;
3149 :
3150 : // release savepoint
3151 23 : rc = sqlite3_exec(db, "RELEASE cloudsync_alter", NULL, NULL, NULL);
3152 23 : if (rc != SQLITE_OK) {
3153 0 : dbutils_context_result_error(context, sqlite3_errmsg(db));
3154 0 : sqlite3_result_error_code(context, rc);
3155 0 : goto rollback_finalize_alter;
3156 : }
3157 :
3158 23 : dbutils_update_schema_hash(db, &data->schema_hash);
3159 :
3160 23 : return;
3161 :
3162 : rollback_finalize_alter:
3163 1 : sqlite3_exec(db, "ROLLBACK TO cloudsync_alter; RELEASE cloudsync_alter;", NULL, NULL, NULL);
3164 1 : if (table) {
3165 0 : sqlite3_free_table(table->pk_name);
3166 0 : table->pk_name = NULL;
3167 0 : }
3168 24 : }
3169 :
3170 : // MARK: - Main Entrypoint -
3171 :
3172 48 : int cloudsync_register (sqlite3 *db, char **pzErrMsg) {
3173 48 : int rc = SQLITE_OK;
3174 :
3175 : // there's no built-in way to verify if sqlite3_cloudsync_init has already been called
3176 : // for this specific database connection, we use a workaround: we attempt to retrieve the
3177 : // cloudsync_version and check for an error, an error indicates that initialization has not been performed
3178 48 : if (sqlite3_exec(db, "SELECT cloudsync_version();", NULL, NULL, NULL) == SQLITE_OK) return SQLITE_OK;
3179 :
3180 : // init memory debugger (NOOP in production)
3181 : cloudsync_memory_init(1);
3182 :
3183 : // init context
3184 46 : void *ctx = cloudsync_context_create();
3185 46 : if (!ctx) {
3186 0 : if (pzErrMsg) *pzErrMsg = "Not enought memory to create a database context";
3187 0 : return SQLITE_NOMEM;
3188 : }
3189 :
3190 : // register functions
3191 :
3192 : // PUBLIC functions
3193 46 : rc = dbutils_register_function(db, "cloudsync_version", cloudsync_version, 0, pzErrMsg, ctx, cloudsync_context_free);
3194 46 : if (rc != SQLITE_OK) return rc;
3195 :
3196 46 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init1, 1, pzErrMsg, ctx, NULL);
3197 46 : if (rc != SQLITE_OK) return rc;
3198 :
3199 46 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init2, 2, pzErrMsg, ctx, NULL);
3200 46 : if (rc != SQLITE_OK) return rc;
3201 :
3202 46 : rc = dbutils_register_function(db, "cloudsync_init", cloudsync_init3, 3, pzErrMsg, ctx, NULL);
3203 46 : if (rc != SQLITE_OK) return rc;
3204 :
3205 :
3206 46 : rc = dbutils_register_function(db, "cloudsync_enable", cloudsync_enable, 1, pzErrMsg, ctx, NULL);
3207 46 : if (rc != SQLITE_OK) return rc;
3208 :
3209 46 : rc = dbutils_register_function(db, "cloudsync_disable", cloudsync_disable, 1, pzErrMsg, ctx, NULL);
3210 46 : if (rc != SQLITE_OK) return rc;
3211 :
3212 46 : rc = dbutils_register_function(db, "cloudsync_is_enabled", cloudsync_is_enabled, 1, pzErrMsg, ctx, NULL);
3213 46 : if (rc != SQLITE_OK) return rc;
3214 :
3215 46 : rc = dbutils_register_function(db, "cloudsync_cleanup", cloudsync_cleanup, 1, pzErrMsg, ctx, NULL);
3216 46 : if (rc != SQLITE_OK) return rc;
3217 :
3218 46 : rc = dbutils_register_function(db, "cloudsync_terminate", cloudsync_terminate, 0, pzErrMsg, ctx, NULL);
3219 46 : if (rc != SQLITE_OK) return rc;
3220 :
3221 46 : rc = dbutils_register_function(db, "cloudsync_set", cloudsync_set, 2, pzErrMsg, ctx, NULL);
3222 46 : if (rc != SQLITE_OK) return rc;
3223 :
3224 46 : rc = dbutils_register_function(db, "cloudsync_set_table", cloudsync_set_table, 3, pzErrMsg, ctx, NULL);
3225 46 : if (rc != SQLITE_OK) return rc;
3226 :
3227 46 : rc = dbutils_register_function(db, "cloudsync_set_column", cloudsync_set_column, 4, pzErrMsg, ctx, NULL);
3228 46 : if (rc != SQLITE_OK) return rc;
3229 :
3230 46 : rc = dbutils_register_function(db, "cloudsync_siteid", cloudsync_siteid, 0, pzErrMsg, ctx, NULL);
3231 46 : if (rc != SQLITE_OK) return rc;
3232 :
3233 46 : rc = dbutils_register_function(db, "cloudsync_db_version", cloudsync_db_version, 0, pzErrMsg, ctx, NULL);
3234 46 : if (rc != SQLITE_OK) return rc;
3235 :
3236 46 : rc = dbutils_register_function(db, "cloudsync_db_version_next", cloudsync_db_version_next, 0, pzErrMsg, ctx, NULL);
3237 46 : if (rc != SQLITE_OK) return rc;
3238 :
3239 46 : rc = dbutils_register_function(db, "cloudsync_db_version_next", cloudsync_db_version_next, 1, pzErrMsg, ctx, NULL);
3240 46 : if (rc != SQLITE_OK) return rc;
3241 :
3242 46 : rc = dbutils_register_function(db, "cloudsync_begin_alter", cloudsync_begin_alter, 1, pzErrMsg, ctx, NULL);
3243 46 : if (rc != SQLITE_OK) return rc;
3244 :
3245 46 : rc = dbutils_register_function(db, "cloudsync_commit_alter", cloudsync_commit_alter, 1, pzErrMsg, ctx, NULL);
3246 46 : if (rc != SQLITE_OK) return rc;
3247 :
3248 46 : rc = dbutils_register_function(db, "cloudsync_uuid", cloudsync_uuid, 0, pzErrMsg, ctx, NULL);
3249 46 : if (rc != SQLITE_OK) return rc;
3250 :
3251 : // PAYLOAD
3252 46 : rc = dbutils_register_aggregate(db, "cloudsync_payload_encode", cloudsync_payload_encode_step, cloudsync_payload_encode_final, -1, pzErrMsg, ctx, NULL);
3253 46 : if (rc != SQLITE_OK) return rc;
3254 :
3255 46 : rc = dbutils_register_function(db, "cloudsync_payload_decode", cloudsync_payload_decode, -1, pzErrMsg, ctx, NULL);
3256 46 : if (rc != SQLITE_OK) return rc;
3257 :
3258 : // PRIVATE functions
3259 46 : rc = dbutils_register_function(db, "cloudsync_is_sync", cloudsync_is_sync, 1, pzErrMsg, ctx, NULL);
3260 46 : if (rc != SQLITE_OK) return rc;
3261 :
3262 46 : rc = dbutils_register_function(db, "cloudsync_insert", cloudsync_insert, -1, pzErrMsg, ctx, NULL);
3263 46 : if (rc != SQLITE_OK) return rc;
3264 :
3265 46 : rc = dbutils_register_function(db, "cloudsync_update", cloudsync_update, -1, pzErrMsg, ctx, NULL);
3266 46 : if (rc != SQLITE_OK) return rc;
3267 :
3268 46 : rc = dbutils_register_function(db, "cloudsync_delete", cloudsync_delete, -1, pzErrMsg, ctx, NULL);
3269 46 : if (rc != SQLITE_OK) return rc;
3270 :
3271 46 : rc = dbutils_register_function(db, "cloudsync_col_value", cloudsync_col_value, 3, pzErrMsg, ctx, NULL);
3272 46 : if (rc != SQLITE_OK) return rc;
3273 :
3274 46 : rc = dbutils_register_function(db, "cloudsync_pk_encode", cloudsync_pk_encode, -1, pzErrMsg, ctx, NULL);
3275 46 : if (rc != SQLITE_OK) return rc;
3276 :
3277 46 : rc = dbutils_register_function(db, "cloudsync_pk_decode", cloudsync_pk_decode, 2, pzErrMsg, ctx, NULL);
3278 46 : if (rc != SQLITE_OK) return rc;
3279 :
3280 46 : rc = dbutils_register_function(db, "cloudsync_seq", cloudsync_seq, 0, pzErrMsg, ctx, NULL);
3281 46 : if (rc != SQLITE_OK) return rc;
3282 :
3283 : // NETWORK LAYER
3284 : #ifndef CLOUDSYNC_OMIT_NETWORK
3285 : rc = cloudsync_network_register(db, pzErrMsg, ctx);
3286 : if (rc != SQLITE_OK) return rc;
3287 : #endif
3288 :
3289 46 : cloudsync_context *data = (cloudsync_context *)ctx;
3290 46 : sqlite3_commit_hook(db, cloudsync_commit_hook, ctx);
3291 46 : sqlite3_rollback_hook(db, cloudsync_rollback_hook, ctx);
3292 :
3293 : // register eponymous only changes virtual table
3294 46 : rc = cloudsync_vtab_register_changes (db, data);
3295 46 : if (rc != SQLITE_OK) return rc;
3296 :
3297 : // load config, if exists
3298 46 : if (cloudsync_config_exists(db)) {
3299 1 : cloudsync_context_init(db, ctx, NULL);
3300 1 : }
3301 :
3302 46 : return SQLITE_OK;
3303 48 : }
3304 :
3305 48 : APIEXPORT int sqlite3_cloudsync_init (sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
3306 : DEBUG_FUNCTION("sqlite3_cloudsync_init");
3307 :
3308 : #ifndef SQLITE_CORE
3309 : SQLITE_EXTENSION_INIT2(pApi);
3310 : #endif
3311 :
3312 48 : return cloudsync_register(db, pzErrMsg);
3313 : }
3314 :
|