Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "lz4.h"
21 : #include "pk.h"
22 : #include "sql.h"
23 : #include "utils.h"
24 : #include "dbutils.h"
25 : #include "block.h"
26 :
27 : #ifdef _WIN32
28 : #include <winsock2.h>
29 : #include <ws2tcpip.h>
30 : #else
31 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
32 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
33 : #endif
34 :
35 : #ifndef htonll
36 : #if __BIG_ENDIAN__
37 : #define htonll(x) (x)
38 : #define ntohll(x) (x)
39 : #else
40 : #ifndef htobe64
41 : #define htonll(x) ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
42 : #define ntohll(x) ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
43 : #else
44 : #define htonll(x) htobe64(x)
45 : #define ntohll(x) be64toh(x)
46 : #endif
47 : #endif
48 : #endif
49 :
50 : #define CLOUDSYNC_INIT_NTABLES 64
51 : #define CLOUDSYNC_MIN_DB_VERSION 0
52 :
53 : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
54 : #define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
55 : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
56 : #define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
57 : #define CLOUDSYNC_PAYLOAD_VERSION_2 2
58 : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
59 : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2
60 :
61 : #ifndef MAX
62 : #define MAX(a, b) (((a)>(b))?(a):(b))
63 : #endif
64 :
65 : #define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
66 :
// Field indexes used when the pieces of a change record are addressed by
// position. The names mirror the fields of
// cloudsync_pk_decode_bind_context (tbl, pk, col_name, col_value, ...);
// presumably this is the serialization order of a change row — confirm
// against the encoder/decoder.
typedef enum {
    CLOUDSYNC_PK_INDEX_TBL = 0,         // table name
    CLOUDSYNC_PK_INDEX_PK = 1,          // encoded primary key
    CLOUDSYNC_PK_INDEX_COLNAME = 2,     // column name
    CLOUDSYNC_PK_INDEX_COLVALUE = 3,    // column value
    CLOUDSYNC_PK_INDEX_COLVERSION = 4,  // column version
    CLOUDSYNC_PK_INDEX_DBVERSION = 5,   // database version
    CLOUDSYNC_PK_INDEX_SITEID = 6,      // originating site id
    CLOUDSYNC_PK_INDEX_CL = 7,          // cl field (causal length, presumably — see table_algo_crdt_cls)
    CLOUDSYNC_PK_INDEX_SEQ = 8          // per-transaction sequence number
} CLOUDSYNC_PK_INDEX;
78 :
// Outcome of dbvm_execute: whether stepping a cached statement failed,
// observed the same value already cached in the context, or observed a
// new value (the default when no cached comparison applies).
typedef enum {
    DBVM_VALUE_ERROR = -1,      // step failed (statement is reset)
    DBVM_VALUE_UNCHANGED = 0,   // value matches the cached one
    DBVM_VALUE_CHANGED = 1,     // value differs or no comparison applies
} DBVM_VALUE;
84 :
85 : #define SYNCBIT_SET(_data) _data->insync = 1
86 : #define SYNCBIT_RESET(_data) _data->insync = 0
87 :
88 : // MARK: - Deferred column-batch merge -
89 :
90 : typedef struct {
91 : const char *col_name; // pointer into table_context->col_name[idx] (stable)
92 : dbvalue_t *col_value; // duplicated via database_value_dup (owned)
93 : int64_t col_version;
94 : int64_t db_version;
95 : uint8_t site_id[UUID_LEN];
96 : int site_id_len;
97 : int64_t seq;
98 : } merge_pending_entry;
99 :
100 : typedef struct {
101 : cloudsync_table_context *table;
102 : char *pk; // malloc'd copy, freed on flush
103 : int pk_len;
104 : int64_t cl;
105 : bool sentinel_pending;
106 : bool row_exists; // true when the PK already exists locally
107 : int count;
108 : int capacity;
109 : merge_pending_entry *entries;
110 :
111 : // Statement cache — reuse the prepared statement when the column
112 : // combination and row_exists flag match between consecutive PK flushes.
113 : dbvm_t *cached_vm;
114 : bool cached_row_exists;
115 : int cached_col_count;
116 : const char **cached_col_names; // array of pointers into table_context (not owned)
117 : } merge_pending_batch;
118 :
119 : // MARK: -
120 :
// Decoded fields of a single change record, paired with the statement (vm)
// they will be bound to. Pointer fields reference externally owned buffers;
// ownership is not visible in this chunk — confirm before freeing.
struct cloudsync_pk_decode_bind_context {
    dbvm_t *vm;             // statement the fields are bound to
    char *tbl;              // table name
    int64_t tbl_len;
    const void *pk;         // encoded primary key blob
    int64_t pk_len;
    char *col_name;
    int64_t col_name_len;
    int64_t col_version;
    int64_t db_version;
    const void *site_id;    // originating site id (UUID bytes, presumably)
    int64_t site_id_len;
    int64_t cl;             // cl value (causal length, presumably)
    int64_t seq;            // per-transaction sequence number
};
136 :
137 : struct cloudsync_context {
138 : void *db;
139 : char errmsg[1024];
140 : int errcode;
141 :
142 : char *libversion;
143 : uint8_t site_id[UUID_LEN];
144 : int insync;
145 : int debug;
146 : bool merge_equal_values;
147 : void *aux_data;
148 :
149 : // stmts and context values
150 : dbvm_t *schema_version_stmt;
151 : dbvm_t *data_version_stmt;
152 : dbvm_t *db_version_stmt;
153 : dbvm_t *getset_siteid_stmt;
154 : int data_version;
155 : int schema_version;
156 : uint64_t schema_hash;
157 :
158 : // set at transaction start and reset on commit/rollback
159 : int64_t db_version;
160 : // version the DB would have if the transaction committed now
161 : int64_t pending_db_version;
162 : // used to set an order inside each transaction
163 : int seq;
164 :
165 : // optional schema_name to be set in the cloudsync_table_context
166 : char *current_schema;
167 :
168 : // augmented tables are stored in-memory so we do not need to retrieve information about
169 : // col_names and cid from the disk each time a write statement is performed
170 : // we do also not need to use an hash map here because for few tables the direct
171 : // in-memory comparison with table name is faster
172 : cloudsync_table_context **tables; // dense vector: [0..tables_count-1] are valid
173 : int tables_count; // size
174 : int tables_cap; // capacity
175 :
176 : int skip_decode_idx; // -1 in sqlite, col_value index in postgresql
177 :
178 : // deferred column-batch merge (active during payload_apply)
179 : merge_pending_batch *pending_batch;
180 : };
181 :
182 : struct cloudsync_table_context {
    table_algo algo;                    // CRDT algorithm associated to the table
184 : char *name; // table name
185 : char *schema; // table schema
186 : char *meta_ref; // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
187 : char *base_ref; // schema-qualified base table name (e.g. "schema"."name")
188 : char **col_name; // array of column names
189 : dbvm_t **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
190 : dbvm_t **col_value_stmt; // array of column value stmt (indexed by col_name)
191 : int *col_id; // array of column id
192 : col_algo_t *col_algo; // per-column algorithm (normal or block)
193 : char **col_delimiter; // per-column delimiter for block splitting (NULL = default "\n")
194 : bool has_block_cols; // quick check: does this table have any block columns?
195 : dbvm_t *block_value_read_stmt; // SELECT col_value FROM blocks table
196 : dbvm_t *block_value_write_stmt; // INSERT OR REPLACE into blocks table
197 : dbvm_t *block_value_delete_stmt; // DELETE from blocks table
198 : dbvm_t *block_list_stmt; // SELECT block entries for materialization
199 : char *blocks_ref; // schema-qualified blocks table name
200 : int ncols; // number of non primary key cols
201 : int npks; // number of primary key cols
202 : bool enabled; // flag to check if a table is enabled or disabled
203 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
204 : bool rowid_only; // a table with no primary keys other than the implicit rowid
205 : #endif
206 :
207 : char **pk_name; // array of primary key names
208 :
209 : // precompiled statements
210 : dbvm_t *meta_pkexists_stmt; // check if a primary key already exist in the augmented table
211 : dbvm_t *meta_sentinel_update_stmt; // update a local sentinel row
212 : dbvm_t *meta_sentinel_insert_stmt; // insert a local sentinel row
213 : dbvm_t *meta_row_insert_update_stmt; // insert/update a local row
214 : dbvm_t *meta_row_drop_stmt; // delete rows from meta
215 : dbvm_t *meta_update_move_stmt; // update rows in meta when pk changes
216 : dbvm_t *meta_local_cl_stmt; // compute local cl value
217 : dbvm_t *meta_winner_clock_stmt; // get the rowid of the last inserted/updated row in the meta table
218 : dbvm_t *meta_merge_delete_drop;
219 : dbvm_t *meta_zero_clock_stmt;
220 : dbvm_t *meta_col_version_stmt;
221 : dbvm_t *meta_site_id_stmt;
222 :
223 : dbvm_t *real_col_values_stmt; // retrieve all column values based on pk
224 : dbvm_t *real_merge_delete_stmt;
225 : dbvm_t *real_merge_sentinel_stmt;
226 :
227 : bool is_altering; // flag to track if a table alteration is in progress
228 :
229 : // context
230 : cloudsync_context *context;
231 : };
232 :
// Accumulation state used while a sync payload is being built.
// NOTE(review): the exact relationship between bsize/balloc/bused is not
// visible in this chunk — the names suggest logical size, allocated
// capacity and bytes consumed; confirm against the payload builder.
struct cloudsync_payload_context {
    char *buffer;       // serialized payload bytes
    size_t bsize;       // buffer size (presumed)
    size_t balloc;      // allocated capacity (presumed)
    size_t bused;       // bytes used so far (presumed)
    uint64_t nrows;     // number of rows written
    uint16_t ncols;     // number of columns per row
};
241 :
242 : #ifdef _MSC_VER
243 : #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
244 : #define PACKED
245 : #else
246 : #define PACKED __attribute__((__packed__))
247 : #endif
248 :
249 : typedef struct PACKED {
250 : uint32_t signature; // 'CLSY'
251 : uint8_t version; // protocol version
252 : uint8_t libversion[3]; // major.minor.patch
253 : uint32_t expanded_size;
254 : uint16_t ncols;
255 : uint32_t nrows;
256 : uint64_t schema_hash;
257 : uint8_t checksum[6]; // 48 bits checksum (to ensure struct is 32 bytes)
258 : } cloudsync_payload_header;
259 :
260 : #ifdef _MSC_VER
261 : #pragma pack(pop)
262 : #endif
263 :
264 : #if CLOUDSYNC_UNITTEST
265 : bool force_uncompressed_blob = false;
266 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER() if (force_uncompressed_blob) use_uncompressed_buffer = true
267 : #else
268 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
269 : #endif
270 :
271 : // Internal prototypes
272 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
273 :
274 : // MARK: - CRDT algos -
275 :
276 620 : table_algo cloudsync_algo_from_name (const char *algo_name) {
277 620 : if (algo_name == NULL) return table_algo_none;
278 :
279 620 : if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
280 265 : if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
281 258 : if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
282 257 : if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
283 :
284 : // if nothing is found
285 256 : return table_algo_none;
286 620 : }
287 :
288 110 : const char *cloudsync_algo_name (table_algo algo) {
289 110 : switch (algo) {
290 105 : case table_algo_crdt_cls: return "cls";
291 1 : case table_algo_crdt_gos: return "gos";
292 1 : case table_algo_crdt_dws: return "dws";
293 1 : case table_algo_crdt_aws: return "aws";
294 1 : case table_algo_none: return NULL;
295 : }
296 1 : return NULL;
297 110 : }
298 :
299 : // MARK: - DBVM Utils -
300 :
301 32013 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
302 32013 : if (!stmt) return DBVM_VALUE_ERROR;
303 :
304 32013 : int rc = databasevm_step(stmt);
305 32013 : if (rc != DBRES_ROW && rc != DBRES_DONE) {
306 2 : if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
307 2 : databasevm_reset(stmt);
308 2 : return DBVM_VALUE_ERROR;
309 : }
310 :
311 32011 : DBVM_VALUE result = DBVM_VALUE_CHANGED;
312 32011 : if (stmt == data->data_version_stmt) {
313 29609 : int version = (int)database_column_int(stmt, 0);
314 29609 : if (version != data->data_version) {
315 224 : data->data_version = version;
316 224 : } else {
317 29385 : result = DBVM_VALUE_UNCHANGED;
318 : }
319 32011 : } else if (stmt == data->schema_version_stmt) {
320 1201 : int version = (int)database_column_int(stmt, 0);
321 1201 : if (version > data->schema_version) {
322 379 : data->schema_version = version;
323 379 : } else {
324 822 : result = DBVM_VALUE_UNCHANGED;
325 : }
326 :
327 2402 : } else if (stmt == data->db_version_stmt) {
328 1201 : data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
329 1201 : }
330 :
331 32011 : databasevm_reset(stmt);
332 32011 : return result;
333 32013 : }
334 :
335 3587 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
336 3587 : int result = -1;
337 3587 : int rc = DBRES_OK;
338 :
339 3587 : if (value) {
340 3586 : rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
341 3586 : if (rc != DBRES_OK) goto cleanup;
342 3586 : }
343 :
344 3587 : rc = databasevm_step(stmt);
345 7174 : if (rc == DBRES_DONE) {
346 1 : result = 0;
347 1 : rc = DBRES_OK;
348 3587 : } else if (rc == DBRES_ROW) {
349 3586 : result = (int)database_column_int(stmt, 0);
350 3586 : rc = DBRES_OK;
351 3586 : }
352 :
353 : cleanup:
354 3587 : databasevm_reset(stmt);
355 3587 : return result;
356 : }
357 :
358 255334 : void dbvm_reset (dbvm_t *stmt) {
359 255334 : if (!stmt) return;
360 232386 : databasevm_clear_bindings(stmt);
361 232386 : databasevm_reset(stmt);
362 255334 : }
363 :
364 : // MARK: - Database Version -
365 :
// Ask the database itself to generate the SQL that computes the global
// max db_version across every *_cloudsync meta table.
// On success *sql_out receives a heap-allocated string the caller must
// free (it may legitimately be NULL — see cloudsync_dbversion_rebuild).
// Returns the DBRES_* code from database_select_text.
int cloudsync_dbversion_build_query (cloudsync_context *data, char **sql_out) {
    // this function must be manually called each time tables changes
    // because the query plan changes too and it must be re-prepared
    // unfortunately there is no other way

    // we need to execute a query like:
    /*
     SELECT max(version) as version FROM (
     SELECT max(db_version) as version FROM "table1_cloudsync"
     UNION ALL
     SELECT max(db_version) as version FROM "table2_cloudsync"
     UNION ALL
     SELECT max(db_version) as version FROM "table3_cloudsync"
     UNION
     SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
     )
     */

    // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language

    *sql_out = NULL;
    return database_select_text(data, SQL_DBVERSION_BUILD_QUERY, sql_out);
}
389 :
390 891 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
391 891 : if (data->db_version_stmt) {
392 442 : databasevm_finalize(data->db_version_stmt);
393 442 : data->db_version_stmt = NULL;
394 442 : }
395 :
396 891 : int64_t count = dbutils_table_settings_count_tables(data);
397 891 : if (count == 0) return DBRES_OK;
398 669 : else if (count == -1) return cloudsync_set_dberror(data);
399 :
400 669 : char *sql = NULL;
401 669 : int rc = cloudsync_dbversion_build_query(data, &sql);
402 669 : if (rc != DBRES_OK) return cloudsync_set_dberror(data);
403 :
404 : // A NULL SQL with rc == OK means the generator produced a NULL row:
405 : // sqlite_master has no *_cloudsync meta-tables (for example, the user
406 : // dropped the base table and its meta-table without calling
407 : // cloudsync_cleanup, leaving stale cloudsync_table_settings rows).
408 : // Treat this the same as count == 0: no prepared statement, db_version
409 : // stays at the minimum and will be rebuilt on the next cloudsync_init.
410 : // Genuine errors from database_select_text are handled above.
411 668 : if (!sql) return DBRES_OK;
412 : DEBUG_SQL("db_version_stmt: %s", sql);
413 :
414 667 : rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
415 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
416 667 : cloudsync_memory_free(sql);
417 667 : return rc;
418 891 : }
419 :
420 1201 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
421 1201 : DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
422 1201 : if (schema_changed == DBVM_VALUE_ERROR) return -1;
423 :
424 1201 : if (schema_changed == DBVM_VALUE_CHANGED) {
425 379 : int rc = cloudsync_dbversion_rebuild(data);
426 379 : if (rc != DBRES_OK) return -1;
427 379 : }
428 :
429 1201 : if (!data->db_version_stmt) {
430 0 : data->db_version = CLOUDSYNC_MIN_DB_VERSION;
431 0 : return 0;
432 : }
433 :
434 1201 : DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
435 1201 : if (rc == DBVM_VALUE_ERROR) return -1;
436 1201 : return 0;
437 1201 : }
438 :
439 29609 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
440 : // perform a PRAGMA data_version to check if some other process write any data
441 29609 : DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
442 29609 : if (rc == DBVM_VALUE_ERROR) return -1;
443 :
444 : // db_version is already set and there is no need to update it
445 29609 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
446 :
447 1201 : return cloudsync_dbversion_rerun(data);
448 29609 : }
449 :
450 29585 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
451 29585 : int rc = cloudsync_dbversion_check_uptodate(data);
452 29585 : if (rc != DBRES_OK) return -1;
453 :
454 29585 : int64_t result = data->db_version + 1;
455 29585 : if (result < data->pending_db_version) result = data->pending_db_version;
456 29585 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
457 29585 : data->pending_db_version = result;
458 :
459 29585 : return result;
460 29585 : }
461 :
462 : // MARK: - PK Context -
463 :
// Accessors for the decoded change record. Pointer-returning accessors hand
// back references into the context (not copies) and report the length via
// the out parameter.

// Table name (and its length).
char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
    *tbl_len = ctx->tbl_len;
    return ctx->tbl;
}

// Encoded primary key blob (and its length).
void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
    *pk_len = ctx->pk_len;
    return (void *)ctx->pk;
}

// Column name (and its length).
char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
    *colname_len = ctx->col_name_len;
    return ctx->col_name;
}

// cl value of the change.
int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
    return ctx->cl;
}

// db_version of the change.
int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
    return ctx->db_version;
}
486 :
487 : // MARK: - CloudSync Context -
488 :
// Whether a sync operation is currently in progress (see SYNCBIT_SET/RESET).
int cloudsync_insync (cloudsync_context *data) {
    return data->insync;
}

// Raw pointer to this database's site_id buffer (UUID_LEN bytes, not a copy).
void *cloudsync_siteid (cloudsync_context *data) {
    return (void *)data->site_id;
}

// Zero the cached site_id so cloudsync_load_siteid re-reads it from the DB.
void cloudsync_reset_siteid (cloudsync_context *data) {
    memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
}
500 :
// Lazily load this database's site_id (UUID_LEN bytes) into data->site_id.
// Returns DBRES_OK on success or a DBRES_* error.
// NOTE(review): the "already loaded" check tests only the first byte, so a
// stored site_id starting with 0x00 would be re-read on every call — confirm
// that generated site ids can never begin with a zero byte.
int cloudsync_load_siteid (cloudsync_context *data) {
    // check if site_id was already loaded
    if (data->site_id[0] != 0) return DBRES_OK;

    // load site_id
    char *buffer = NULL;
    int64_t size = 0;
    int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
    if (rc != DBRES_OK) return rc;
    if (!buffer || size != UUID_LEN) {
        // missing or malformed row: report misuse (free the partial buffer first)
        if (buffer) cloudsync_memory_free(buffer);
        return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
    }

    memcpy(data->site_id, buffer, UUID_LEN);
    cloudsync_memory_free(buffer);

    return DBRES_OK;
}
520 :
// Current db_version (set at transaction start, reset on commit/rollback).
int64_t cloudsync_dbversion (cloudsync_context *data) {
    return data->db_version;
}

// Return the current per-transaction sequence number and post-increment it.
int cloudsync_bumpseq (cloudsync_context *data) {
    int value = data->seq;
    data->seq += 1;
    return value;
}

// Recompute and cache the hash of the database schema.
void cloudsync_update_schema_hash (cloudsync_context *data) {
    database_update_schema_hash(data, &data->schema_hash);
}

// Underlying database handle.
void *cloudsync_db (cloudsync_context *data) {
    return data->db;
}
538 :
539 511 : int cloudsync_add_dbvms (cloudsync_context *data) {
540 : DEBUG_DBFUNCTION("cloudsync_add_stmts");
541 :
542 511 : if (data->data_version_stmt == NULL) {
543 226 : int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
544 : DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
545 226 : if (rc != DBRES_OK) return rc;
546 : DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
547 226 : }
548 :
549 511 : if (data->schema_version_stmt == NULL) {
550 226 : int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
551 : DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
552 226 : if (rc != DBRES_OK) return rc;
553 : DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
554 226 : }
555 :
556 511 : if (data->getset_siteid_stmt == NULL) {
557 : // get and set index of the site_id
558 : // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
559 : // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
560 226 : int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
561 : DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
562 226 : if (rc != DBRES_OK) return rc;
563 : DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
564 226 : }
565 :
566 511 : return cloudsync_dbversion_rebuild(data);
567 511 : }
568 :
// Record an error message and code on the context and return the code.
// err_user may be NULL (the database error message is used verbatim);
// err_code == DBRES_OK means "derive the code from the database", falling
// back to DBRES_ERROR so the stored code is never OK.
int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
    // force err_code to be something different than OK
    if (err_code == DBRES_OK) err_code = database_errcode(data);
    if (err_code == DBRES_OK) err_code = DBRES_ERROR;

    // compute a meaningful error message
    if (err_user == NULL) {
        snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
    } else {
        const char *db_error = database_errmsg(data);
        char db_error_copy[sizeof(data->errmsg)];
        int rc = database_errcode(data);
        if (rc == DBRES_OK) {
            snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
        } else {
            // database_errmsg may return data->errmsg itself: copy it aside so
            // the combined snprintf below does not read from its own destination
            if (db_error == data->errmsg) {
                snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
                db_error = db_error_copy;
            }
            snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
        }
    }

    data->errcode = err_code;
    return err_code;
}
595 :
// Record the current database error on the context (message and code).
int cloudsync_set_dberror (cloudsync_context *data) {
    return cloudsync_set_error(data, NULL, DBRES_OK);
}

// Last recorded error message (empty string when none).
const char *cloudsync_errmsg (cloudsync_context *data) {
    return data->errmsg;
}

// Last recorded error code.
int cloudsync_errcode (cloudsync_context *data) {
    return data->errcode;
}

// Clear any recorded error state.
void cloudsync_reset_error (cloudsync_context *data) {
    data->errmsg[0] = 0;
    data->errcode = DBRES_OK;
}

// Opaque user data attached to the context.
void *cloudsync_auxdata (cloudsync_context *data) {
    return data->aux_data;
}

// Attach opaque user data to the context (not owned; caller manages lifetime).
void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
    data->aux_data = xdata;
}
620 :
621 6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
622 6 : if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
623 5 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
624 5 : data->current_schema = NULL;
625 5 : if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
626 6 : }
627 :
// Currently configured schema name, or NULL when none was set.
const char *cloudsync_schema (cloudsync_context *data) {
    return data->current_schema;
}

// Schema of a registered (augmented) table, or NULL when the table is unknown.
const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return NULL;

    return table->schema;
}
638 :
639 : // MARK: - Table Utils -
640 :
641 69 : void table_pknames_free (char **names, int nrows) {
642 69 : if (!names) return;
643 132 : for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
644 46 : cloudsync_memory_free(names);
645 69 : }
646 :
// Build the DELETE statement used when merging a remote delete:
// by rowid for rowid-only tables, by primary key otherwise.
// Returns a heap string (caller frees), or NULL on allocation failure.
char *table_build_mergedelete_sql (cloudsync_table_context *table) {
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
        return sql;
    }
#endif

    return sql_build_delete_by_pk(table->context, table->name, table->schema);
}
657 :
// Build the INSERT/UPSERT statement used when merging a remote change.
// colname == NULL means a sentinel insert (row existence only); otherwise
// an upsert that also sets the named column. Rowid-only tables use the
// rowid-based variants. Returns a heap string (caller frees) or NULL.
char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
    char *sql = NULL;

#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        if (colname == NULL) {
            // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
            sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
        } else {
            // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
            sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
        }
        return sql;
    }
#endif

    if (colname == NULL) {
        // is sentinel insert
        sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
    } else {
        sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
    }
    return sql;
}
682 :
// Build the SELECT that reads a single column value for a row identified by
// its primary key (or by rowid for rowid-only tables).
// Returns a heap string (caller frees), or NULL on allocation failure.
char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        char *colnamequote = "\"";
        char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
        return sql;
    }
#endif

    // SELECT age FROM customers WHERE first_name=? AND last_name=?;
    return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
}
695 :
696 283 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
697 : DEBUG_DBFUNCTION("table_create %s", name);
698 :
699 283 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
700 283 : if (!table) return NULL;
701 :
702 283 : table->context = data;
703 283 : table->algo = algo;
704 283 : table->name = cloudsync_string_dup_lowercase(name);
705 :
706 : // Detect schema from metadata table location. If metadata table doesn't
707 : // exist yet (during initialization), fall back to cloudsync_schema() which
708 : // returns the explicitly set schema or current_schema().
709 283 : table->schema = database_table_schema(name);
710 283 : if (!table->schema) {
711 283 : const char *fallback_schema = cloudsync_schema(data);
712 283 : if (fallback_schema) {
713 0 : table->schema = cloudsync_string_dup(fallback_schema);
714 0 : }
715 283 : }
716 :
717 283 : if (!table->name) {
718 0 : cloudsync_memory_free(table);
719 0 : return NULL;
720 : }
721 283 : table->meta_ref = database_build_meta_ref(table->schema, table->name);
722 283 : table->base_ref = database_build_base_ref(table->schema, table->name);
723 283 : table->enabled = true;
724 :
725 283 : return table;
726 283 : }
727 :
// Destroy a table context: finalize every prepared statement, release the
// per-column arrays (sized by ncols), the primary-key names (sized by npks),
// the name/ref strings, then the struct itself. Safe to call with NULL.
void table_free (cloudsync_table_context *table) {
    DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
    if (!table) return;

    // per-column arrays: each has ncols entries owned by the table
    if (table->col_name) {
        for (int i=0; i<table->ncols; ++i) {
            cloudsync_memory_free(table->col_name[i]);
        }
        cloudsync_memory_free(table->col_name);
    }
    if (table->col_merge_stmt) {
        for (int i=0; i<table->ncols; ++i) {
            databasevm_finalize(table->col_merge_stmt[i]);
        }
        cloudsync_memory_free(table->col_merge_stmt);
    }
    if (table->col_value_stmt) {
        for (int i=0; i<table->ncols; ++i) {
            databasevm_finalize(table->col_value_stmt[i]);
        }
        cloudsync_memory_free(table->col_value_stmt);
    }
    if (table->col_id) {
        cloudsync_memory_free(table->col_id);
    }
    if (table->col_algo) {
        cloudsync_memory_free(table->col_algo);
    }
    if (table->col_delimiter) {
        // entries may be NULL (NULL means the default delimiter)
        for (int i=0; i<table->ncols; ++i) {
            if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
        }
        cloudsync_memory_free(table->col_delimiter);
    }

    // block-column support statements and the blocks table reference
    if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
    if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
    if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
    if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
    if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);

    // name/ref strings, pk names, and the precompiled meta-table statements
    if (table->name) cloudsync_memory_free(table->name);
    if (table->schema) cloudsync_memory_free(table->schema);
    if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
    if (table->base_ref) cloudsync_memory_free(table->base_ref);
    if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
    if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
    if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
    if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
    if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
    if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
    if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
    if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
    if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
    if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
    if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
    if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
    if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);

    // statements that operate on the real (base) table
    if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
    if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
    if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);

    cloudsync_memory_free(table);
}
793 :
// Precompile every per-table prepared statement (meta-table bookkeeping plus
// the real-table merge helpers) and store them on the table context.
// `ncols` is the number of non-primary-key columns; when it is 0 the
// per-column value SELECT is skipped (pk-only tables are legal).
// Returns DBRES_OK on success; on failure the error is logged and any
// already-prepared statements are left for the table destructor to release.
int table_add_stmts (cloudsync_table_context *table, int ncols) {
    int rc = DBRES_OK;
    char *sql = NULL;
    cloudsync_context *data = table->context;
    
    // META TABLE statements
    
    // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
    
    // precompile the pk exists statement
    // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
    // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_pkexists_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // precompile the update local sentinel statement
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // precompile the insert local sentinel statement
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // precompile the insert/update local row statement
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // precompile the delete rows from meta
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_row_drop_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // precompile the update rows from meta when pk changes
    // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
    sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_update_move_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // local cl
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_local_cl_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // rowid of the last inserted/updated row in the meta table
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // NOTE(review): same SQL as meta_row_drop_stmt above — presumably kept as
    // a separate handle so the two statements can be stepped independently
    // without interfering with each other's cursor state; confirm.
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_merge_delete_drop: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // zero clock
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // col_version
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_col_version_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // site_id
    sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("meta_site_id_stmt: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // REAL TABLE statements
    
    // precompile the get column value statement
    // (only meaningful when the table has at least one non-pk column)
    if (ncols > 0) {
        sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
        if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
        DEBUG_SQL("real_col_values_stmt: %s", sql);
        
        rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto cleanup;
    }
    
    sql = table_build_mergedelete_sql(table);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("real_merge_delete: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
    // NULL column name builds the sentinel (pk-only) merge insert
    sql = table_build_mergeinsert_sql(table, NULL);
    if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
    DEBUG_SQL("real_merge_sentinel: %s", sql);
    
    rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto cleanup;
    
cleanup:
    if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
    return rc;
}
946 :
947 183996 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
948 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
949 :
950 183996 : if (table_name) {
951 185620 : for (int i=0; i<data->tables_count; ++i) {
952 185043 : if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
953 1624 : }
954 577 : }
955 :
956 577 : return NULL;
957 183996 : }
958 :
959 169196 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
960 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
961 :
962 315284 : for (int i=0; i<table->ncols; ++i) {
963 315284 : if (strcasecmp(table->col_name[i], col_name) == 0) {
964 169196 : if (index) *index = i;
965 169196 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
966 : }
967 146088 : }
968 :
969 0 : if (index) *index = -1;
970 0 : return NULL;
971 169196 : }
972 :
973 282 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
974 282 : const char *table_name = table->name;
975 : DEBUG_DBFUNCTION("table_remove %s", table_name);
976 :
977 332 : for (int i = 0; i < data->tables_count; ++i) {
978 332 : cloudsync_table_context *t = data->tables[i];
979 :
980 : // pointer compare is fastest but fallback to strcasecmp if not same pointer
981 332 : if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
982 282 : int last = data->tables_count - 1;
983 282 : data->tables[i] = data->tables[last]; // move last into the hole (keeps array dense)
984 282 : data->tables[last] = NULL; // NULLify tail (as an extra security measure)
985 282 : data->tables_count--;
986 282 : return data->tables_count;
987 : }
988 50 : }
989 :
990 0 : return -1;
991 282 : }
992 :
// database_exec_callback callback that registers one non-pk column on the
// table context: records its id and lowercased name, and precompiles the
// per-column merge (upsert) and value (SELECT) statements.
// NOTE(review): `index` stays fixed at the current table->ncols for the whole
// loop, and ncols only grows by 1 per invocation — this is consistent only if
// every callback row carries exactly one (name, cid) pair, i.e. ncols == 2;
// confirm against the SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID query.
// Returns 0 on success, 1 on error (which makes database_exec_callback abort).
int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
    cloudsync_table_context *table = (cloudsync_table_context *)xdata;
    cloudsync_context *data = table->context;
    
    // next free slot in the per-column arrays
    int index = table->ncols;
    for (int i=0; i<ncols; i+=2) {
        // values come in (name, cid) pairs; cid parsed with base auto-detect
        const char *name = values[i];
        int cid = (int)strtol(values[i+1], NULL, 0);
        
        table->col_id[index] = cid;
        table->col_name[index] = cloudsync_string_dup_lowercase(name);
        if (!table->col_name[index]) goto error;
        
        // precompile the per-column merge (upsert) statement
        char *sql = table_build_mergeinsert_sql(table, name);
        if (!sql) goto error;
        DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
        
        int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto error;
        if (!table->col_merge_stmt[index]) goto error;
        
        // precompile the per-column value SELECT statement
        sql = table_build_value_sql(table, name);
        if (!sql) goto error;
        DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
        
        rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto error;
        if (!table->col_value_stmt[index]) goto error;
    }
    // one column successfully registered
    table->ncols += 1;
    
    return 0;
    
error:
    // clean up partially-initialized entry at index
    if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
    if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
    if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
    return 1;
}
1035 :
1036 283 : bool table_ensure_capacity (cloudsync_context *data) {
1037 283 : if (data->tables_count < data->tables_cap) return true;
1038 :
1039 0 : int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
1040 0 : size_t bytes = (size_t)new_cap * sizeof(*data->tables);
1041 0 : void *p = cloudsync_memory_realloc(data->tables, bytes);
1042 0 : if (!p) return false;
1043 :
1044 0 : data->tables = (cloudsync_table_context **)p;
1045 0 : data->tables_cap = new_cap;
1046 0 : return true;
1047 283 : }
1048 :
// Create and register a table context on the connection-level context:
// counts primary-key and non-pk columns, precompiles all per-table
// statements (table_add_stmts) and the per-column statements (via
// table_add_to_context_cb), then appends the table to data->tables.
// Returns true on success or when the table was already registered;
// on failure the partially built table is freed and false is returned.
bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
    DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
    
    // Check if table already initialized in this connection's context.
    // Note: This prevents same-connection duplicate initialization.
    // SQLite clients cannot distinguish schemas, so having 'public.users'
    // and 'auth.users' would cause sync ambiguity. Users should avoid
    // initializing tables with the same name in different schemas.
    // If two concurrent connections initialize tables with the same name
    // in different schemas, the behavior is undefined.
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (table) return true;
    
    // check for space availability
    if (!table_ensure_capacity(data)) return false;
    
    // setup a new table
    table = table_create(data, table_name, algo);
    if (!table) return false;
    
    // fill remaining metadata in the table
    int count = database_count_pk(data, table_name, false, table->schema);
    if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    table->npks = count;
    if (table->npks == 0) {
#if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
        goto abort_add_table;
#else
        // no explicit pk: track the row through its implicit rowid
        table->rowid_only = true;
        table->npks = 1; // rowid
#endif
    }
    
    int ncols = database_count_nonpk(data, table_name, table->schema);
    if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    int rc = table_add_stmts(table, ncols);
    if (rc != DBRES_OK) goto abort_add_table;
    
    // a table with only pk(s) is totally legal
    if (ncols > 0) {
        // allocate the parallel per-column arrays (all zero-initialized so
        // table_free can safely release a partially-filled table)
        table->col_name = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_name) goto abort_add_table;
        
        table->col_id = (int *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(int) * ncols));
        if (!table->col_id) goto abort_add_table;
        
        table->col_merge_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_merge_stmt) goto abort_add_table;
        
        table->col_value_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_value_stmt) goto abort_add_table;
        
        table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
        if (!table->col_algo) goto abort_add_table;
        
        table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_delimiter) goto abort_add_table;
        
        // Pass empty string when schema is NULL; SQL will fall back to current_schema()
        const char *schema = table->schema ? table->schema : "";
        char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
                                             table_name, schema, table_name, schema);
        if (!sql) goto abort_add_table;
        rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
        cloudsync_memory_free(sql);
        if (rc == DBRES_ABORT) goto abort_add_table;
    }
    
    // append newly created table
    data->tables[data->tables_count++] = table;
    return true;
    
abort_add_table:
    table_free(table);
    return false;
}
1125 :
1126 0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
1127 0 : dbvm_t *vm = NULL;
1128 0 : *persistent = false;
1129 :
1130 0 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1131 0 : if (table) {
1132 0 : char *col_name = NULL;
1133 0 : if (table->ncols > 0) {
1134 0 : col_name = table->col_name[0];
1135 : // retrieve col_value precompiled statement
1136 0 : vm = table_column_lookup(table, col_name, false, NULL);
1137 0 : *persistent = true;
1138 0 : } else {
1139 0 : char *sql = table_build_value_sql(table, "*");
1140 0 : databasevm_prepare(data, sql, (void **)&vm, 0);
1141 0 : cloudsync_memory_free(sql);
1142 0 : *persistent = false;
1143 : }
1144 0 : }
1145 :
1146 0 : return vm;
1147 : }
1148 :
1149 6876 : bool table_enabled (cloudsync_table_context *table) {
1150 6876 : return table->enabled;
1151 : }
1152 :
1153 6 : void table_set_enabled (cloudsync_table_context *table, bool value) {
1154 6 : table->enabled = value;
1155 6 : }
1156 :
1157 19850 : int table_count_cols (cloudsync_table_context *table) {
1158 19850 : return table->ncols;
1159 : }
1160 :
1161 6766 : int table_count_pks (cloudsync_table_context *table) {
1162 6766 : return table->npks;
1163 : }
1164 :
1165 32916 : const char *table_colname (cloudsync_table_context *table, int index) {
1166 32916 : return table->col_name[index];
1167 : }
1168 :
1169 3586 : bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
1170 : // check if a row with the same primary key already exists
1171 : // if so, this means the row might have been previously deleted (sentinel)
1172 3586 : return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
1173 : }
1174 :
1175 0 : char **table_pknames (cloudsync_table_context *table) {
1176 0 : return table->pk_name;
1177 : }
1178 :
1179 23 : void table_set_pknames (cloudsync_table_context *table, char **pknames) {
1180 23 : table_pknames_free(table->pk_name, table->npks);
1181 23 : table->pk_name = pknames;
1182 23 : }
1183 :
1184 49228 : bool table_algo_isgos (cloudsync_table_context *table) {
1185 49228 : return (table->algo == table_algo_crdt_gos);
1186 : }
1187 :
1188 0 : const char *table_schema (cloudsync_table_context *table) {
1189 0 : return table->schema;
1190 : }
1191 :
1192 : // MARK: - Merge Insert -
1193 :
1194 46078 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
1195 46078 : dbvm_t *vm = table->meta_local_cl_stmt;
1196 46078 : int64_t result = -1;
1197 :
1198 46078 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1199 46078 : if (rc != DBRES_OK) goto cleanup;
1200 :
1201 46078 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1202 46078 : if (rc != DBRES_OK) goto cleanup;
1203 :
1204 46078 : rc = databasevm_step(vm);
1205 46078 : if (rc == DBRES_ROW) result = database_column_int(vm, 0);
1206 0 : else if (rc == DBRES_DONE) result = 0;
1207 :
1208 : cleanup:
1209 46078 : if (result == -1) cloudsync_set_dberror(table->context);
1210 46078 : dbvm_reset(vm);
1211 46078 : return result;
1212 : }
1213 :
1214 45051 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
1215 45051 : dbvm_t *vm = table->meta_col_version_stmt;
1216 :
1217 45051 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1218 45051 : if (rc != DBRES_OK) goto cleanup;
1219 :
1220 45051 : rc = databasevm_bind_text(vm, 2, col_name, -1);
1221 45051 : if (rc != DBRES_OK) goto cleanup;
1222 :
1223 45051 : rc = databasevm_step(vm);
1224 69976 : if (rc == DBRES_ROW) {
1225 24925 : *version = database_column_int(vm, 0);
1226 24925 : rc = DBRES_OK;
1227 24925 : }
1228 :
1229 : cleanup:
1230 45051 : if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
1231 45051 : dbvm_reset(vm);
1232 45051 : return rc;
1233 : }
1234 :
// Record the winning clock for a merged change in the meta table.
// Step 1 resolves (or inserts) the ordinal for the remote site_id via the
// shared getset_siteid_stmt; step 2 upserts the (pk, col_name) row with
// col_version / db_version / seq and that ordinal. On success *rowid
// receives the change id returned by the insert statement.
// A NULL colname is recorded as the tombstone sentinel column.
// Returns DBRES_OK or an error code (db error recorded on the context).
int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    
    // get/set site_id
    dbvm_t *vm = data->getset_siteid_stmt;
    int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
    if (rc != DBRES_OK) goto cleanup_merge;
    
    rc = databasevm_step(vm);
    if (rc != DBRES_ROW) goto cleanup_merge;
    
    // ordinal assigned to this site_id in the local site table
    int64_t ord = database_column_int(vm, 0);
    dbvm_reset(vm);
    
    // vm is re-pointed here: the cleanup label resets whichever statement
    // was in use when control reaches it
    vm = table->meta_winner_clock_stmt;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
    if (rc != DBRES_OK) goto cleanup_merge;
    
    rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    if (rc != DBRES_OK) goto cleanup_merge;
    
    rc = databasevm_bind_int(vm, 3, col_version);
    if (rc != DBRES_OK) goto cleanup_merge;
    
    rc = databasevm_bind_int(vm, 4, db_version);
    if (rc != DBRES_OK) goto cleanup_merge;
    
    rc = databasevm_bind_int(vm, 5, seq);
    if (rc != DBRES_OK) goto cleanup_merge;
    
    rc = databasevm_bind_int(vm, 6, ord);
    if (rc != DBRES_OK) goto cleanup_merge;
    
    // the insert statement RETURNs the change id of the written row
    rc = databasevm_step(vm);
    if (rc == DBRES_ROW) {
        *rowid = database_column_int(vm, 0);
        rc = DBRES_OK;
    }
    
cleanup_merge:
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    dbvm_reset(vm);
    return rc;
}
1278 :
1279 : // MARK: - Deferred column-batch merge functions -
1280 :
1281 21337 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
1282 21337 : merge_pending_batch *batch = data->pending_batch;
1283 :
1284 : // Store table and PK on first entry
1285 21337 : if (batch->table == NULL) {
1286 7699 : batch->table = table;
1287 7699 : batch->pk = (char *)cloudsync_memory_alloc(pklen);
1288 7699 : if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
1289 7699 : memcpy(batch->pk, pk, pklen);
1290 7699 : batch->pk_len = pklen;
1291 7699 : }
1292 :
1293 : // Ensure capacity
1294 21337 : if (batch->count >= batch->capacity) {
1295 504 : int new_cap = batch->capacity ? batch->capacity * 2 : 8;
1296 504 : merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
1297 504 : if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
1298 504 : batch->entries = new_entries;
1299 504 : batch->capacity = new_cap;
1300 504 : }
1301 :
1302 : // Resolve col_name to a stable pointer from the table context
1303 : // (the incoming col_name may point to VM-owned memory that gets freed on reset)
1304 21337 : int col_idx = -1;
1305 21337 : table_column_lookup(table, col_name, true, &col_idx);
1306 21337 : const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
1307 21337 : if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
1308 :
1309 21337 : merge_pending_entry *e = &batch->entries[batch->count];
1310 21337 : e->col_name = stable_col_name;
1311 21337 : e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
1312 21337 : e->col_version = col_version;
1313 21337 : e->db_version = db_version;
1314 21337 : e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
1315 21337 : memcpy(e->site_id, site_id, e->site_id_len);
1316 21337 : e->seq = seq;
1317 :
1318 21337 : batch->count++;
1319 21337 : return DBRES_OK;
1320 21337 : }
1321 :
1322 18904 : static void merge_pending_free_entries (merge_pending_batch *batch) {
1323 18904 : if (batch->entries) {
1324 35578 : for (int i = 0; i < batch->count; i++) {
1325 21337 : if (batch->entries[i].col_value) {
1326 21337 : database_value_free(batch->entries[i].col_value);
1327 21337 : batch->entries[i].col_value = NULL;
1328 21337 : }
1329 21337 : }
1330 14241 : }
1331 18904 : if (batch->pk) {
1332 7731 : cloudsync_memory_free(batch->pk);
1333 7731 : batch->pk = NULL;
1334 7731 : }
1335 18904 : batch->table = NULL;
1336 18904 : batch->pk_len = 0;
1337 18904 : batch->cl = 0;
1338 18904 : batch->sentinel_pending = false;
1339 18904 : batch->row_exists = false;
1340 18904 : batch->count = 0;
1341 18904 : }
1342 :
// Apply all buffered column changes for the current row in one shot:
// builds (or reuses from cache) a single multi-column UPDATE/UPSERT, binds
// the decoded pk followed by every buffered value, executes it with the
// sync bit set, then records the winner clock for each entry. A row with no
// winning columns but a pending sentinel is applied through the precompiled
// sentinel statement instead. All db work is wrapped in a savepoint so a
// failure rolls back cleanly. The batch is always emptied before returning.
// Returns DBRES_OK or an error code (db error recorded on the context).
static int merge_flush_pending (cloudsync_context *data) {
    merge_pending_batch *batch = data->pending_batch;
    if (!batch) return DBRES_OK;
    
    int rc = DBRES_OK;
    bool flush_savepoint = false;
    
    // Nothing to write — handle sentinel-only case or skip
    if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
        goto cleanup;
    }
    
    // Wrap database operations in a savepoint so that on failure (e.g. RLS
    // denial) the rollback properly releases all executor resources (open
    // relations, snapshots, plan cache) acquired during the failed statement.
    flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);
    
    if (batch->count == 0) {
        // Sentinel with no winning columns (PK-only row)
        dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
        rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
        if (rc < 0) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
        // execute with the sync bit set so triggers recognize a merge write
        SYNCBIT_SET(data);
        rc = databasevm_step(vm);
        dbvm_reset(vm);
        SYNCBIT_RESET(data);
        if (rc == DBRES_DONE) rc = DBRES_OK;
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            goto cleanup;
        }
        goto cleanup;
    }
    
    // Check if cached prepared statement can be reused
    // (valid only if the row-exists flavor, column count, and the exact
    // column-name pointers all match — names are stable table-context
    // pointers, so pointer comparison suffices)
    cloudsync_table_context *table = batch->table;
    dbvm_t *vm = NULL;
    bool cache_hit = false;
    
    if (batch->cached_vm &&
        batch->cached_row_exists == batch->row_exists &&
        batch->cached_col_count == batch->count) {
        cache_hit = true;
        for (int i = 0; i < batch->count; i++) {
            if (batch->cached_col_names[i] != batch->entries[i].col_name) {
                cache_hit = false;
                break;
            }
        }
    }
    
    if (cache_hit) {
        vm = batch->cached_vm;
        dbvm_reset(vm);
    } else {
        // Invalidate old cache
        if (batch->cached_vm) {
            databasevm_finalize(batch->cached_vm);
            batch->cached_vm = NULL;
        }
        
        // Build multi-column SQL
        const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
        if (!colnames) {
            rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
            goto cleanup;
        }
        for (int i = 0; i < batch->count; i++) {
            colnames[i] = batch->entries[i].col_name;
        }
        
        // UPDATE when the target row already exists, UPSERT otherwise
        char *sql = batch->row_exists
            ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
            : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
        cloudsync_memory_free(colnames);
        
        if (!sql) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
            goto cleanup;
        }
        
        rc = databasevm_prepare(data, sql, &vm, 0);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
            goto cleanup;
        }
        
        // Update cache
        batch->cached_vm = vm;
        batch->cached_row_exists = batch->row_exists;
        batch->cached_col_count = batch->count;
        // Reallocate cached_col_names if needed
        // (a failed realloc is tolerated: the cache simply won't hit next time)
        if (batch->cached_col_count > 0) {
            const char **new_names = (const char **)cloudsync_memory_realloc(
                batch->cached_col_names, batch->count * sizeof(const char *));
            if (new_names) {
                for (int i = 0; i < batch->count; i++) {
                    new_names[i] = batch->entries[i].col_name;
                }
                batch->cached_col_names = new_names;
            }
        }
    }
    
    // Bind PKs (positions 1..npks)
    int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    if (npks < 0) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        rc = DBRES_ERROR;
        goto cleanup;
    }
    
    // Bind column values (positions npks+1..npks+count)
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int bind_idx = npks + 1 + i;
        if (e->col_value) {
            rc = databasevm_bind_value(vm, bind_idx, e->col_value);
        } else {
            rc = databasevm_bind_null(vm, bind_idx);
        }
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
    }
    
    // Execute with SYNCBIT and GOS handling
    // (GOS tables raise a constraint from their trigger on UPDATE, so the
    // trigger is disabled for the duration of the statement)
    if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    
    if (rc != DBRES_DONE) {
        cloudsync_set_dberror(data);
        goto cleanup;
    }
    rc = DBRES_OK;
    
    // Call merge_set_winner_clock for each buffered entry
    int64_t rowid = 0;
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
                                              e->col_name, e->col_version, e->db_version,
                                              (const char *)e->site_id, e->site_id_len,
                                              e->seq, &rowid);
        if (clock_rc != DBRES_OK) {
            rc = clock_rc;
            goto cleanup;
        }
    }
    
cleanup:
    merge_pending_free_entries(batch);
    if (flush_savepoint) {
        if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
        else database_rollback_savepoint(data, "merge_flush");
    }
    return rc;
}
1513 :
// Apply one incoming column change immediately (non-batched path): execute
// the precompiled per-column merge upsert with the sync bit set, then record
// the winner clock. On success *rowid receives the meta-table change id.
// Returns DBRES_OK or an error code (db error recorded on the context).
int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    int index;
    dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
    if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
    
    // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    
    // bind primary key(s)
    int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    if (rc < 0) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }
    
    // bind value (always bind all expected parameters for correct prepared statement handling)
    // the value occupies two slots: the INSERT list (npks+1) and the
    // ON CONFLICT DO UPDATE assignment (npks+2)
    if (col_value) {
        rc = databasevm_bind_value(vm, table->npks+1, col_value);
        if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
    } else {
        rc = databasevm_bind_null(vm, table->npks+1);
        if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
    }
    if (rc != DBRES_OK) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }
    
    // perform real operation and disable triggers
    
    // in case of GOS we reused the table->col_merge_stmt statement
    // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
    // the trick is to disable that trigger before executing the statement
    if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    
    if (rc != DBRES_DONE) {
        cloudsync_set_dberror(data);
        return rc;
    }
    
    return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
}
1564 :
1565 169 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1566 169 : int rc = DBRES_OK;
1567 :
1568 : // reset return value
1569 169 : *rowid = 0;
1570 :
1571 : // bind pk
1572 169 : dbvm_t *vm = table->real_merge_delete_stmt;
1573 169 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1574 169 : if (rc < 0) {
1575 0 : rc = cloudsync_set_dberror(data);
1576 0 : dbvm_reset(vm);
1577 0 : return rc;
1578 : }
1579 :
1580 : // perform real operation and disable triggers
1581 169 : SYNCBIT_SET(data);
1582 169 : rc = databasevm_step(vm);
1583 : DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
1584 169 : dbvm_reset(vm);
1585 169 : SYNCBIT_RESET(data);
1586 169 : if (rc == DBRES_DONE) rc = DBRES_OK;
1587 169 : if (rc != DBRES_OK) {
1588 0 : cloudsync_set_dberror(data);
1589 0 : return rc;
1590 : }
1591 :
1592 169 : rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
1593 169 : if (rc != DBRES_OK) return rc;
1594 :
1595 : // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
1596 : // this must never come before `set_winner_clock`
1597 169 : vm = table->meta_merge_delete_drop;
1598 169 : rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1599 169 : if (rc == DBRES_OK) rc = databasevm_step(vm);
1600 169 : dbvm_reset(vm);
1601 :
1602 169 : if (rc == DBRES_DONE) rc = DBRES_OK;
1603 169 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1604 169 : return rc;
1605 169 : }
1606 :
1607 59 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
1608 59 : dbvm_t *vm = table->meta_zero_clock_stmt;
1609 :
1610 59 : int rc = databasevm_bind_int(vm, 1, db_version);
1611 59 : if (rc != DBRES_OK) goto cleanup;
1612 :
1613 59 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1614 59 : if (rc != DBRES_OK) goto cleanup;
1615 :
1616 59 : rc = databasevm_step(vm);
1617 59 : if (rc == DBRES_DONE) rc = DBRES_OK;
1618 :
1619 : cleanup:
1620 59 : if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
1621 59 : dbvm_reset(vm);
1622 59 : return rc;
1623 : }
1624 :
// Tiebreak helper, executed only when the incoming and local causal lengths
// are equal (see merge_insert). Decides whether the incoming change for
// (pk, col_name) beats the locally stored one, cascading through:
//   1. no local metadata row                     -> incoming wins
//   2. different col_version                     -> higher version wins
//   3. equal col_version                         -> compare the actual values
//   4. equal values and data->merge_equal_values -> compare site_ids for a
//      fully deterministic total order across peers
// On success returns DBRES_OK with the verdict stored in *didwin_flag.
int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {

    // a NULL column name denotes the row sentinel (tombstone) entry
    if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;

    int64_t local_version;
    int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    if (rc == DBRES_DONE) {
        // no rows returned, the incoming change wins if there's nothing there locally
        *didwin_flag = true;
        return DBRES_OK;
    }
    if (rc != DBRES_OK) return rc;

    // rc == DBRES_OK, means that a row with a version exists
    if (local_version != col_version) {
        if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
        if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    }

    // rc == DBRES_ROW and col_version == local_version, need to compare values

    // retrieve col_value precompiled statement
    bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    dbvm_t *vm;
    if (is_block_col) {
        // Block column: read value from blocks table (pk + col_name bindings)
        vm = table_block_value_read_stmt(table);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
        rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
        rc = databasevm_bind_text(vm, 2, col_name, -1);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    } else {
        vm = table_column_lookup(table, col_name, false, NULL);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);

        // bind primary key values
        rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
        if (rc < 0) {
            rc = cloudsync_set_dberror(data);
            dbvm_reset(vm);
            return rc;
        }
    }

    // execute vm
    dbvalue_t *local_value;
    rc = databasevm_step(vm);
    if (rc == DBRES_DONE) {
        // meta entry exists but the actual value is missing
        // we should allow the value_compare function to make a decision
        // value_compare has been modified to handle the case where lvalue is NULL
        local_value = NULL;
        rc = DBRES_OK;
    } else if (rc == DBRES_ROW) {
        local_value = database_column_value(vm, 0);
        rc = DBRES_OK;
    } else {
        goto cleanup;
    }

    // compare values
    int ret = dbutils_value_compare(insert_value, local_value);
    // reset after compare, otherwise local value would be deallocated
    dbvm_reset(vm);
    // NOTE(review): from here cleanup may run with vm == NULL — assumes
    // dbvm_reset(NULL) is a no-op; confirm against its implementation
    vm = NULL;

    bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    if (!compare_site_id) {
        // incoming wins only on a strictly greater value (ties lose here)
        *didwin_flag = (ret > 0);
        goto cleanup;
    }

    // values are the same and merge_equal_values is true
    vm = table->meta_site_id_stmt;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_bind_text(vm, 2, col_name, -1);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_step(vm);
    if (rc == DBRES_ROW) {
        const void *local_site_id = database_column_blob(vm, 0, NULL);
        if (!local_site_id) {
            dbvm_reset(vm);
            return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
        }
        // NOTE(review): compares exactly site_len bytes — assumes the stored
        // site_id is at least site_len bytes long; verify writer invariants
        ret = memcmp(site_id, local_site_id, site_len);
        *didwin_flag = (ret > 0);
        dbvm_reset(vm);
        return DBRES_OK;
    }

    // handle error condition here
    dbvm_reset(vm);
    return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);

cleanup:
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    dbvm_reset(vm);
    return rc;
}
1729 :
// Apply a sentinel-only change: the incoming record carries only the
// tombstone column, i.e. "this row exists / was resurrected" with no column
// payload. In immediate mode (no pending batch) the base-table sentinel
// INSERT runs right away with triggers suppressed; in batch mode the row
// creation is deferred to the batch flush and only flagged here. The
// metadata updates (zeroing old clocks, recording the winner clock) always
// execute regardless of mode. On success *rowid receives the metadata rowid.
int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {

    // reset return value
    *rowid = 0;

    if (data->pending_batch == NULL) {
        // Immediate mode: execute base table INSERT
        dbvm_t *vm = table->real_merge_sentinel_stmt;
        int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
        if (rc < 0) {
            rc = cloudsync_set_dberror(data);
            dbvm_reset(vm);
            return rc;
        }

        // run with the sync bit set so local change-tracking triggers are skipped
        SYNCBIT_SET(data);
        rc = databasevm_step(vm);
        dbvm_reset(vm);
        SYNCBIT_RESET(data);
        if (rc == DBRES_DONE) rc = DBRES_OK;
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            return rc;
        }
    } else {
        // Batch mode: skip base table INSERT, the batch flush will create the row
        merge_pending_batch *batch = data->pending_batch;
        batch->sentinel_pending = true;
        // NOTE(review): the pk is captured only when the batch has no table
        // yet — assumes a batch always targets a single (table, pk); confirm
        if (batch->table == NULL) {
            batch->table = table;
            batch->pk = (char *)cloudsync_memory_alloc(pklen);
            if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
            memcpy(batch->pk, pk, pklen);
            batch->pk_len = pklen;
        }
    }

    // Metadata operations always execute regardless of batch mode
    int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
    if (rc != DBRES_OK) return rc;

    return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
}
1773 :
1774 : // MARK: - Block-level merge helpers -
1775 :
1776 : // Store a block value in the blocks table
1777 379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
1778 379 : dbvm_t *vm = table->block_value_write_stmt;
1779 379 : if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
1780 :
1781 379 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1782 379 : if (rc != DBRES_OK) goto cleanup;
1783 379 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1784 379 : if (rc != DBRES_OK) goto cleanup;
1785 379 : if (col_value) {
1786 379 : rc = databasevm_bind_value(vm, 3, col_value);
1787 379 : } else {
1788 0 : rc = databasevm_bind_null(vm, 3);
1789 : }
1790 379 : if (rc != DBRES_OK) goto cleanup;
1791 :
1792 379 : rc = databasevm_step(vm);
1793 379 : if (rc == DBRES_DONE) rc = DBRES_OK;
1794 :
1795 : cleanup:
1796 379 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1797 379 : databasevm_reset(vm);
1798 379 : return rc;
1799 379 : }
1800 :
1801 : // Delete a block value from the blocks table
1802 74 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
1803 74 : dbvm_t *vm = table->block_value_delete_stmt;
1804 74 : if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
1805 :
1806 74 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1807 74 : if (rc != DBRES_OK) goto cleanup;
1808 74 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1809 74 : if (rc != DBRES_OK) goto cleanup;
1810 :
1811 74 : rc = databasevm_step(vm);
1812 74 : if (rc == DBRES_DONE) rc = DBRES_OK;
1813 :
1814 : cleanup:
1815 74 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1816 74 : databasevm_reset(vm);
1817 74 : return rc;
1818 74 : }
1819 :
// Materialize all alive blocks for a base column into the base table.
// Reads every alive block value (metadata col_version odd) for (pk, base
// column) from the blocks table, joins them with the column's delimiter,
// and writes the result into the base table column via the ON CONFLICT
// upsert statement with triggers disabled. When no alive blocks remain the
// column is set to NULL. Returns DBRES_OK on success.
int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);

    // Find column index and delimiter
    int col_idx = -1;
    for (int i = 0; i < table->ncols; i++) {
        if (strcasecmp(table->col_name[i], base_col_name) == 0) {
            col_idx = i;
            break;
        }
    }
    if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;

    // Build the LIKE pattern for block col_names: "base_col\x1F%"
    char *like_pattern = block_build_colname(base_col_name, "%");
    if (!like_pattern) return DBRES_NOMEM;

    // Query alive blocks from blocks table joined with metadata
    // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    // ON b.pk = m.pk AND b.col_name = m.col_name
    // WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    // ORDER BY b.col_name
    dbvm_t *vm = table->block_list_stmt;
    int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    // Bind pk again for the join condition (parameter 3)
    rc = databasevm_bind_blob(vm, 3, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }

    // Collect block values (owned copies; geometric growth of the array)
    const char **block_values = NULL;
    int block_count = 0;
    int block_cap = 0;

    while ((rc = databasevm_step(vm)) == DBRES_ROW) {
        const char *value = database_column_text(vm, 0);
        if (block_count >= block_cap) {
            int new_cap = block_cap ? block_cap * 2 : 16;
            const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
            if (!new_arr) { rc = DBRES_NOMEM; break; }
            block_values = new_arr;
            block_cap = new_cap;
        }
        // NOTE(review): cloudsync_string_dup may return NULL on OOM, leaving a
        // NULL entry that block_materialize_text would then receive — confirm
        block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
        block_count++;
    }
    databasevm_reset(vm);
    cloudsync_memory_free(like_pattern);

    if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
        // Free collected values
        for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
        if (block_values) cloudsync_memory_free((void *)block_values);
        return cloudsync_set_dberror(data);
    }

    // Materialize text (NULL when no alive blocks)
    char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    if (block_values) cloudsync_memory_free((void *)block_values);
    if (block_count > 0 && !text) return DBRES_NOMEM;

    // Update the base table column via the col_merge_stmt (with triggers disabled)
    dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }

    // Bind PKs
    rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }

    // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    int npks = table->npks;
    if (text) {
        rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    } else {
        rc = databasevm_bind_null(merge_vm, npks + 1);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_null(merge_vm, npks + 2);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    }

    // Execute with triggers disabled
    table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(merge_vm);
    databasevm_reset(merge_vm);
    SYNCBIT_RESET(data);
    table->enabled = 1;

    cloudsync_memory_free(text);

    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    return DBRES_OK;
}
1924 :
1925 : // Accessor for has_block_cols flag
1926 1024 : bool table_has_block_cols (cloudsync_table_context *table) {
1927 1024 : return table && table->has_block_cols;
1928 : }
1929 :
1930 : // Get block column algo for a given column index
1931 11588 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
1932 11588 : if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
1933 11588 : return table->col_algo[index];
1934 11588 : }
1935 :
1936 : // Get block delimiter for a given column index
1937 137 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
1938 137 : if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
1939 137 : return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
1940 137 : }
1941 :
1942 : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
1943 1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
1944 506 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
1945 93 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
1946 93 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
1947 :
1948 3 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
1949 3 : if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
1950 3 : if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
1951 3 : table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
1952 3 : }
1953 :
1954 : // Find column index by name
1955 127 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
1956 127 : if (!table || !col_name) return -1;
1957 131 : for (int i = 0; i < table->ncols; i++) {
1958 131 : if (strcasecmp(table->col_name[i], col_name) == 0) return i;
1959 4 : }
1960 0 : return -1;
1961 127 : }
1962 :
// Entry point for applying one incoming change record to the local database.
// Implements the Causal-Length Set (CLS) merge: the causal length (cl) of a
// row encodes its life cycle (odd = alive, even = deleted), and comparing the
// incoming cl against the local cl decides between ignoring the change,
// applying a delete, a sentinel-only insert, a resurrection, or a per-column
// value merge (with block-level LWW handled separately for block columns).
// Returns DBRES_OK when the change was applied or correctly ignored.
int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    // Handle DWS and AWS algorithms here
    // Delete-Wins Set (DWS): table_algo_crdt_dws
    // Add-Wins Set (AWS): table_algo_crdt_aws

    // Causal-Length Set (CLS) Algorithm (default)

    // compute the local causal length for the row based on the primary key
    // the causal length is used to determine the order of operations and resolve conflicts.
    int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);

    // if the incoming causal length is older than the local causal length, we can safely ignore it
    // because the local changes are more recent
    if (insert_cl < local_cl) return DBRES_OK;

    // check if the operation is a delete by examining the causal length
    // even causal lengths typically signify delete operations
    bool is_delete = (insert_cl % 2 == 0);
    if (is_delete) {
        // if it's a delete, check if the local state is at the same causal length
        // if it is, no further action is needed
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a delete merge if the causal length is newer than the local one
        int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
                              insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
        return rc;
    }

    // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    if (is_sentinel_only) {
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a sentinel-only insert to track the existence of the row
        // NOTE(review): this path passes insert_col_version as the cl argument
        // while the resurrection path below passes insert_cl — presumably the
        // two coincide for sentinel records; confirm against the sender
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
        return rc;
    }

    // from this point I can be sure that insert_name is not sentinel

    // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    // odd causal lengths can "resurrect" rows
    bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    bool row_exists_locally = local_cl != 0;

    // if a resurrection is needed, insert a sentinel to mark the row as alive
    // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    }

    // at this point, we determine whether the incoming change wins based on causal length
    // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    bool flag = false;
    int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);

    // check if the incoming change wins and should be applied
    bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    if (!does_cid_win) return DBRES_OK;

    // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    // bypass the normal base-table write and instead store the value in the blocks table.
    // The base table column will be materialized from all alive blocks.
    if (block_is_block_colname(insert_name) && table->has_block_cols) {
        // Store or delete block value in blocks table depending on tombstone status
        // (even col_version marks the block as tombstoned)
        if (insert_col_version % 2 == 0) {
            // Tombstone: remove from blocks table
            rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
        } else {
            rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
        }
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);

        // Set winner clock in metadata
        rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
                                    insert_col_version, insert_db_version,
                                    insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);

        // Materialize the full column from blocks into the base table
        char *base_col = block_extract_base_colname(insert_name);
        if (base_col) {
            rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
            cloudsync_memory_free(base_col);
            if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
        }

        return DBRES_OK;
    }

    // perform the final column insert or update if the incoming change wins
    if (data->pending_batch) {
        // Propagate row_exists_locally to the batch on the first winning column.
        // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
        // which matters when RLS policies reference columns not in the payload.
        if (data->pending_batch->table == NULL) {
            data->pending_batch->row_exists = row_exists_locally;
        }
        rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    } else {
        rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    }

    return rc;
}
2078 :
2079 : // MARK: - Block column setup -
2080 :
2081 : // Migrate existing tracked rows to block format when block-level LWW is first enabled on a column.
2082 : // Scans the metadata table for alive rows with the plain col_name entry (not yet block entries),
2083 : // reads each row's current value from the base table, splits it into blocks, and inserts
2084 : // the block entries into both the blocks table and the metadata table.
2085 : // Uses INSERT OR IGNORE semantics so the operation is safe to call multiple times.
2086 73 : static int block_migrate_existing_rows (cloudsync_context *data, cloudsync_table_context *table, int col_idx) {
2087 73 : const char *col_name = table->col_name[col_idx];
2088 73 : if (!col_name || !table->meta_ref || !table->blocks_ref) return DBRES_OK;
2089 :
2090 73 : const char *delim = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
2091 73 : int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
2092 :
2093 : // Phase 1: collect all existing PKs that have an alive regular col_name entry
2094 : // AND do not yet have any entries in the blocks table for this column.
2095 : // The NOT IN filter makes this idempotent: rows that were already migrated
2096 : // (or had their blocks created via INSERT) are skipped on subsequent calls.
2097 : // We collect PKs before writing so that writes to the metadata table (Phase 2)
2098 : // do not perturb the read cursor on the same table.
2099 73 : char *like_pattern = block_build_colname(col_name, "%");
2100 73 : if (!like_pattern) return DBRES_NOMEM;
2101 :
2102 73 : char *scan_sql = cloudsync_memory_mprintf(SQL_META_SCAN_COL_FOR_MIGRATION, table->meta_ref, table->blocks_ref);
2103 73 : if (!scan_sql) { cloudsync_memory_free(like_pattern); return DBRES_NOMEM; }
2104 73 : dbvm_t *scan_vm = NULL;
2105 73 : int rc = databasevm_prepare(data, scan_sql, &scan_vm, 0);
2106 73 : cloudsync_memory_free(scan_sql);
2107 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); return rc; }
2108 :
2109 73 : rc = databasevm_bind_text(scan_vm, 1, col_name, -1);
2110 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2111 : // Bind like_pattern as ?2 and keep it alive until after all scan steps complete,
2112 : // because databasevm_bind_text uses SQLITE_STATIC (no copy).
2113 73 : rc = databasevm_bind_text(scan_vm, 2, like_pattern, -1);
2114 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2115 :
2116 : // Collect pk blobs into a dynamically-grown array of owned copies
2117 73 : void **pks = NULL;
2118 73 : size_t *pklens = NULL;
2119 73 : int pk_count = 0;
2120 73 : int pk_cap = 0;
2121 :
2122 75 : while ((rc = databasevm_step(scan_vm)) == DBRES_ROW) {
2123 2 : size_t pklen = 0;
2124 2 : const void *pk = database_column_blob(scan_vm, 0, &pklen);
2125 2 : if (!pk || pklen == 0) continue;
2126 :
2127 2 : if (pk_count >= pk_cap) {
2128 1 : int new_cap = pk_cap ? pk_cap * 2 : 8;
2129 1 : void **new_pks = (void **)cloudsync_memory_realloc(pks, (uint64_t)(new_cap * sizeof(void *)));
2130 1 : size_t *new_pklens = (size_t *)cloudsync_memory_realloc(pklens, (uint64_t)(new_cap * sizeof(size_t)));
2131 1 : if (!new_pks || !new_pklens) {
2132 0 : cloudsync_memory_free(new_pks ? new_pks : pks);
2133 0 : cloudsync_memory_free(new_pklens ? new_pklens : pklens);
2134 0 : databasevm_finalize(scan_vm);
2135 0 : return DBRES_NOMEM;
2136 : }
2137 1 : pks = new_pks;
2138 1 : pklens = new_pklens;
2139 1 : pk_cap = new_cap;
2140 1 : }
2141 :
2142 2 : pks[pk_count] = cloudsync_memory_alloc((uint64_t)pklen);
2143 2 : if (!pks[pk_count]) { rc = DBRES_NOMEM; break; }
2144 2 : memcpy(pks[pk_count], pk, pklen);
2145 2 : pklens[pk_count] = pklen;
2146 2 : pk_count++;
2147 : }
2148 :
2149 73 : databasevm_finalize(scan_vm);
2150 73 : cloudsync_memory_free(like_pattern); // safe to free after scan_vm is finalized
2151 73 : if (rc != DBRES_DONE && rc != DBRES_OK) {
2152 0 : for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2153 0 : cloudsync_memory_free(pks);
2154 0 : cloudsync_memory_free(pklens);
2155 0 : return rc;
2156 : }
2157 :
2158 73 : if (pk_count == 0) {
2159 72 : cloudsync_memory_free(pks);
2160 72 : cloudsync_memory_free(pklens);
2161 72 : return DBRES_OK;
2162 : }
2163 :
2164 : // Phase 2: for each collected PK, read the column value, split into blocks,
2165 : // and insert into the blocks table + metadata using INSERT OR IGNORE.
2166 :
2167 1 : char *meta_sql = cloudsync_memory_mprintf(SQL_META_INSERT_BLOCK_IGNORE, table->meta_ref);
2168 1 : if (!meta_sql) { rc = DBRES_NOMEM; goto cleanup_pks; }
2169 1 : dbvm_t *meta_vm = NULL;
2170 1 : rc = databasevm_prepare(data, meta_sql, &meta_vm, 0);
2171 1 : cloudsync_memory_free(meta_sql);
2172 1 : if (rc != DBRES_OK) goto cleanup_pks;
2173 :
2174 1 : char *blocks_sql = cloudsync_memory_mprintf(SQL_BLOCKS_INSERT_IGNORE, table->blocks_ref);
2175 1 : if (!blocks_sql) { databasevm_finalize(meta_vm); rc = DBRES_NOMEM; goto cleanup_pks; }
2176 1 : dbvm_t *blocks_vm = NULL;
2177 1 : rc = databasevm_prepare(data, blocks_sql, &blocks_vm, 0);
2178 1 : cloudsync_memory_free(blocks_sql);
2179 1 : if (rc != DBRES_OK) { databasevm_finalize(meta_vm); goto cleanup_pks; }
2180 :
2181 1 : dbvm_t *val_vm = (dbvm_t *)table_column_lookup(table, col_name, false, NULL);
2182 :
2183 3 : for (int p = 0; p < pk_count; p++) {
2184 2 : const void *pk = pks[p];
2185 2 : size_t pklen = pklens[p];
2186 :
2187 2 : if (!val_vm) continue;
2188 :
2189 : // Read current column value from the base table
2190 2 : int bind_rc = pk_decode_prikey((char *)pk, pklen, pk_decode_bind_callback, (void *)val_vm);
2191 2 : if (bind_rc < 0) { databasevm_reset(val_vm); continue; }
2192 :
2193 2 : int step_rc = databasevm_step(val_vm);
2194 2 : const char *text = (step_rc == DBRES_ROW) ? database_column_text(val_vm, 0) : NULL;
2195 : // Make a copy of text before resetting val_vm, as the pointer is only valid until reset
2196 2 : char *text_copy = text ? cloudsync_string_dup(text) : NULL;
2197 2 : databasevm_reset(val_vm);
2198 :
2199 2 : if (!text_copy) continue; // NULL column value: nothing to migrate
2200 :
2201 : // Split text into blocks and store each one
2202 2 : block_list_t *blocks = block_split(text_copy, delim);
2203 2 : cloudsync_memory_free(text_copy);
2204 2 : if (!blocks) continue;
2205 :
2206 2 : char **positions = block_initial_positions(blocks->count);
2207 2 : if (positions) {
2208 7 : for (int b = 0; b < blocks->count; b++) {
2209 5 : char *block_cn = block_build_colname(col_name, positions[b]);
2210 5 : if (block_cn) {
2211 : // Metadata entry (skip if this block position already exists)
2212 5 : databasevm_bind_blob(meta_vm, 1, pk, (int)pklen);
2213 5 : databasevm_bind_text(meta_vm, 2, block_cn, -1);
2214 5 : databasevm_bind_int(meta_vm, 3, 1); // col_version = 1 (alive)
2215 5 : databasevm_bind_int(meta_vm, 4, db_version);
2216 5 : databasevm_bind_int(meta_vm, 5, cloudsync_bumpseq(data));
2217 5 : databasevm_step(meta_vm);
2218 5 : databasevm_reset(meta_vm);
2219 :
2220 : // Block value (skip if this block position already exists)
2221 5 : databasevm_bind_blob(blocks_vm, 1, pk, (int)pklen);
2222 5 : databasevm_bind_text(blocks_vm, 2, block_cn, -1);
2223 5 : databasevm_bind_text(blocks_vm, 3, blocks->entries[b].content, -1);
2224 5 : databasevm_step(blocks_vm);
2225 5 : databasevm_reset(blocks_vm);
2226 :
2227 5 : cloudsync_memory_free(block_cn);
2228 5 : }
2229 5 : cloudsync_memory_free(positions[b]);
2230 5 : }
2231 2 : cloudsync_memory_free(positions);
2232 2 : }
2233 2 : block_list_free(blocks);
2234 2 : }
2235 :
2236 1 : databasevm_finalize(meta_vm);
2237 1 : databasevm_finalize(blocks_vm);
2238 1 : rc = DBRES_OK;
2239 :
2240 : cleanup_pks:
2241 3 : for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2242 1 : cloudsync_memory_free(pks);
2243 1 : cloudsync_memory_free(pklens);
2244 1 : return rc;
2245 73 : }
2246 :
2247 74 : int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter, bool persist) {
2248 74 : cloudsync_table_context *table = table_lookup(data, table_name);
2249 74 : if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);
2250 :
2251 : // Find column index
2252 74 : int col_idx = table_col_index(table, col_name);
2253 74 : if (col_idx < 0) {
2254 : char buf[1024];
2255 0 : snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
2256 0 : return cloudsync_set_error(data, buf, DBRES_ERROR);
2257 : }
2258 :
2259 : // Set column algo
2260 74 : table->col_algo[col_idx] = col_algo_block;
2261 74 : table->has_block_cols = true;
2262 :
2263 : // Set delimiter (can be NULL for default)
2264 74 : if (table->col_delimiter[col_idx]) {
2265 0 : cloudsync_memory_free(table->col_delimiter[col_idx]);
2266 0 : table->col_delimiter[col_idx] = NULL;
2267 0 : }
2268 74 : if (delimiter) {
2269 1 : table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
2270 1 : }
2271 :
2272 : // Create blocks table if not already done
2273 74 : if (!table->blocks_ref) {
2274 71 : table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
2275 71 : if (!table->blocks_ref) return DBRES_NOMEM;
2276 :
2277 : // CREATE TABLE IF NOT EXISTS
2278 71 : char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
2279 71 : if (!sql) return DBRES_NOMEM;
2280 :
2281 71 : int rc = database_exec(data, sql);
2282 71 : cloudsync_memory_free(sql);
2283 71 : if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);
2284 :
2285 : // Prepare block statements
2286 : // Write: upsert into blocks (pk, col_name, col_value)
2287 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
2288 71 : if (!sql) return DBRES_NOMEM;
2289 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
2290 71 : cloudsync_memory_free(sql);
2291 71 : if (rc != DBRES_OK) return rc;
2292 :
2293 : // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
2294 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
2295 71 : if (!sql) return DBRES_NOMEM;
2296 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
2297 71 : cloudsync_memory_free(sql);
2298 71 : if (rc != DBRES_OK) return rc;
2299 :
2300 : // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
2301 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
2302 71 : if (!sql) return DBRES_NOMEM;
2303 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
2304 71 : cloudsync_memory_free(sql);
2305 71 : if (rc != DBRES_OK) return rc;
2306 :
2307 : // List alive blocks for materialization
2308 71 : sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
2309 71 : if (!sql) return DBRES_NOMEM;
2310 71 : rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
2311 71 : cloudsync_memory_free(sql);
2312 71 : if (rc != DBRES_OK) return rc;
2313 71 : }
2314 :
2315 : // Persist settings (skipped when called from the settings loader, since
2316 : // writing to cloudsync_table_settings while sqlite3_exec is iterating it
2317 : // re-feeds the rewritten row to the cursor and causes an infinite loop).
2318 74 : if (persist) {
2319 73 : int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
2320 73 : if (rc != DBRES_OK) return rc;
2321 :
2322 73 : if (delimiter) {
2323 0 : rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
2324 0 : if (rc != DBRES_OK) return rc;
2325 0 : }
2326 :
2327 : // Migrate any existing tracked rows: populate the blocks table and metadata with
2328 : // block entries derived from the current column value, so that subsequent UPDATE
2329 : // operations can diff against the real existing state instead of treating everything
2330 : // as new, and so this node participates correctly in LWW conflict resolution.
2331 73 : rc = block_migrate_existing_rows(data, table, col_idx);
2332 73 : if (rc != DBRES_OK) return rc;
2333 73 : }
2334 :
2335 74 : return DBRES_OK;
2336 74 : }
2337 :
2338 : // MARK: - Private -
2339 :
2340 232 : bool cloudsync_config_exists (cloudsync_context *data) {
2341 232 : return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
2342 : }
2343 :
2344 248 : cloudsync_context *cloudsync_context_create (void *db) {
2345 248 : cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
2346 248 : if (!data) return NULL;
2347 : DEBUG_SETTINGS("cloudsync_context_create %p", data);
2348 :
2349 248 : data->libversion = CLOUDSYNC_VERSION;
2350 248 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2351 : #if CLOUDSYNC_DEBUG
2352 : data->debug = 1;
2353 : #endif
2354 :
2355 : // allocate space for 64 tables (it can grow if needed)
2356 248 : uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
2357 248 : data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
2358 248 : if (!data->tables) {cloudsync_memory_free(data); return NULL;}
2359 :
2360 248 : data->tables_cap = CLOUDSYNC_INIT_NTABLES;
2361 248 : data->tables_count = 0;
2362 248 : data->db = db;
2363 :
2364 : // SQLite exposes col_value as ANY, but other databases require a concrete type.
2365 : // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
2366 : // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
2367 : // It is decoded to the target column type just before applying changes to the base table.
2368 248 : data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
2369 :
2370 248 : return data;
2371 248 : }
2372 :
2373 248 : void cloudsync_context_free (void *ctx) {
2374 248 : cloudsync_context *data = (cloudsync_context *)ctx;
2375 : DEBUG_SETTINGS("cloudsync_context_free %p", data);
2376 248 : if (!data) return;
2377 :
2378 : // free all table contexts and prepared statements
2379 248 : cloudsync_terminate(data);
2380 :
2381 248 : cloudsync_memory_free(data->tables);
2382 248 : cloudsync_memory_free(data);
2383 248 : }
2384 :
2385 343 : const char *cloudsync_context_init (cloudsync_context *data) {
2386 343 : if (!data) return NULL;
2387 :
2388 : // perform init just the first time, if the site_id field is not set.
2389 : // The data->site_id value could exists while settings tables don't exists if the
2390 : // cloudsync_context_init was previously called in init transaction that was rolled back
2391 : // because of an error during the init process.
2392 343 : if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
2393 228 : if (dbutils_settings_init(data) != DBRES_OK) return NULL;
2394 228 : if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
2395 228 : if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
2396 228 : data->schema_hash = database_schema_hash(data);
2397 228 : }
2398 :
2399 343 : return (const char *)data->site_id;
2400 343 : }
2401 :
2402 1368 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
2403 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
2404 :
2405 : // sync data
2406 1368 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
2407 228 : data->schema_version = (int)strtol(value, NULL, 0);
2408 228 : return;
2409 : }
2410 :
2411 1140 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
2412 0 : data->debug = 0;
2413 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
2414 0 : return;
2415 : }
2416 :
2417 1140 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
2418 0 : cloudsync_set_schema(data, value);
2419 0 : return;
2420 : }
2421 1368 : }
2422 :
2423 : #if 0
2424 : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
2425 : DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
2426 : // Unused in this version
2427 : return;
2428 : }
2429 : #endif
2430 :
2431 8310 : int cloudsync_commit_hook (void *ctx) {
2432 8310 : cloudsync_context *data = (cloudsync_context *)ctx;
2433 :
2434 8310 : data->db_version = data->pending_db_version;
2435 8310 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2436 8310 : data->seq = 0;
2437 :
2438 8310 : return DBRES_OK;
2439 : }
2440 :
2441 3 : void cloudsync_rollback_hook (void *ctx) {
2442 3 : cloudsync_context *data = (cloudsync_context *)ctx;
2443 :
2444 3 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2445 3 : data->seq = 0;
2446 3 : }
2447 :
// Prepare a tracked table for an ALTER TABLE: drop its cloudsync triggers and
// snapshot its current primary-key names so cloudsync_commit_alter can detect
// whether the PKs changed. Idempotent while an alter is already in progress.
// Returns DBRES_OK on success or a DBRES_* error code (error message set on data).
int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        return DBRES_MISUSE;
    }

    // lookup table
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
        return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    }

    // idempotent: if already altering, return OK
    if (table->is_altering) return DBRES_OK;

    // retrieve primary key(s); names is heap-allocated by database_pk_names
    char **names = NULL;
    int nrows = 0;
    int rc = database_pk_names(data, table_name, &names, &nrows);
    if (rc != DBRES_OK) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
        cloudsync_set_error(data, buffer, DBRES_MISUSE);
        goto rollback_begin_alter;
    }

    // sanity check the number of primary keys
    if (nrows != table_count_pks(table)) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
        cloudsync_set_error(data, buffer, DBRES_MISUSE);
        goto rollback_begin_alter;
    }

    // drop original triggers (recreated after the ALTER completes)
    rc = database_delete_triggers(data, table_name);
    if (rc != DBRES_OK) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
        cloudsync_set_error(data, buffer, DBRES_ERROR);
        goto rollback_begin_alter;
    }

    // ownership of names transfers to the table context here; do NOT free it below
    table_set_pknames(table, names);
    table->is_altering = true;
    return DBRES_OK;

rollback_begin_alter:
    // only reached before ownership transfer, so names (if allocated) is freed here
    if (names) table_pknames_free(names, nrows);
    return rc;
}
2501 :
// Finalize a previously-begun ALTER for the given table: either drop the
// meta-table (when primary keys changed, so every row identity changed) or
// compact it (remove entries for dropped columns and orphaned rows), then
// record the pre-alter db_version for cloudsync_dbversion_rebuild.
// Returns DBRES_OK on success or a DBRES_* error code.
int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    // check if dbversion needed to be updated
    cloudsync_dbversion_check_uptodate(data);

    // if primary-key columns change, all row identities change.
    // In that case, the clock table must be dropped, recreated,
    // and backfilled. We detect this by comparing the unique index
    // in the lookaside table with the source table's PKs.

    // retrieve primary keys (to check if they changed); result is heap-allocated
    char **result = NULL;
    int nrows = 0;
    int rc = database_pk_names (data, table->name, &result, &nrows);
    if (rc != DBRES_OK || nrows == 0) {
        // a table with zero PKs after ALTER is a misuse, not a db error
        if (nrows == 0) rc = DBRES_MISUSE;
        goto finalize;
    }

    // check if there are differences (count first, then name-by-name)
    bool pk_diff = (nrows != table->npks);
    if (!pk_diff) {
        for (int i = 0; i < nrows; ++i) {
            if (strcmp(table->pk_name[i], result[i]) != 0) {
                pk_diff = true;
                break;
            }
        }
    }

    if (pk_diff) {
        // drop meta-table, it will be recreated
        char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }
    } else {
        // compact meta-table
        // delete entries for removed columns
        const char *schema = table->schema ? table->schema : "";
        char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

        sql = sql_build_pk_qualified_collist_query(schema, table->name);
        if (!sql) {rc = DBRES_NOMEM; goto finalize;}

        // pkclause is owned here only when the query returns a value;
        // otherwise fall back to the literal "rowid" (not freed)
        char *pkclause = NULL;
        rc = database_select_text(data, sql, &pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto finalize;
        char *pkvalues = (pkclause) ? pkclause : "rowid";

        // delete entries related to rows that no longer exist in the original table, but preserve tombstone
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
        rc = database_exec(data, sql);
        if (pkclause) cloudsync_memory_free(pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

    }

    // update key to be later used in cloudsync_dbversion_rebuild
    char buf[256];
    snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);

finalize:
    // table_pknames_free handles result == NULL / nrows == 0
    table_pknames_free(result, nrows);
    return rc;
}
2582 :
// Complete an ALTER TABLE begun with cloudsync_begin_alter: finalize the
// meta-table changes, discard the stale table context, and re-initialize
// cloudsync for the table with its previously configured algorithm.
// Idempotent when no alter is in progress. Returns DBRES_OK or a DBRES_* error.
int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    int rc = DBRES_MISUSE;
    cloudsync_table_context *table = NULL;

    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    // lookup table
    table = table_lookup(data, table_name);
    if (!table) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
        cloudsync_set_error(data, buffer, DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    // idempotent: if not altering, return OK
    if (!table->is_altering) return DBRES_OK;

    rc = cloudsync_finalize_alter(data, table);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    // is_altering is reset implicitly because table_free + cloudsync_init_table
    // will reallocate the table context with zero-initialized memory
    table_remove(data, table);
    table_free(table);
    table = NULL;

    // init again cloudsync for the table, preferring the table-specific algo
    // and falling back to the wildcard ("*") setting
    table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    return DBRES_OK;

rollback_finalize_alter:
    // if the table context still exists, clear the alter state so a retry is possible
    if (table) {
        table_set_pknames(table, NULL);
        table->is_altering = false;
    }
    return rc;
}
2630 :
2631 : // MARK: - Filter Rewrite -
2632 :
2633 : // Replace bare column names in a filter expression with prefix-qualified names.
2634 : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
2635 : // Columns must be sorted by length descending by the caller to avoid partial matches.
2636 : // Skips content inside single-quoted string literals.
2637 : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
// Helper: check if an identifier token (token, token_len) exactly matches one
// of the ncols column names (same length AND same bytes).
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    bool found = false;
    for (int idx = 0; idx < ncols && !found; ++idx) {
        const char *col = columns[idx];
        found = (strlen(col) == token_len) && (strncmp(token, col, token_len) == 0);
    }
    return found;
}
2646 :
// Helper: check if character is part of a SQL identifier
// (ASCII letter, ASCII digit, or underscore).
static bool filter_is_ident_char (char c) {
    if (c == '_') return true;
    if (c >= '0' && c <= '9') return true;
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
}
2652 :
char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    if (!filter || !prefix || !columns || ncols <= 0) return NULL;

    size_t filter_len = strlen(filter);
    size_t prefix_len = strlen(prefix);

    // Each identifier match grows by at most (prefix_len + 3) bytes.
    // Worst case: the entire filter is one repeated column reference separated by
    // single characters, so up to (filter_len / 2) matches. Use a safe upper bound.
    // (+3 accounts for the dot and the two surrounding double quotes; +64 is slack.)
    size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    size_t cap = filter_len + max_growth + 64;
    char *result = (char *)cloudsync_memory_alloc(cap);
    if (!result) return NULL;
    size_t out = 0;

    // Single pass: tokenize into identifiers, quoted strings, and everything else.
    size_t i = 0;
    while (i < filter_len) {
        // Skip single-quoted string literals verbatim (handle '' escape)
        if (filter[i] == '\'') {
            result[out++] = filter[i++];
            while (i < filter_len) {
                if (filter[i] == '\'') {
                    result[out++] = filter[i++];
                    // '' is an escaped quote — keep going
                    if (i < filter_len && filter[i] == '\'') {
                        result[out++] = filter[i++];
                        continue;
                    }
                    break; // single ' ends the literal
                }
                result[out++] = filter[i++];
            }
            continue;
        }

        // Extract identifier token (maximal run of identifier characters)
        if (filter_is_ident_char(filter[i])) {
            size_t start = i;
            while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
            size_t token_len = i - start;

            if (filter_is_column(&filter[start], token_len, columns, ncols)) {
                // Emit PREFIX."column_name"
                memcpy(&result[out], prefix, prefix_len); out += prefix_len;
                result[out++] = '.';
                result[out++] = '"';
                memcpy(&result[out], &filter[start], token_len); out += token_len;
                result[out++] = '"';
            } else {
                // Not a column — copy as-is (keywords, numbers, other identifiers)
                memcpy(&result[out], &filter[start], token_len); out += token_len;
            }
            continue;
        }

        // Any other character — copy as-is
        result[out++] = filter[i++];
    }

    // NOTE(review): double-quoted identifiers in the input are not recognized as
    // columns and are copied verbatim; callers appear to pass bare column names.
    result[out] = '\0';
    return result;
}
2716 :
2717 24 : int cloudsync_reset_metatable (cloudsync_context *data, const char *table_name) {
2718 24 : cloudsync_table_context *table = table_lookup(data, table_name);
2719 24 : if (!table) return DBRES_ERROR;
2720 :
2721 24 : char *sql = cloudsync_memory_mprintf(SQL_DELETE_ALL_FROM_CLOUDSYNC_TABLE, table->meta_ref);
2722 24 : int rc = database_exec(data, sql);
2723 24 : cloudsync_memory_free(sql);
2724 24 : if (rc != DBRES_OK) return rc;
2725 :
2726 24 : return cloudsync_refill_metatable(data, table_name);
2727 24 : }
2728 :
// Backfill the table's metadata table from the base table: insert sentinel rows
// for primary keys missing from metadata, then add per-column entries for every
// (pk, column) pair not yet tracked, honoring the optional row-level filter.
// Returns DBRES_OK or a DBRES_* error code.
int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return DBRES_ERROR;

    dbvm_t *vm = NULL;
    int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);

    // Read row-level filter from settings (if any); empty or missing means no filter
    char filter_buf[2048];
    int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
    const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;

    const char *schema = table->schema ? table->schema : "";
    char *sql = sql_build_pk_collist_query(schema, table_name);
    // pkclause_identifiers is heap-allocated when the query yields a value;
    // otherwise fall back to the literal "rowid" (which is never freed)
    char *pkclause_identifiers = NULL;
    int rc = database_select_text(data, sql, &pkclause_identifiers);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;
    char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";

    // Use database-specific query builder to handle type differences in composite PKs
    sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
    if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    rc = database_exec(data, sql);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;

    // fill missing columns
    // for each non-pk column:
    // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
    // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.

    if (filter) {
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
    } else {
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
    }
    // persistent flag hints the engine to cache the plan; the vm is still
    // finalized at the end of this function (it is rebound once per column)
    rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;

    for (int i=0; i<table->ncols; ++i) {
        char *col_name = table->col_name[i];

        rc = databasevm_bind_text(vm, 1, col_name, -1);
        if (rc != DBRES_OK) goto finalize;

        // step through every pk missing a metadata entry for this column
        while (1) {
            rc = databasevm_step(vm);
            if (rc == DBRES_ROW) {
                size_t pklen = 0;
                const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
                if (!pk) { rc = DBRES_ERROR; break; }
                rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
            } else if (rc == DBRES_DONE) {
                rc = DBRES_OK;
                break;
            } else {
                break;
            }
        }
        if (rc != DBRES_OK) goto finalize;

        databasevm_reset(vm);
    }

finalize:
    if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
    if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
    if (vm) databasevm_finalize(vm);
    return rc;
}
2801 :
2802 : // MARK: - Local -
2803 :
2804 4 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2805 4 : dbvm_t *vm = table->meta_sentinel_update_stmt;
2806 4 : if (!vm) return -1;
2807 :
2808 4 : int rc = databasevm_bind_int(vm, 1, db_version);
2809 4 : if (rc != DBRES_OK) goto cleanup;
2810 :
2811 4 : rc = databasevm_bind_int(vm, 2, seq);
2812 4 : if (rc != DBRES_OK) goto cleanup;
2813 :
2814 4 : rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
2815 4 : if (rc != DBRES_OK) goto cleanup;
2816 :
2817 4 : rc = databasevm_step(vm);
2818 4 : if (rc == DBRES_DONE) rc = DBRES_OK;
2819 :
2820 : cleanup:
2821 4 : DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
2822 4 : databasevm_reset(vm);
2823 4 : return rc;
2824 4 : }
2825 :
2826 127 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2827 127 : dbvm_t *vm = table->meta_sentinel_insert_stmt;
2828 127 : if (!vm) return -1;
2829 :
2830 127 : int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
2831 127 : if (rc != DBRES_OK) goto cleanup;
2832 :
2833 127 : rc = databasevm_bind_int(vm, 2, db_version);
2834 127 : if (rc != DBRES_OK) goto cleanup;
2835 :
2836 127 : rc = databasevm_bind_int(vm, 3, seq);
2837 127 : if (rc != DBRES_OK) goto cleanup;
2838 :
2839 127 : rc = databasevm_bind_int(vm, 4, db_version);
2840 127 : if (rc != DBRES_OK) goto cleanup;
2841 :
2842 127 : rc = databasevm_bind_int(vm, 5, seq);
2843 127 : if (rc != DBRES_OK) goto cleanup;
2844 :
2845 127 : rc = databasevm_step(vm);
2846 127 : if (rc == DBRES_DONE) rc = DBRES_OK;
2847 :
2848 : cleanup:
2849 127 : DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
2850 127 : databasevm_reset(vm);
2851 127 : return rc;
2852 127 : }
2853 :
2854 11954 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
2855 :
2856 11954 : dbvm_t *vm = table->meta_row_insert_update_stmt;
2857 11954 : if (!vm) return -1;
2858 :
2859 11954 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2860 11954 : if (rc != DBRES_OK) goto cleanup;
2861 :
2862 11954 : rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
2863 11954 : if (rc != DBRES_OK) goto cleanup;
2864 :
2865 11954 : rc = databasevm_bind_int(vm, 3, col_version);
2866 11954 : if (rc != DBRES_OK) goto cleanup;
2867 :
2868 11954 : rc = databasevm_bind_int(vm, 4, db_version);
2869 11954 : if (rc != DBRES_OK) goto cleanup;
2870 :
2871 11954 : rc = databasevm_bind_int(vm, 5, seq);
2872 11954 : if (rc != DBRES_OK) goto cleanup;
2873 :
2874 11954 : rc = databasevm_bind_int(vm, 6, db_version);
2875 11954 : if (rc != DBRES_OK) goto cleanup;
2876 :
2877 11954 : rc = databasevm_bind_int(vm, 7, seq);
2878 11954 : if (rc != DBRES_OK) goto cleanup;
2879 :
2880 11954 : rc = databasevm_step(vm);
2881 11954 : if (rc == DBRES_DONE) rc = DBRES_OK;
2882 :
2883 : cleanup:
2884 11954 : DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
2885 11954 : databasevm_reset(vm);
2886 11954 : return rc;
2887 11954 : }
2888 :
// Mark a (pk, column) pair as alive in the metadata table: col_version = 1 (odd = alive).
int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
}
2892 :
// Mark a single block column (built by block_build_colname) as deleted for pk.
int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
    // Mark a block as deleted by setting col_version = 2 (even = deleted)
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
}
2897 :
// Thin public wrapper around block_delete_value, narrowing pklen from size_t to int.
int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
    return block_delete_value(data, table, pk, (int)pklen, block_colname);
}
2901 :
// Mark the whole row pk as deleted: NULL col_name is stored as the tombstone
// marker, and col_version = 2 (even = deleted).
int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
}
2905 :
2906 36 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
2907 36 : dbvm_t *vm = table->meta_row_drop_stmt;
2908 36 : if (!vm) return -1;
2909 :
2910 36 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2911 36 : if (rc != DBRES_OK) goto cleanup;
2912 :
2913 36 : rc = databasevm_step(vm);
2914 36 : if (rc == DBRES_DONE) rc = DBRES_OK;
2915 :
2916 : cleanup:
2917 36 : DEBUG_DBERROR(rc, "local_drop_meta", table->context);
2918 36 : databasevm_reset(vm);
2919 36 : return rc;
2920 36 : }
2921 :
2922 30 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
2923 : /*
2924 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
2925 : * to a new primary key (NEW.pk) when a primary key change occurs.
2926 : *
2927 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
2928 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
2929 : *
2930 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
2931 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
2932 : * may be applied incorrectly, leading to data inconsistency.
2933 : *
2934 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
2935 : * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
2936 : * that generates a unique sequence for each row. The update query should ensure that each row moved
2937 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
2938 : */
2939 :
2940 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2941 : // pk2 is the old pk
2942 :
2943 30 : dbvm_t *vm = table->meta_update_move_stmt;
2944 30 : if (!vm) return -1;
2945 :
2946 : // new primary key
2947 30 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2948 30 : if (rc != DBRES_OK) goto cleanup;
2949 :
2950 : // new db_version
2951 30 : rc = databasevm_bind_int(vm, 2, db_version);
2952 30 : if (rc != DBRES_OK) goto cleanup;
2953 :
2954 : // old primary key
2955 30 : rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
2956 30 : if (rc != DBRES_OK) goto cleanup;
2957 :
2958 30 : rc = databasevm_step(vm);
2959 30 : if (rc == DBRES_DONE) rc = DBRES_OK;
2960 :
2961 : cleanup:
2962 30 : DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
2963 30 : databasevm_reset(vm);
2964 30 : return rc;
2965 30 : }
2966 :
2967 : // MARK: - Payload Encode / Decode -
2968 :
2969 687 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
2970 687 : uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
2971 687 : header->checksum[0] = (uint8_t)(h >> 40);
2972 687 : header->checksum[1] = (uint8_t)(h >> 32);
2973 687 : header->checksum[2] = (uint8_t)(h >> 24);
2974 687 : header->checksum[3] = (uint8_t)(h >> 16);
2975 687 : header->checksum[4] = (uint8_t)(h >> 8);
2976 687 : header->checksum[5] = (uint8_t)(h >> 0);
2977 687 : }
2978 :
2979 682 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
2980 2046 : return ((uint64_t)header->checksum[0] << 40) |
2981 1364 : ((uint64_t)header->checksum[1] << 32) |
2982 1364 : ((uint64_t)header->checksum[2] << 24) |
2983 1364 : ((uint64_t)header->checksum[3] << 16) |
2984 1364 : ((uint64_t)header->checksum[4] << 8) |
2985 682 : ((uint64_t)header->checksum[5] << 0);
2986 : }
2987 :
2988 682 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
2989 682 : uint64_t checksum1 = cloudsync_payload_checksum_load(header);
2990 682 : uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
2991 682 : return (checksum1 == checksum2);
2992 : }
2993 :
2994 49172 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
2995 49172 : if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
2996 :
2997 : // alloc/resize buffer
2998 49172 : if (payload->bused + needed > payload->balloc) {
2999 696 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
3000 696 : size_t balloc = payload->balloc + needed;
3001 :
3002 696 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
3003 696 : if (!buffer) {
3004 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
3005 0 : memset(payload, 0, sizeof(cloudsync_payload_context));
3006 0 : return false;
3007 : }
3008 :
3009 696 : payload->buffer = buffer;
3010 696 : payload->balloc = balloc;
3011 696 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
3012 696 : }
3013 :
3014 49172 : return true;
3015 49172 : }
3016 :
3017 50554 : size_t cloudsync_payload_context_size (size_t *header_size) {
3018 50554 : if (header_size) *header_size = sizeof(cloudsync_payload_header);
3019 50554 : return sizeof(cloudsync_payload_context);
3020 : }
3021 :
3022 687 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
3023 687 : memset(header, 0, sizeof(cloudsync_payload_header));
3024 : assert(sizeof(cloudsync_payload_header)==32);
3025 :
3026 : int major, minor, patch;
3027 687 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
3028 :
3029 687 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
3030 687 : header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
3031 687 : header->libversion[0] = (uint8_t)major;
3032 687 : header->libversion[1] = (uint8_t)minor;
3033 687 : header->libversion[2] = (uint8_t)patch;
3034 687 : header->expanded_size = htonl(expanded_size);
3035 687 : header->ncols = htons(ncols);
3036 687 : header->nrows = htonl(nrows);
3037 687 : header->schema_hash = htonll(hash);
3038 687 : }
3039 :
3040 49172 : int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
3041 : DEBUG_FUNCTION("cloudsync_payload_encode_step");
3042 : // debug_values(argc, argv);
3043 :
3044 : // check if the step function is called for the first time
3045 49172 : if (payload->nrows == 0) payload->ncols = (uint16_t)argc;
3046 :
3047 49172 : size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
3048 49172 : if (cloudsync_payload_encode_check(payload, breq) == false) {
3049 0 : return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
3050 : }
3051 :
3052 49172 : char *buffer = payload->buffer + payload->bused;
3053 49172 : size_t bsize = payload->balloc - payload->bused;
3054 49172 : char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
3055 49172 : if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);
3056 :
3057 : // update buffer
3058 49172 : payload->bused += breq;
3059 :
3060 : // increment row counter
3061 49172 : ++payload->nrows;
3062 :
3063 49172 : return DBRES_OK;
3064 49172 : }
3065 :
// Finalize an encode session: LZ4-compress the accumulated rows (when that
// actually shrinks them), prepend a cloudsync_payload_header, and leave the
// finished BLOB in payload->buffer with its total size in payload->bsize.
// Returns DBRES_OK on success, DBRES_ERROR on failure (buffer is released).
int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    DEBUG_FUNCTION("cloudsync_payload_encode_final");

    // no rows were encoded: release the buffer and report an empty payload
    if (payload->nrows == 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        return DBRES_OK;
    }

    // the header stores the row count in 32 bits, so enforce that limit
    if (payload->nrows > UINT32_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
        return DBRES_ERROR;
    }

    // sanity check about buffer size
    int header_size = (int)sizeof(cloudsync_payload_header);
    int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    if (buffer_size < 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // LZ4 works with int sizes, so payloads above INT_MAX cannot be compressed
    if (buffer_size > INT_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // try to allocate buffer used for compressed data
    int real_buffer_size = (int)buffer_size;
    int zbound = LZ4_compressBound(real_buffer_size);
    char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression

    // skip the reserved header from the buffer to compress
    char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    // fall back to the raw bytes when compression failed or did not shrink the data
    bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    CHECK_FORCE_UNCOMPRESSED_BUFFER(); // NOTE(review): presumably a test hook that can force the uncompressed path — confirm macro semantics

    // setup payload header (expanded_size == 0 signals "not compressed" to the decoder)
    cloudsync_payload_header header = {0};
    uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);

    // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    if (use_uncompressed_buffer) {
        if (zbuffer) cloudsync_memory_free(zbuffer);
        zbuffer = payload->buffer;
        zused = real_buffer_size;
    }

    // compute checksum of the (possibly compressed) body, header excluded
    uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    cloudsync_payload_checksum_store(&header, checksum);

    // copy header and data to SQLite BLOB
    memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    int blob_size = zused + sizeof(cloudsync_payload_header);
    payload->bsize = blob_size;

    // cleanup memory: the compressed copy replaces the original buffer,
    // transferring ownership to payload->buffer
    if (zbuffer != payload->buffer) {
        cloudsync_memory_free (payload->buffer);
        payload->buffer = zbuffer;
    }

    return DBRES_OK;
}
3141 :
3142 695 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
3143 : DEBUG_FUNCTION("cloudsync_payload_blob");
3144 :
3145 695 : if (blob_size) *blob_size = (int64_t)payload->bsize;
3146 695 : if (nrows) *nrows = (int64_t)payload->nrows;
3147 695 : return payload->buffer;
3148 : }
3149 :
3150 441828 : static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
3151 441828 : cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
3152 441828 : int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
3153 :
3154 441828 : if (rc == DBRES_OK) {
3155 : // the dbversion index is smaller than seq index, so it is processed first
3156 : // when processing the dbversion column: save the value to the tmp_dbversion field
3157 : // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
3158 441828 : switch (index) {
3159 : case CLOUDSYNC_PK_INDEX_TBL:
3160 49092 : if (type == DBTYPE_TEXT) {
3161 49092 : decode_context->tbl = pval;
3162 49092 : decode_context->tbl_len = ival;
3163 49092 : }
3164 49092 : break;
3165 : case CLOUDSYNC_PK_INDEX_PK:
3166 49092 : if (type == DBTYPE_BLOB) {
3167 49092 : decode_context->pk = pval;
3168 49092 : decode_context->pk_len = ival;
3169 49092 : }
3170 49092 : break;
3171 : case CLOUDSYNC_PK_INDEX_COLNAME:
3172 49092 : if (type == DBTYPE_TEXT) {
3173 49092 : decode_context->col_name = pval;
3174 49092 : decode_context->col_name_len = ival;
3175 49092 : }
3176 49092 : break;
3177 : case CLOUDSYNC_PK_INDEX_COLVERSION:
3178 49092 : if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
3179 49092 : break;
3180 : case CLOUDSYNC_PK_INDEX_DBVERSION:
3181 49092 : if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
3182 49092 : break;
3183 : case CLOUDSYNC_PK_INDEX_SITEID:
3184 49092 : if (type == DBTYPE_BLOB) {
3185 49092 : decode_context->site_id = pval;
3186 49092 : decode_context->site_id_len = ival;
3187 49092 : }
3188 49092 : break;
3189 : case CLOUDSYNC_PK_INDEX_CL:
3190 49092 : if (type == DBTYPE_INTEGER) decode_context->cl = ival;
3191 49092 : break;
3192 : case CLOUDSYNC_PK_INDEX_SEQ:
3193 49092 : if (type == DBTYPE_INTEGER) decode_context->seq = ival;
3194 49092 : break;
3195 : }
3196 441828 : }
3197 :
3198 441828 : return rc;
3199 : }
3200 :
3201 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
3202 :
3203 684 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
3204 : // sanity check
3205 684 : if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
3206 :
3207 : // decode header
3208 : cloudsync_payload_header header;
3209 684 : memcpy(&header, payload, sizeof(cloudsync_payload_header));
3210 :
3211 684 : header.signature = ntohl(header.signature);
3212 684 : header.expanded_size = ntohl(header.expanded_size);
3213 684 : header.ncols = ntohs(header.ncols);
3214 684 : header.nrows = ntohl(header.nrows);
3215 684 : header.schema_hash = ntohll(header.schema_hash);
3216 :
3217 : // compare schema_hash only if not disabled and if the received payload was created with the current header version
3218 : // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
3219 684 : if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
3220 684 : if (header.schema_hash != data->schema_hash) {
3221 4 : if (!database_check_schema_hash(data, header.schema_hash)) {
3222 : char buffer[1024];
3223 2 : snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
3224 2 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
3225 : }
3226 2 : }
3227 682 : }
3228 :
3229 : // sanity check header
3230 682 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
3231 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
3232 : }
3233 :
3234 682 : const char *buffer = payload + sizeof(cloudsync_payload_header);
3235 682 : size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
3236 :
3237 : // sanity check checksum (only if version is >= 2)
3238 682 : if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
3239 682 : uint64_t checksum = pk_checksum(buffer, buf_len);
3240 682 : if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
3241 1 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
3242 : }
3243 681 : }
3244 :
3245 : // check if payload is compressed
3246 681 : char *clone = NULL;
3247 681 : if (header.expanded_size != 0) {
3248 665 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
3249 665 : if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
3250 :
3251 665 : int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
3252 665 : if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
3253 0 : if (clone) cloudsync_memory_free(clone);
3254 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
3255 : }
3256 :
3257 665 : buffer = (const char *)clone;
3258 665 : buf_len = (size_t)header.expanded_size;
3259 665 : }
3260 :
3261 : // precompile the insert statement
3262 681 : dbvm_t *vm = NULL;
3263 681 : int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
3264 681 : if (rc != DBRES_OK) {
3265 0 : if (clone) cloudsync_memory_free(clone);
3266 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
3267 : }
3268 :
3269 : // process buffer, one row at a time
3270 681 : uint16_t ncols = header.ncols;
3271 681 : uint32_t nrows = header.nrows;
3272 681 : int64_t last_payload_db_version = -1;
3273 681 : int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
3274 681 : int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
3275 681 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
3276 :
3277 : // Initialize deferred column-batch merge
3278 681 : merge_pending_batch batch = {0};
3279 681 : data->pending_batch = &batch;
3280 681 : bool in_savepoint = false;
3281 681 : const void *last_pk = NULL;
3282 681 : int64_t last_pk_len = 0;
3283 681 : const char *last_tbl = NULL;
3284 681 : int64_t last_tbl_len = 0;
3285 :
3286 49773 : for (uint32_t i=0; i<nrows; ++i) {
3287 49092 : size_t seek = 0;
3288 49092 : int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
3289 49092 : if (res == -1) {
3290 0 : merge_flush_pending(data);
3291 0 : data->pending_batch = NULL;
3292 0 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3293 0 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3294 0 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3295 0 : if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
3296 0 : rc = DBRES_ERROR;
3297 0 : goto cleanup;
3298 : }
3299 :
3300 : // Detect PK/table/db_version boundary to flush pending batch
3301 97503 : bool pk_changed = (last_pk != NULL &&
3302 48411 : (last_pk_len != decoded_context.pk_len ||
3303 46485 : memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
3304 97503 : bool tbl_changed = (last_tbl != NULL &&
3305 48411 : (last_tbl_len != decoded_context.tbl_len ||
3306 48339 : memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
3307 49092 : bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
3308 :
3309 : // Flush pending batch before any boundary change
3310 49092 : if (pk_changed || tbl_changed || db_version_changed) {
3311 18223 : int flush_rc = merge_flush_pending(data);
3312 18223 : if (flush_rc != DBRES_OK) {
3313 1 : rc = flush_rc;
3314 : // continue processing remaining rows
3315 1 : }
3316 18223 : }
3317 :
3318 : // Per-db_version savepoints group rows with the same source db_version
3319 : // into one transaction. In SQLite autocommit mode, the RELEASE triggers
3320 : // the commit hook which bumps data->db_version and resets seq, ensuring
3321 : // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
3322 : // database_in_transaction() is always true so this block is inactive —
3323 : // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
3324 49092 : if (in_savepoint && db_version_changed) {
3325 4287 : rc = database_commit_savepoint(data, "cloudsync_payload_apply");
3326 4287 : if (rc != DBRES_OK) {
3327 0 : merge_pending_free_entries(&batch);
3328 0 : data->pending_batch = NULL;
3329 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
3330 0 : goto cleanup;
3331 : }
3332 4287 : in_savepoint = false;
3333 4287 : }
3334 :
3335 49092 : if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
3336 4968 : rc = database_begin_savepoint(data, "cloudsync_payload_apply");
3337 4968 : if (rc != DBRES_OK) {
3338 0 : merge_pending_free_entries(&batch);
3339 0 : data->pending_batch = NULL;
3340 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
3341 0 : goto cleanup;
3342 : }
3343 4968 : in_savepoint = true;
3344 4968 : }
3345 :
3346 : // Track db_version for batch-flush boundary detection
3347 49092 : if (db_version_changed) {
3348 4968 : last_payload_db_version = decoded_context.db_version;
3349 4968 : }
3350 :
3351 : // Update PK/table tracking
3352 49092 : last_pk = decoded_context.pk;
3353 49092 : last_pk_len = decoded_context.pk_len;
3354 49092 : last_tbl = decoded_context.tbl;
3355 49092 : last_tbl_len = decoded_context.tbl_len;
3356 :
3357 49092 : rc = databasevm_step(vm);
3358 49092 : if (rc != DBRES_DONE) {
3359 : // don't "break;", the error can be due to a RLS policy.
3360 : // in case of error we try to apply the following changes
3361 2 : }
3362 :
3363 49092 : buffer += seek;
3364 49092 : buf_len -= seek;
3365 49092 : dbvm_reset(vm);
3366 49092 : }
3367 :
3368 : // Final flush after loop
3369 : {
3370 681 : int flush_rc = merge_flush_pending(data);
3371 681 : if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
3372 : }
3373 681 : data->pending_batch = NULL;
3374 :
3375 681 : if (in_savepoint) {
3376 681 : int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
3377 681 : if (rc1 != DBRES_OK) rc = rc1;
3378 681 : }
3379 :
3380 : // save last error (unused if function returns OK)
3381 681 : if (rc != DBRES_OK && rc != DBRES_DONE) {
3382 1 : cloudsync_set_dberror(data);
3383 1 : }
3384 :
3385 681 : if (rc == DBRES_DONE) rc = DBRES_OK;
3386 1361 : if (rc == DBRES_OK) {
3387 : char buf[256];
3388 680 : if (decoded_context.db_version >= dbversion) {
3389 546 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
3390 546 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
3391 :
3392 546 : if (decoded_context.seq != seq) {
3393 338 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
3394 338 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
3395 338 : }
3396 546 : }
3397 680 : }
3398 :
3399 : cleanup:
3400 : // cleanup merge_pending_batch
3401 681 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3402 681 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3403 681 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3404 :
3405 : // cleanup vm
3406 681 : if (vm) databasevm_finalize(vm);
3407 :
3408 : // cleanup memory
3409 681 : if (clone) cloudsync_memory_free(clone);
3410 :
3411 : // error already saved in (save last error)
3412 681 : if (rc != DBRES_OK) return rc;
3413 :
3414 : // return the number of processed rows
3415 680 : if (pnrows) *pnrows = nrows;
3416 680 : return DBRES_OK;
3417 684 : }
3418 :
3419 : // MARK: - Payload load/store -
3420 :
3421 0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
3422 : // retrieve current db_version and seq
3423 0 : *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
3424 0 : if (*db_version < 0) return DBRES_ERROR;
3425 :
3426 : // retrieve BLOB
3427 : char sql[1024];
3428 0 : snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
3429 : "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
3430 :
3431 0 : int64_t len = 0;
3432 0 : int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
3433 0 : *blob_size = (int)len;
3434 0 : if (rc != DBRES_OK) return rc;
3435 :
3436 : // exit if there is no data to send
3437 0 : if (*blob == NULL || *blob_size == 0) return DBRES_OK;
3438 0 : return rc;
3439 0 : }
3440 :
3441 : #ifdef CLOUDSYNC_DESKTOP_OS
3442 0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
3443 : DEBUG_FUNCTION("cloudsync_payload_save");
3444 :
3445 : // silently delete any other payload with the same name
3446 0 : cloudsync_file_delete(payload_path);
3447 :
3448 : // retrieve payload
3449 0 : char *blob = NULL;
3450 0 : int blob_size = 0, db_version = 0;
3451 0 : int64_t new_db_version = 0;
3452 0 : int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
3453 0 : if (rc != DBRES_OK) {
3454 0 : if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
3455 0 : return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
3456 : }
3457 :
3458 : // exit if there is no data to save
3459 0 : if (blob == NULL || blob_size == 0) {
3460 0 : if (size) *size = 0;
3461 0 : return DBRES_OK;
3462 : }
3463 :
3464 : // write payload to file
3465 0 : bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
3466 0 : cloudsync_memory_free(blob);
3467 0 : if (res == false) {
3468 0 : return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
3469 : }
3470 :
3471 : // returns blob size
3472 0 : if (size) *size = blob_size;
3473 0 : return DBRES_OK;
3474 0 : }
3475 : #endif
3476 :
3477 : // MARK: - Core -
3478 :
// Validate that `name` identifies a table cloudsync can augment: non-null
// name within the length limit, table exists in the current schema, primary
// keys respect SQLite's limits, and (unless skipped via init_flags) the
// table avoids patterns that are unsafe for CRDT replication.
// Returns DBRES_OK when the table is acceptable or already initialized;
// otherwise sets a descriptive error and returns DBRES_ERROR (or the db error).
int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, CLOUDSYNC_INIT_FLAG init_flags) {
    DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    char buffer[2048];

    // sanity check table name
    if (name == NULL) {
        return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    }

    // avoid allocating heap memory for SQL statements by setting a maximum length of 512 characters
    // for table names. This limit is reasonable and helps prevent memory management issues.
    const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    if (strlen(name) > maxlen) {
        snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }

    // check if already initialized (an existing context means all checks passed before)
    cloudsync_table_context *table = table_lookup(data, name);
    if (table) return DBRES_OK;

    // check if table exists
    if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
        snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }

    // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
    int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    if (npri_keys < 0) return cloudsync_set_dberror(data);
    if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);

#if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    if (npri_keys == 0) {
        snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
#endif

    // single-column INTEGER primary keys are rejected by default: integer ids
    // are not globally unique across replicas, which breaks CRDT merging
    bool skip_int_pk_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK) != 0;
    if (!skip_int_pk_check) {
        if (npri_keys == 1) {
            // the affinity of a column is determined by the declared type of the column,
            // according to the following rules in the order shown:
            // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
            int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
            if (npri_keys_int < 0) return cloudsync_set_dberror(data);
            if (npri_keys == npri_keys_int) {
                snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
                return cloudsync_set_error(data, buffer, DBRES_ERROR);
            }

        }
    }

    // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
#if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    bool skip_notnull_prikeys_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_PRIKEYS_CHECK) != 0;
    if (!skip_notnull_prikeys_check) {
        if (npri_keys > 0) {
            int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
            if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
            if (npri_keys != npri_keys_notnull) {
                snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
                return cloudsync_set_error(data, buffer, DBRES_ERROR);
            }
        }
    }
#endif

    // check for columns declared as NOT NULL without a DEFAULT value.
    // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    bool skip_notnull_default_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_DEFAULT_CHECK) != 0;
    if (!skip_notnull_default_check) {
        int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
        if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
        if (n_notnull_nodefault > 0) {
            snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
            return cloudsync_set_error(data, buffer, DBRES_ERROR);
        }
    }

    return DBRES_OK;
}
3564 :
3565 4 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
3566 4 : if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
3567 :
3568 : // drop meta-table
3569 4 : const char *table_name = table->name;
3570 4 : char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
3571 4 : int rc = database_exec(data, sql);
3572 4 : cloudsync_memory_free(sql);
3573 4 : if (rc != DBRES_OK) {
3574 : char buffer[1024];
3575 0 : snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
3576 0 : return cloudsync_set_error(data, buffer, rc);
3577 : }
3578 :
3579 : // drop blocks table if this table has block LWW columns
3580 4 : if (table->blocks_ref) {
3581 1 : sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->blocks_ref);
3582 1 : rc = database_exec(data, sql);
3583 1 : cloudsync_memory_free(sql);
3584 1 : if (rc != DBRES_OK) {
3585 : char buffer[1024];
3586 0 : snprintf(buffer, sizeof(buffer), "Unable to drop blocks table %s_cloudsync_blocks in cloudsync_cleanup", table_name);
3587 0 : return cloudsync_set_error(data, buffer, rc);
3588 : }
3589 1 : }
3590 :
3591 : // drop original triggers
3592 4 : rc = database_delete_triggers(data, table_name);
3593 4 : if (rc != DBRES_OK) {
3594 : char buffer[1024];
3595 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
3596 0 : return cloudsync_set_error(data, buffer, rc);
3597 : }
3598 :
3599 : // remove all table related settings
3600 4 : dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
3601 4 : return DBRES_OK;
3602 4 : }
3603 :
3604 4 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
3605 4 : cloudsync_table_context *table = table_lookup(data, table_name);
3606 4 : if (!table) return DBRES_OK;
3607 :
3608 : // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
3609 :
3610 4 : int rc = cloudsync_cleanup_internal(data, table);
3611 4 : if (rc != DBRES_OK) return rc;
3612 :
3613 4 : int counter = table_remove(data, table);
3614 4 : table_free(table);
3615 :
3616 4 : if (counter == 0) {
3617 : // cleanup database on last table
3618 2 : cloudsync_reset_siteid(data);
3619 2 : dbutils_settings_cleanup(data);
3620 2 : } else {
3621 2 : if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
3622 2 : cloudsync_update_schema_hash(data);
3623 2 : }
3624 : }
3625 :
3626 4 : return DBRES_OK;
3627 4 : }
3628 :
3629 0 : int cloudsync_cleanup_all (cloudsync_context *data) {
3630 0 : return database_cleanup(data);
3631 : }
3632 :
3633 482 : int cloudsync_terminate (cloudsync_context *data) {
3634 : // can't use for/loop here because data->tables_count is changed by table_remove
3635 737 : while (data->tables_count > 0) {
3636 255 : cloudsync_table_context *t = data->tables[data->tables_count - 1];
3637 255 : table_remove(data, t);
3638 255 : table_free(t);
3639 : }
3640 :
3641 482 : if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
3642 482 : if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
3643 482 : if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
3644 482 : if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
3645 482 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
3646 :
3647 482 : data->schema_version_stmt = NULL;
3648 482 : data->data_version_stmt = NULL;
3649 482 : data->db_version_stmt = NULL;
3650 482 : data->getset_siteid_stmt = NULL;
3651 482 : data->current_schema = NULL;
3652 :
3653 : // reset the site_id so the cloudsync_context_init will be executed again
3654 : // if any other cloudsync function is called after terminate
3655 482 : data->site_id[0] = 0;
3656 :
3657 482 : return 1;
3658 : }
3659 :
// Augment a table for CRDT synchronization: validate the table and its
// primary key(s), resolve/persist the CRDT algorithm, create the cloudsync
// triggers and meta-table, compile the shared prepared statements, register
// the table in the in-memory context and backfill the meta-table.
//
// Parameters:
//   data       - global cloudsync context (initialized on demand here)
//   table_name - name of the user table to augment
//   algo_name  - CRDT algorithm name, or NULL for CLOUDSYNC_DEFAULT_ALGO
//   init_flags - flags forwarded to the table sanity check
//
// Returns DBRES_OK on success; on failure the error message is recorded via
// cloudsync_set_error and its result code is returned.
// Safe to call more than once for the same table (idempotent), but changing
// the algorithm of an already-augmented table is rejected.
int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, CLOUDSYNC_INIT_FLAG init_flags) {
    // sanity check table and its primary key(s)
    int rc = cloudsync_table_sanity_check(data, table_name, init_flags);
    if (rc != DBRES_OK) return rc;

    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    }

    // sanity check algo name (if exists)
    table_algo algo_new = table_algo_none;
    if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;

    algo_new = cloudsync_algo_from_name(algo_name);
    if (algo_new == table_algo_none) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }

    // DWS and AWS algorithms are not yet implemented in the merge logic
    if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }

    // check if table name was already augmented (algo_current != none means it was)
    table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);

    // reconcile the requested algorithm with the one stored in settings
    if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
        // requested and stored algorithms are the same (and not none): do nothing
    } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
        // nothing is written into settings because the default table_algo_crdt_cls will be used
        algo_new = algo_current = table_algo_crdt_cls;
    } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
        // algo is already written into settings so just use it
        algo_new = algo_current;
    } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
        // first-time setup: write table algo name in settings
        dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
    } else {
        // error condition: requested algorithm differs from the stored one
        return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
    }

    // Run the following function even if table was already augmented.
    // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
    // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.

    // sync algo with table (unused in this version)
    // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));

    // read row-level filter from settings (if any); an empty string counts as "no filter"
    char init_filter_buf[2048];
    int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
    const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;

    // check triggers
    rc = database_create_triggers(data, table_name, algo_new, init_filter);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);

    // check meta-table
    rc = database_create_metatable(data, table_name);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);

    // add prepared statements
    if (cloudsync_add_dbvms(data) != DBRES_OK) {
        return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
    }

    // add table to in-memory data context
    if (table_add_to_context(data, algo_new, table_name) == false) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
        return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    }

    // backfill the meta-table with existing rows of the augmented table
    if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
        return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
    }

    return DBRES_OK;
}
|