Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "lz4.h"
21 : #include "pk.h"
22 : #include "sql.h"
23 : #include "utils.h"
24 : #include "dbutils.h"
25 : #include "block.h"
26 :
27 : #ifdef _WIN32
28 : #include <winsock2.h>
29 : #include <ws2tcpip.h>
30 : #else
31 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
32 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
33 : #endif
34 :
35 : #ifndef htonll
36 : #if __BIG_ENDIAN__
37 : #define htonll(x) (x)
38 : #define ntohll(x) (x)
39 : #else
40 : #ifndef htobe64
41 : #define htonll(x) ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
42 : #define ntohll(x) ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
43 : #else
44 : #define htonll(x) htobe64(x)
45 : #define ntohll(x) be64toh(x)
46 : #endif
47 : #endif
48 : #endif
49 :
50 : #define CLOUDSYNC_INIT_NTABLES 64
51 : #define CLOUDSYNC_MIN_DB_VERSION 0
52 :
53 : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
54 : #define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
55 : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
56 : #define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
57 : #define CLOUDSYNC_PAYLOAD_VERSION_2 2
58 : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
59 : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2
60 :
61 : #ifndef MAX
62 : #define MAX(a, b) (((a)>(b))?(a):(b))
63 : #endif
64 :
65 : #define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
66 :
// Field order of an encoded change record; mirrors the fields of
// struct cloudsync_pk_decode_bind_context below.
typedef enum {
    CLOUDSYNC_PK_INDEX_TBL = 0,         // table name
    CLOUDSYNC_PK_INDEX_PK = 1,          // encoded primary-key blob
    CLOUDSYNC_PK_INDEX_COLNAME = 2,     // column name
    CLOUDSYNC_PK_INDEX_COLVALUE = 3,    // column value
    CLOUDSYNC_PK_INDEX_COLVERSION = 4,  // per-column version clock
    CLOUDSYNC_PK_INDEX_DBVERSION = 5,   // database version of the change
    CLOUDSYNC_PK_INDEX_SITEID = 6,      // origin site identifier
    CLOUDSYNC_PK_INDEX_CL = 7,          // causal length
    CLOUDSYNC_PK_INDEX_SEQ = 8          // intra-transaction sequence number
} CLOUDSYNC_PK_INDEX;
78 :
// Tri-state result of dbvm_execute: failure, observed value unchanged
// since the previous run, or value changed / statement executed.
typedef enum {
    DBVM_VALUE_ERROR = -1,
    DBVM_VALUE_UNCHANGED = 0,
    DBVM_VALUE_CHANGED = 1,
} DBVM_VALUE;
84 :
85 : #define SYNCBIT_SET(_data) _data->insync = 1
86 : #define SYNCBIT_RESET(_data) _data->insync = 0
87 :
88 : // MARK: - Deferred column-batch merge -
89 :
// One deferred column assignment captured during payload apply; entries
// sharing the same primary key are accumulated in a merge_pending_batch
// and flushed together.
typedef struct {
    const char *col_name; // pointer into table_context->col_name[idx] (stable)
    dbvalue_t *col_value; // duplicated via database_value_dup (owned)
    int64_t col_version;  // per-column version clock of the incoming change
    int64_t db_version;   // database version of the incoming change
    uint8_t site_id[UUID_LEN]; // origin site of the change
    int site_id_len;      // number of valid bytes in site_id
    int64_t seq;          // intra-transaction ordering of the change
} merge_pending_entry;
99 :
// Accumulates the pending column entries for a single primary key during
// a deferred column-batch merge, plus a small prepared-statement cache
// reused across consecutive flushes with the same column combination.
typedef struct {
    cloudsync_table_context *table;
    char *pk; // malloc'd copy, freed on flush
    int pk_len;
    int64_t cl; // causal length for this primary key
    bool sentinel_pending;
    bool row_exists; // true when the PK already exists locally
    int count; // entries in use
    int capacity; // allocated entries
    merge_pending_entry *entries;

    // Statement cache — reuse the prepared statement when the column
    // combination and row_exists flag match between consecutive PK flushes.
    dbvm_t *cached_vm;
    bool cached_row_exists;
    int cached_col_count;
    const char **cached_col_names; // array of pointers into table_context (not owned)
} merge_pending_batch;
118 :
119 : // MARK: -
120 :
// Decoded fields of one change record while it is being bound to a
// statement; read through the cloudsync_pk_context_* accessors below.
struct cloudsync_pk_decode_bind_context {
    dbvm_t *vm;           // destination statement for the bind
    char *tbl;            // table name
    int64_t tbl_len;      // table name length
    const void *pk;       // encoded primary-key blob
    int64_t pk_len;       // blob length
    char *col_name;       // column name
    int64_t col_name_len; // column name length
    int64_t col_version;  // per-column version clock
    int64_t db_version;   // database version of the change
    const void *site_id;  // origin site identifier
    int64_t site_id_len;  // site identifier length
    int64_t cl;           // causal length
    int64_t seq;          // intra-transaction sequence number
};
136 :
// Per-connection library state: cached prepared statements, version
// counters, the registered table contexts and error reporting.
struct cloudsync_context {
    void *db;                  // opaque database connection handle
    char errmsg[1024];         // last error message (cloudsync_set_error)
    int errcode;               // last error code
    
    char *libversion;          // library version string
    uint8_t site_id[UUID_LEN]; // local site id; first byte 0 = not yet loaded (cloudsync_load_siteid)
    int insync;                // in-sync flag toggled via SYNCBIT_SET/SYNCBIT_RESET
    int debug;                 // debug flag
    bool merge_equal_values;   // merge policy flag — NOTE(review): semantics defined elsewhere
    void *aux_data;            // user-supplied auxiliary pointer (cloudsync_set_auxdata)
    
    // stmts and context values
    dbvm_t *schema_version_stmt;
    dbvm_t *data_version_stmt;
    dbvm_t *db_version_stmt;
    dbvm_t *getset_siteid_stmt;
    int data_version;          // last observed PRAGMA data_version
    int schema_version;        // last observed schema version
    uint64_t schema_hash;      // cached hash (cloudsync_update_schema_hash)
    
    // set at transaction start and reset on commit/rollback
    int64_t db_version;
    // version the DB would have if the transaction committed now
    int64_t pending_db_version;
    // used to set an order inside each transaction
    int seq;
    
    // optional schema_name to be set in the cloudsync_table_context
    char *current_schema;
    
    // augmented tables are stored in-memory so we do not need to retrieve information about
    // col_names and cid from the disk each time a write statement is performed
    // we also do not need a hash map here because for few tables the direct
    // in-memory comparison with the table name is faster
    cloudsync_table_context **tables; // dense vector: [0..tables_count-1] are valid
    int tables_count; // size
    int tables_cap; // capacity
    
    int skip_decode_idx; // -1 in sqlite, col_value index in postgresql
    
    // deferred column-batch merge (active during payload_apply)
    merge_pending_batch *pending_batch;
};
181 :
// Per-table state: column metadata cached in memory plus every prepared
// statement needed to read/write the base table and its *_cloudsync meta
// table. Created by table_create, released by table_free.
struct cloudsync_table_context {
    table_algo algo; // CRDT algorithm associated to the table
    char *name; // table name
    char *schema; // table schema
    char *meta_ref; // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
    char *base_ref; // schema-qualified base table name (e.g. "schema"."name")
    char **col_name; // array of column names
    dbvm_t **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
    dbvm_t **col_value_stmt; // array of column value stmt (indexed by col_name)
    int *col_id; // array of column id
    col_algo_t *col_algo; // per-column algorithm (normal or block)
    char **col_delimiter; // per-column delimiter for block splitting (NULL = default "\n")
    bool has_block_cols; // quick check: does this table have any block columns?
    dbvm_t *block_value_read_stmt; // SELECT col_value FROM blocks table
    dbvm_t *block_value_write_stmt; // INSERT OR REPLACE into blocks table
    dbvm_t *block_value_delete_stmt; // DELETE from blocks table
    dbvm_t *block_list_stmt; // SELECT block entries for materialization
    char *blocks_ref; // schema-qualified blocks table name
    int ncols; // number of non primary key cols
    int npks; // number of primary key cols
    bool enabled; // flag to check if a table is enabled or disabled
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    bool rowid_only; // a table with no primary keys other than the implicit rowid
#endif
    
    char **pk_name; // array of primary key names
    
    // precompiled statements
    dbvm_t *meta_pkexists_stmt; // check if a primary key already exists in the augmented table
    dbvm_t *meta_sentinel_update_stmt; // update a local sentinel row
    dbvm_t *meta_sentinel_insert_stmt; // insert a local sentinel row
    dbvm_t *meta_row_insert_update_stmt; // insert/update a local row
    dbvm_t *meta_row_drop_stmt; // delete rows from meta
    dbvm_t *meta_update_move_stmt; // update rows in meta when pk changes
    dbvm_t *meta_local_cl_stmt; // compute local cl value
    dbvm_t *meta_winner_clock_stmt; // get the rowid of the last inserted/updated row in the meta table
    dbvm_t *meta_merge_delete_drop;
    dbvm_t *meta_zero_clock_stmt;
    dbvm_t *meta_col_version_stmt;
    dbvm_t *meta_site_id_stmt;
    
    dbvm_t *real_col_values_stmt; // retrieve all column values based on pk
    dbvm_t *real_merge_delete_stmt;
    dbvm_t *real_merge_sentinel_stmt;
    
    bool is_altering; // flag to track if a table alteration is in progress
    
    // context
    cloudsync_context *context;
};
232 :
// Streaming buffer state used while building a payload.
// NOTE(review): bsize/balloc/bused roles inferred from names (logical
// size, allocated capacity, bytes written) — confirm against the encoder.
struct cloudsync_payload_context {
    char *buffer;
    size_t bsize;
    size_t balloc;
    size_t bused;
    uint64_t nrows;  // rows encoded so far
    uint16_t ncols;  // columns per row
};
241 :
242 : #ifdef _MSC_VER
243 : #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
244 : #define PACKED
245 : #else
246 : #define PACKED __attribute__((__packed__))
247 : #endif
248 :
// On-the-wire payload header; packed so the struct is exactly 32 bytes.
// NOTE(review): multi-byte fields are presumably serialized in network
// byte order via the htonl/htonll macros above — confirm in the encoder.
typedef struct PACKED {
    uint32_t signature; // 'CLSY' (CLOUDSYNC_PAYLOAD_SIGNATURE)
    uint8_t version; // protocol version (CLOUDSYNC_PAYLOAD_VERSION_*)
    uint8_t libversion[3]; // major.minor.patch
    uint32_t expanded_size; // payload size once decompressed (lz4)
    uint16_t ncols; // columns per row
    uint32_t nrows; // number of rows
    uint64_t schema_hash; // sender's schema hash
    uint8_t checksum[6]; // 48 bits checksum (to ensure struct is 32 bytes)
} cloudsync_payload_header;
259 :
260 : #ifdef _MSC_VER
261 : #pragma pack(pop)
262 : #endif
263 :
264 : #if CLOUDSYNC_UNITTEST
265 : bool force_uncompressed_blob = false;
266 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER() if (force_uncompressed_blob) use_uncompressed_buffer = true
267 : #else
268 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
269 : #endif
270 :
271 : // Internal prototypes
272 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
273 :
274 : // MARK: - CRDT algos -
275 :
276 622 : table_algo cloudsync_algo_from_name (const char *algo_name) {
277 622 : if (algo_name == NULL) return table_algo_none;
278 :
279 622 : if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
280 266 : if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
281 259 : if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
282 258 : if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
283 :
284 : // if nothing is found
285 257 : return table_algo_none;
286 622 : }
287 :
288 110 : const char *cloudsync_algo_name (table_algo algo) {
289 110 : switch (algo) {
290 105 : case table_algo_crdt_cls: return "cls";
291 1 : case table_algo_crdt_gos: return "gos";
292 1 : case table_algo_crdt_dws: return "dws";
293 1 : case table_algo_crdt_aws: return "aws";
294 1 : case table_algo_none: return NULL;
295 : }
296 1 : return NULL;
297 110 : }
298 :
299 : // MARK: - DBVM Utils -
300 :
301 32019 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
302 32019 : if (!stmt) return DBVM_VALUE_ERROR;
303 :
304 32017 : int rc = databasevm_step(stmt);
305 32017 : if (rc != DBRES_ROW && rc != DBRES_DONE) {
306 2 : if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
307 2 : databasevm_reset(stmt);
308 2 : return DBVM_VALUE_ERROR;
309 : }
310 :
311 32015 : DBVM_VALUE result = DBVM_VALUE_CHANGED;
312 32015 : if (stmt == data->data_version_stmt) {
313 29607 : int version = (int)database_column_int(stmt, 0);
314 29607 : if (version != data->data_version) {
315 225 : data->data_version = version;
316 225 : } else {
317 29382 : result = DBVM_VALUE_UNCHANGED;
318 : }
319 32015 : } else if (stmt == data->schema_version_stmt) {
320 1204 : int version = (int)database_column_int(stmt, 0);
321 1204 : if (version > data->schema_version) {
322 380 : data->schema_version = version;
323 380 : } else {
324 824 : result = DBVM_VALUE_UNCHANGED;
325 : }
326 :
327 2408 : } else if (stmt == data->db_version_stmt) {
328 1204 : data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
329 1204 : }
330 :
331 32015 : databasevm_reset(stmt);
332 32015 : return result;
333 32019 : }
334 :
335 3587 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
336 3587 : int result = -1;
337 3587 : int rc = DBRES_OK;
338 :
339 3587 : if (value) {
340 3586 : rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
341 3586 : if (rc != DBRES_OK) goto cleanup;
342 3586 : }
343 :
344 3587 : rc = databasevm_step(stmt);
345 7174 : if (rc == DBRES_DONE) {
346 1 : result = 0;
347 1 : rc = DBRES_OK;
348 3587 : } else if (rc == DBRES_ROW) {
349 3586 : result = (int)database_column_int(stmt, 0);
350 3586 : rc = DBRES_OK;
351 3586 : }
352 :
353 : cleanup:
354 3587 : databasevm_reset(stmt);
355 3587 : return result;
356 : }
357 :
358 255427 : void dbvm_reset (dbvm_t *stmt) {
359 255427 : if (!stmt) return;
360 232452 : databasevm_clear_bindings(stmt);
361 232452 : databasevm_reset(stmt);
362 255427 : }
363 :
364 : // MARK: - Database Version -
365 :
366 671 : int cloudsync_dbversion_build_query (cloudsync_context *data, char **sql_out) {
367 : // this function must be manually called each time tables changes
368 : // because the query plan changes too and it must be re-prepared
369 : // unfortunately there is no other way
370 :
371 : // we need to execute a query like:
372 : /*
373 : SELECT max(version) as version FROM (
374 : SELECT max(db_version) as version FROM "table1_cloudsync"
375 : UNION ALL
376 : SELECT max(db_version) as version FROM "table2_cloudsync"
377 : UNION ALL
378 : SELECT max(db_version) as version FROM "table3_cloudsync"
379 : UNION
380 : SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
381 : )
382 : */
383 :
384 : // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
385 :
386 671 : *sql_out = NULL;
387 671 : return database_select_text(data, SQL_DBVERSION_BUILD_QUERY, sql_out);
388 : }
389 :
390 894 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
391 894 : if (data->db_version_stmt) {
392 443 : databasevm_finalize(data->db_version_stmt);
393 443 : data->db_version_stmt = NULL;
394 443 : }
395 :
396 894 : int64_t count = dbutils_table_settings_count_tables(data);
397 894 : if (count == 0) return DBRES_OK;
398 671 : else if (count == -1) return cloudsync_set_dberror(data);
399 :
400 671 : char *sql = NULL;
401 671 : int rc = cloudsync_dbversion_build_query(data, &sql);
402 671 : if (rc != DBRES_OK) return cloudsync_set_dberror(data);
403 :
404 : // A NULL SQL with rc == OK means the generator produced a NULL row:
405 : // sqlite_master has no *_cloudsync meta-tables (for example, the user
406 : // dropped the base table and its meta-table without calling
407 : // cloudsync_cleanup, leaving stale cloudsync_table_settings rows).
408 : // Treat this the same as count == 0: no prepared statement, db_version
409 : // stays at the minimum and will be rebuilt on the next cloudsync_init.
410 : // Genuine errors from database_select_text are handled above.
411 670 : if (!sql) return DBRES_OK;
412 : DEBUG_SQL("db_version_stmt: %s", sql);
413 :
414 669 : rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
415 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
416 669 : cloudsync_memory_free(sql);
417 669 : return rc;
418 894 : }
419 :
420 1204 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
421 1204 : DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
422 1204 : if (schema_changed == DBVM_VALUE_ERROR) return -1;
423 :
424 1204 : if (schema_changed == DBVM_VALUE_CHANGED) {
425 380 : int rc = cloudsync_dbversion_rebuild(data);
426 380 : if (rc != DBRES_OK) return -1;
427 380 : }
428 :
429 1204 : if (!data->db_version_stmt) {
430 0 : data->db_version = CLOUDSYNC_MIN_DB_VERSION;
431 0 : return 0;
432 : }
433 :
434 1204 : DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
435 1204 : if (rc == DBVM_VALUE_ERROR) return -1;
436 1204 : return 0;
437 1204 : }
438 :
439 29609 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
440 : // perform a PRAGMA data_version to check if some other process write any data
441 29609 : DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
442 29609 : if (rc == DBVM_VALUE_ERROR) return -1;
443 :
444 : // db_version is already set and there is no need to update it
445 29607 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
446 :
447 1204 : return cloudsync_dbversion_rerun(data);
448 29609 : }
449 :
450 29583 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
451 29583 : int rc = cloudsync_dbversion_check_uptodate(data);
452 29583 : if (rc != DBRES_OK) return -1;
453 :
454 29582 : int64_t result = data->db_version + 1;
455 29582 : if (result < data->pending_db_version) result = data->pending_db_version;
456 29582 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
457 29582 : data->pending_db_version = result;
458 :
459 29582 : return result;
460 29583 : }
461 :
462 : // MARK: - PK Context -
463 :
464 0 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
465 0 : *tbl_len = ctx->tbl_len;
466 0 : return ctx->tbl;
467 : }
468 :
469 0 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
470 0 : *pk_len = ctx->pk_len;
471 0 : return (void *)ctx->pk;
472 : }
473 :
474 0 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
475 0 : *colname_len = ctx->col_name_len;
476 0 : return ctx->col_name;
477 : }
478 :
// Return the causal-length value decoded from the change record.
int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
    return ctx->cl;
}
482 :
// Return the database version decoded from the change record.
int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
    return ctx->db_version;
}
486 :
487 : // MARK: - CloudSync Context -
488 :
// Return the in-sync flag (set/reset via the SYNCBIT_* macros).
int cloudsync_insync (cloudsync_context *data) {
    return data->insync;
}
492 :
// Return a pointer to the cached site_id buffer (UUID_LEN bytes).
void *cloudsync_siteid (cloudsync_context *data) {
    return (void *)data->site_id;
}
496 :
497 2 : void cloudsync_reset_siteid (cloudsync_context *data) {
498 2 : memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
499 2 : }
500 :
// Load the local site_id blob (UUID_LEN bytes) from the database into
// data->site_id. Returns DBRES_OK on success or an error code.
int cloudsync_load_siteid (cloudsync_context *data) {
    // check if site_id was already loaded
    // NOTE(review): a first byte of 0 is used as the "not loaded" sentinel;
    // a genuine site_id starting with byte 0x00 would be re-fetched on
    // every call — confirm generated ids never start with a zero byte.
    if (data->site_id[0] != 0) return DBRES_OK;
    
    // load site_id
    char *buffer = NULL;
    int64_t size = 0;
    int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
    if (rc != DBRES_OK) return rc;
    if (!buffer || size != UUID_LEN) {
        // missing or malformed blob: report a misuse error
        if (buffer) cloudsync_memory_free(buffer);
        return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
    }
    
    memcpy(data->site_id, buffer, UUID_LEN);
    cloudsync_memory_free(buffer);
    
    return DBRES_OK;
}
520 :
// Return the currently cached database version.
int64_t cloudsync_dbversion (cloudsync_context *data) {
    return data->db_version;
}
524 :
525 12140 : int cloudsync_bumpseq (cloudsync_context *data) {
526 12140 : int value = data->seq;
527 12140 : data->seq += 1;
528 12140 : return value;
529 : }
530 :
// Recompute and cache the hash of the current database schema.
void cloudsync_update_schema_hash (cloudsync_context *data) {
    database_update_schema_hash(data, &data->schema_hash);
}
534 :
// Return the underlying database connection handle.
void *cloudsync_db (cloudsync_context *data) {
    return data->db;
}
538 :
// Prepare the persistent context statements (data_version, schema_version
// and site_id get/set) when missing, then rebuild the db_version
// statement. Safe to call repeatedly: existing statements are kept.
int cloudsync_add_dbvms (cloudsync_context *data) {
    DEBUG_DBFUNCTION("cloudsync_add_stmts");
    
    if (data->data_version_stmt == NULL) {
        int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
    }
    
    if (data->schema_version_stmt == NULL) {
        int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
    }
    
    if (data->getset_siteid_stmt == NULL) {
        // get and set index of the site_id
        // in SQLite, we can't directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
        // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
        int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
    }
    
    return cloudsync_dbversion_rebuild(data);
}
568 :
// Record an error on the context and return its code.
// err_user == NULL: copy the underlying database error message verbatim.
// Otherwise use err_user, appending the database message when one is set.
// An err_code of DBRES_OK is replaced by the database error code, or by
// DBRES_ERROR when the database reports none.
int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
    // force err_code to be something different than OK
    if (err_code == DBRES_OK) err_code = database_errcode(data);
    if (err_code == DBRES_OK) err_code = DBRES_ERROR;
    
    // compute a meaningful error message
    if (err_user == NULL) {
        snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
    } else {
        const char *db_error = database_errmsg(data);
        char db_error_copy[sizeof(data->errmsg)];
        int rc = database_errcode(data);
        if (rc == DBRES_OK) {
            snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
        } else {
            // database_errmsg may return data->errmsg itself: copy it
            // aside so the combined snprintf below does not read from
            // the buffer it is writing into
            if (db_error == data->errmsg) {
                snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
                db_error = db_error_copy;
            }
            snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
        }
    }
    
    data->errcode = err_code;
    return err_code;
}
595 :
// Record the current database error (message and code) on the context.
int cloudsync_set_dberror (cloudsync_context *data) {
    return cloudsync_set_error(data, NULL, DBRES_OK);
}
599 :
// Return the last recorded error message.
const char *cloudsync_errmsg (cloudsync_context *data) {
    return data->errmsg;
}
603 :
// Return the last recorded error code.
int cloudsync_errcode (cloudsync_context *data) {
    return data->errcode;
}
607 :
608 2 : void cloudsync_reset_error (cloudsync_context *data) {
609 2 : data->errmsg[0] = 0;
610 2 : data->errcode = DBRES_OK;
611 2 : }
612 :
// Return the user-supplied auxiliary pointer.
void *cloudsync_auxdata (cloudsync_context *data) {
    return data->aux_data;
}
616 :
// Store a user-supplied auxiliary pointer on the context (not owned).
void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
    data->aux_data = xdata;
}
620 :
621 6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
622 6 : if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
623 5 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
624 5 : data->current_schema = NULL;
625 5 : if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
626 6 : }
627 :
// Return the currently set schema name, or NULL when none is set.
const char *cloudsync_schema (cloudsync_context *data) {
    return data->current_schema;
}
631 :
632 4 : const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
633 4 : cloudsync_table_context *table = table_lookup(data, table_name);
634 4 : if (!table) return NULL;
635 :
636 1 : return table->schema;
637 4 : }
638 :
639 : // MARK: - Table Utils -
640 :
641 69 : void table_pknames_free (char **names, int nrows) {
642 69 : if (!names) return;
643 132 : for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
644 46 : cloudsync_memory_free(names);
645 69 : }
646 :
// Build the SQL that deletes a row from the base table during a merge.
// Caller owns the returned string.
char *table_build_mergedelete_sql (cloudsync_table_context *table) {
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        // NOTE(review): this branch calls memory_mprintf while the rest of
        // the file uses cloudsync_memory_mprintf — confirm both name the
        // same allocator so the caller's free routine matches.
        char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
        return sql;
    }
#endif
    
    // delegate to the primary-key based delete builder
    return sql_build_delete_by_pk(table->context, table->name, table->schema);
}
657 :
// Build the SQL used to apply an incoming row during a merge.
// colname == NULL builds the sentinel insert (primary key only);
// otherwise an upsert of the given column. Caller owns the string.
char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
    char *sql = NULL;
    
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        if (colname == NULL) {
            // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
            sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
        } else {
            // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
            sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
        }
        return sql;
    }
#endif
    
    if (colname == NULL) {
        // is sentinel insert
        sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
    } else {
        sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
    }
    return sql;
}
682 :
// Build the SQL that reads one column value of a row identified by its
// primary key (or rowid). Caller owns the returned string.
char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        char *colnamequote = "\"";
        char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
        return sql;
    }
#endif
    
    // SELECT age FROM customers WHERE first_name=? AND last_name=?;
    return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
}
695 :
696 284 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
697 : DEBUG_DBFUNCTION("table_create %s", name);
698 :
699 284 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
700 284 : if (!table) return NULL;
701 :
702 284 : table->context = data;
703 284 : table->algo = algo;
704 284 : table->name = cloudsync_string_dup_lowercase(name);
705 :
706 : // Detect schema from metadata table location. If metadata table doesn't
707 : // exist yet (during initialization), fall back to cloudsync_schema() which
708 : // returns the explicitly set schema or current_schema().
709 284 : table->schema = database_table_schema(name);
710 284 : if (!table->schema) {
711 284 : const char *fallback_schema = cloudsync_schema(data);
712 284 : if (fallback_schema) {
713 0 : table->schema = cloudsync_string_dup(fallback_schema);
714 0 : }
715 284 : }
716 :
717 284 : if (!table->name) {
718 0 : cloudsync_memory_free(table);
719 0 : return NULL;
720 : }
721 284 : table->meta_ref = database_build_meta_ref(table->schema, table->name);
722 284 : table->base_ref = database_build_base_ref(table->schema, table->name);
723 284 : table->enabled = true;
724 :
725 284 : return table;
726 284 : }
727 :
// Release a table context: finalize every cached prepared statement and
// free all owned strings and arrays, field by field, mirroring the
// cloudsync_table_context layout. Safe to call with NULL.
void table_free (cloudsync_table_context *table) {
    DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
    if (!table) return;
    
    // per-column arrays (sized by ncols)
    if (table->col_name) {
        for (int i=0; i<table->ncols; ++i) {
            cloudsync_memory_free(table->col_name[i]);
        }
        cloudsync_memory_free(table->col_name);
    }
    if (table->col_merge_stmt) {
        for (int i=0; i<table->ncols; ++i) {
            databasevm_finalize(table->col_merge_stmt[i]);
        }
        cloudsync_memory_free(table->col_merge_stmt);
    }
    if (table->col_value_stmt) {
        for (int i=0; i<table->ncols; ++i) {
            databasevm_finalize(table->col_value_stmt[i]);
        }
        cloudsync_memory_free(table->col_value_stmt);
    }
    if (table->col_id) {
        cloudsync_memory_free(table->col_id);
    }
    if (table->col_algo) {
        cloudsync_memory_free(table->col_algo);
    }
    if (table->col_delimiter) {
        for (int i=0; i<table->ncols; ++i) {
            if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
        }
        cloudsync_memory_free(table->col_delimiter);
    }
    
    // block-column statements and references
    if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
    if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
    if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
    if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
    if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);
    
    // names, references, primary keys and meta-table statements
    if (table->name) cloudsync_memory_free(table->name);
    if (table->schema) cloudsync_memory_free(table->schema);
    if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
    if (table->base_ref) cloudsync_memory_free(table->base_ref);
    if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
    if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
    if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
    if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
    if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
    if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
    if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
    if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
    if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
    if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
    if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
    if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
    if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);
    
    // base-table statements
    if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
    if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
    if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);
    
    cloudsync_memory_free(table);
}
793 :
794 284 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
795 284 : int rc = DBRES_OK;
796 284 : char *sql = NULL;
797 284 : cloudsync_context *data = table->context;
798 :
799 : // META TABLE statements
800 :
801 : // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
802 :
803 : // precompile the pk exists statement
804 : // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
805 : // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
806 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
807 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
808 : DEBUG_SQL("meta_pkexists_stmt: %s", sql);
809 :
810 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
811 284 : cloudsync_memory_free(sql);
812 284 : if (rc != DBRES_OK) goto cleanup;
813 :
814 : // precompile the update local sentinel statement
815 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
816 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
817 : DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
818 :
819 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
820 284 : cloudsync_memory_free(sql);
821 284 : if (rc != DBRES_OK) goto cleanup;
822 :
823 : // precompile the insert local sentinel statement
824 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
825 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
826 : DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
827 :
828 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
829 284 : cloudsync_memory_free(sql);
830 284 : if (rc != DBRES_OK) goto cleanup;
831 :
832 : // precompile the insert/update local row statement
833 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
834 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
835 : DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
836 :
837 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
838 284 : cloudsync_memory_free(sql);
839 284 : if (rc != DBRES_OK) goto cleanup;
840 :
841 : // precompile the delete rows from meta
842 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
843 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
844 : DEBUG_SQL("meta_row_drop_stmt: %s", sql);
845 :
846 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
847 284 : cloudsync_memory_free(sql);
848 284 : if (rc != DBRES_OK) goto cleanup;
849 :
850 : // precompile the update rows from meta when pk changes
851 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
852 284 : sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
853 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
854 : DEBUG_SQL("meta_update_move_stmt: %s", sql);
855 :
856 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
857 284 : cloudsync_memory_free(sql);
858 284 : if (rc != DBRES_OK) goto cleanup;
859 :
860 : // local cl
861 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
862 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
863 : DEBUG_SQL("meta_local_cl_stmt: %s", sql);
864 :
865 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
866 284 : cloudsync_memory_free(sql);
867 284 : if (rc != DBRES_OK) goto cleanup;
868 :
869 : // rowid of the last inserted/updated row in the meta table
870 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
871 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
872 : DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
873 :
874 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
875 284 : cloudsync_memory_free(sql);
876 284 : if (rc != DBRES_OK) goto cleanup;
877 :
878 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
879 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
880 : DEBUG_SQL("meta_merge_delete_drop: %s", sql);
881 :
882 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
883 284 : cloudsync_memory_free(sql);
884 284 : if (rc != DBRES_OK) goto cleanup;
885 :
886 : // zero clock
887 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
888 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
889 : DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
890 :
891 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
892 284 : cloudsync_memory_free(sql);
893 284 : if (rc != DBRES_OK) goto cleanup;
894 :
895 : // col_version
896 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
897 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
898 : DEBUG_SQL("meta_col_version_stmt: %s", sql);
899 :
900 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
901 284 : cloudsync_memory_free(sql);
902 284 : if (rc != DBRES_OK) goto cleanup;
903 :
904 : // site_id
905 284 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
906 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
907 : DEBUG_SQL("meta_site_id_stmt: %s", sql);
908 :
909 284 : rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
910 284 : cloudsync_memory_free(sql);
911 284 : if (rc != DBRES_OK) goto cleanup;
912 :
913 : // REAL TABLE statements
914 :
915 : // precompile the get column value statement
916 284 : if (ncols > 0) {
917 245 : sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
918 245 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
919 : DEBUG_SQL("real_col_values_stmt: %s", sql);
920 :
921 245 : rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
922 245 : cloudsync_memory_free(sql);
923 245 : if (rc != DBRES_OK) goto cleanup;
924 245 : }
925 :
926 284 : sql = table_build_mergedelete_sql(table);
927 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
928 : DEBUG_SQL("real_merge_delete: %s", sql);
929 :
930 284 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
931 284 : cloudsync_memory_free(sql);
932 284 : if (rc != DBRES_OK) goto cleanup;
933 :
934 284 : sql = table_build_mergeinsert_sql(table, NULL);
935 284 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
936 : DEBUG_SQL("real_merge_sentinel: %s", sql);
937 :
938 284 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
939 284 : cloudsync_memory_free(sql);
940 284 : if (rc != DBRES_OK) goto cleanup;
941 :
942 : cleanup:
943 284 : if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
944 284 : return rc;
945 : }
946 :
947 184072 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
948 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
949 :
950 184072 : if (table_name) {
951 185698 : for (int i=0; i<data->tables_count; ++i) {
952 185119 : if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
953 1626 : }
954 579 : }
955 :
956 579 : return NULL;
957 184072 : }
958 :
959 169267 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
960 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
961 :
962 315425 : for (int i=0; i<table->ncols; ++i) {
963 315425 : if (strcasecmp(table->col_name[i], col_name) == 0) {
964 169267 : if (index) *index = i;
965 169267 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
966 : }
967 146158 : }
968 :
969 0 : if (index) *index = -1;
970 0 : return NULL;
971 169267 : }
972 :
973 283 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
974 283 : const char *table_name = table->name;
975 : DEBUG_DBFUNCTION("table_remove %s", table_name);
976 :
977 333 : for (int i = 0; i < data->tables_count; ++i) {
978 333 : cloudsync_table_context *t = data->tables[i];
979 :
980 : // pointer compare is fastest but fallback to strcasecmp if not same pointer
981 333 : if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
982 283 : int last = data->tables_count - 1;
983 283 : data->tables[i] = data->tables[last]; // move last into the hole (keeps array dense)
984 283 : data->tables[last] = NULL; // NULLify tail (as an extra security measure)
985 283 : data->tables_count--;
986 283 : return data->tables_count;
987 : }
988 50 : }
989 :
990 0 : return -1;
991 283 : }
992 :
// Row callback for the "list non-pk columns (name, cid)" query: registers one
// column in the table context and precompiles its per-column merge and value
// statements. Returns 0 to continue, 1 to abort the enclosing exec (which the
// caller sees as DBRES_ABORT).
int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
    cloudsync_table_context *table = (cloudsync_table_context *)xdata;
    cloudsync_context *data = table->context;

    // next free slot in the per-column arrays (one new column per invocation)
    int index = table->ncols;
    // NOTE(review): the loop steps by 2 over (name, cid) pairs but always
    // writes to the same `index`; this is only correct when each callback
    // delivers exactly one pair (ncols == 2) — confirm against the SQL query.
    for (int i=0; i<ncols; i+=2) {
        const char *name = values[i];
        int cid = (int)strtol(values[i+1], NULL, 0);

        table->col_id[index] = cid;
        // stored lowercase so lookups can rely on case-insensitive matching
        table->col_name[index] = cloudsync_string_dup_lowercase(name);
        if (!table->col_name[index]) goto error;

        // per-column merge statement (INSERT ... ON CONFLICT DO UPDATE shape)
        char *sql = table_build_mergeinsert_sql(table, name);
        if (!sql) goto error;
        DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);

        int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto error;
        if (!table->col_merge_stmt[index]) goto error;

        // per-column value retrieval statement
        sql = table_build_value_sql(table, name);
        if (!sql) goto error;
        DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);

        rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto error;
        if (!table->col_value_stmt[index]) goto error;
    }
    // commit the slot only after everything for this column succeeded
    table->ncols += 1;

    return 0;

error:
    // clean up partially-initialized entry at index
    if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
    if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
    if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
    return 1;
}
1035 :
1036 284 : bool table_ensure_capacity (cloudsync_context *data) {
1037 284 : if (data->tables_count < data->tables_cap) return true;
1038 :
1039 0 : int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
1040 0 : size_t bytes = (size_t)new_cap * sizeof(*data->tables);
1041 0 : void *p = cloudsync_memory_realloc(data->tables, bytes);
1042 0 : if (!p) return false;
1043 :
1044 0 : data->tables = (cloudsync_table_context **)p;
1045 0 : data->tables_cap = new_cap;
1046 0 : return true;
1047 284 : }
1048 :
// Registers table_name in this connection's context: creates the per-table
// context, counts pk/non-pk columns, precompiles all statements and allocates
// the per-column arrays (filled by table_add_to_context_cb). Returns true on
// success or if the table was already registered; false on any failure, in
// which case the partially-built context is released via table_free.
bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
    DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);

    // Check if table already initialized in this connection's context.
    // Note: This prevents same-connection duplicate initialization.
    // SQLite clients cannot distinguish schemas, so having 'public.users'
    // and 'auth.users' would cause sync ambiguity. Users should avoid
    // initializing tables with the same name in different schemas.
    // If two concurrent connections initialize tables with the same name
    // in different schemas, the behavior is undefined.
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (table) return true;

    // check for space availability
    if (!table_ensure_capacity(data)) return false;

    // setup a new table
    table = table_create(data, table_name, algo);
    if (!table) return false;

    // fill remaining metadata in the table
    int count = database_count_pk(data, table_name, false, table->schema);
    if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    table->npks = count;
    if (table->npks == 0) {
#if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
        goto abort_add_table;
#else
        // tables without an explicit primary key are tracked by rowid
        table->rowid_only = true;
        table->npks = 1; // rowid
#endif
    }

    int ncols = database_count_nonpk(data, table_name, table->schema);
    if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    int rc = table_add_stmts(table, ncols);
    if (rc != DBRES_OK) goto abort_add_table;

    // a table with only pk(s) is totally legal
    if (ncols > 0) {
        // arrays are zero-initialized so table_free can safely release a
        // partially-populated context on the abort path
        table->col_name = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_name) goto abort_add_table;

        table->col_id = (int *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(int) * ncols));
        if (!table->col_id) goto abort_add_table;

        table->col_merge_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_merge_stmt) goto abort_add_table;

        table->col_value_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_value_stmt) goto abort_add_table;

        table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
        if (!table->col_algo) goto abort_add_table;

        table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_delimiter) goto abort_add_table;

        // Pass empty string when schema is NULL; SQL will fall back to current_schema()
        const char *schema = table->schema ? table->schema : "";
        char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
                                             table_name, schema, table_name, schema);
        if (!sql) goto abort_add_table;
        // the callback fills col_name/col_id and precompiles per-column stmts
        rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
        cloudsync_memory_free(sql);
        // NOTE(review): only DBRES_ABORT (callback failure) aborts here; other
        // non-OK exec results fall through to success — confirm this is intended
        if (rc == DBRES_ABORT) goto abort_add_table;
    }

    // append newly created table
    data->tables[data->tables_count++] = table;
    return true;

abort_add_table:
    table_free(table);
    return false;
}
1125 :
1126 0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
1127 0 : dbvm_t *vm = NULL;
1128 0 : *persistent = false;
1129 :
1130 0 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1131 0 : if (table) {
1132 0 : char *col_name = NULL;
1133 0 : if (table->ncols > 0) {
1134 0 : col_name = table->col_name[0];
1135 : // retrieve col_value precompiled statement
1136 0 : vm = table_column_lookup(table, col_name, false, NULL);
1137 0 : *persistent = true;
1138 0 : } else {
1139 0 : char *sql = table_build_value_sql(table, "*");
1140 0 : databasevm_prepare(data, sql, (void **)&vm, 0);
1141 0 : cloudsync_memory_free(sql);
1142 0 : *persistent = false;
1143 : }
1144 0 : }
1145 :
1146 0 : return vm;
1147 : }
1148 :
// Returns whether change tracking is currently enabled for this table.
bool table_enabled (cloudsync_table_context *table) {
    return table->enabled;
}

// Enables or disables change tracking for this table.
void table_set_enabled (cloudsync_table_context *table, bool value) {
    table->enabled = value;
}

// Number of non-primary-key columns registered in the table context.
int table_count_cols (cloudsync_table_context *table) {
    return table->ncols;
}

// Number of primary-key columns (1 for rowid-only tables).
int table_count_pks (cloudsync_table_context *table) {
    return table->npks;
}

// Lowercase name of the non-pk column at index; no bounds checking is done.
const char *table_colname (cloudsync_table_context *table, int index) {
    return table->col_name[index];
}

// Returns true when a meta row exists for the encoded primary key `value`.
bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
    // check if a row with the same primary key already exists
    // if so, this means the row might have been previously deleted (sentinel)
    return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
}

// Array of primary-key column names (table->npks entries; may be NULL).
char **table_pknames (cloudsync_table_context *table) {
    return table->pk_name;
}

// Replaces the pk-name array, releasing the previous one.
// Ownership of pknames transfers to the table context.
void table_set_pknames (cloudsync_table_context *table, char **pknames) {
    table_pknames_free(table->pk_name, table->npks);
    table->pk_name = pknames;
}

// True when the table uses the grow-only-set (GOS) CRDT algorithm.
bool table_algo_isgos (cloudsync_table_context *table) {
    return (table->algo == table_algo_crdt_gos);
}

// Schema name the table belongs to (may be NULL).
const char *table_schema (cloudsync_table_context *table) {
    return table->schema;
}
1191 :
1192 : // MARK: - Merge Insert -
1193 :
1194 46090 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
1195 46090 : dbvm_t *vm = table->meta_local_cl_stmt;
1196 46090 : int64_t result = -1;
1197 :
1198 46090 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1199 46090 : if (rc != DBRES_OK) goto cleanup;
1200 :
1201 46090 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1202 46090 : if (rc != DBRES_OK) goto cleanup;
1203 :
1204 46090 : rc = databasevm_step(vm);
1205 46090 : if (rc == DBRES_ROW) result = database_column_int(vm, 0);
1206 0 : else if (rc == DBRES_DONE) result = 0;
1207 :
1208 : cleanup:
1209 46090 : if (result == -1) cloudsync_set_dberror(table->context);
1210 46090 : dbvm_reset(vm);
1211 46090 : return result;
1212 : }
1213 :
1214 45084 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
1215 45084 : dbvm_t *vm = table->meta_col_version_stmt;
1216 :
1217 45084 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1218 45084 : if (rc != DBRES_OK) goto cleanup;
1219 :
1220 45084 : rc = databasevm_bind_text(vm, 2, col_name, -1);
1221 45084 : if (rc != DBRES_OK) goto cleanup;
1222 :
1223 45084 : rc = databasevm_step(vm);
1224 70045 : if (rc == DBRES_ROW) {
1225 24961 : *version = database_column_int(vm, 0);
1226 24961 : rc = DBRES_OK;
1227 24961 : }
1228 :
1229 : cleanup:
1230 45084 : if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
1231 45084 : dbvm_reset(vm);
1232 45084 : return rc;
1233 : }
1234 :
// Records the winning clock for a merged change: resolves site_id to its
// ordinal via the connection-level getset statement, then writes
// (pk, col_name, col_version, db_version, seq, ord) through the table's
// winner-clock statement, storing the returned change id in *rowid.
// A NULL colname is mapped to the tombstone sentinel (row-level change).
int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {

    // get/set site_id: returns the ordinal for this site (non-ROW is an error)
    dbvm_t *vm = data->getset_siteid_stmt;
    int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_step(vm);
    if (rc != DBRES_ROW) goto cleanup_merge;

    int64_t ord = database_column_int(vm, 0);
    dbvm_reset(vm);

    // switch to the winner-clock statement; note that the cleanup label below
    // resets whichever statement vm points at when a failure occurs
    vm = table->meta_winner_clock_stmt;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 3, col_version);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 4, db_version);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 5, seq);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 6, ord);
    if (rc != DBRES_OK) goto cleanup_merge;

    // the statement returns the change id as its single result row
    rc = databasevm_step(vm);
    if (rc == DBRES_ROW) {
        *rowid = database_column_int(vm, 0);
        rc = DBRES_OK;
    }

cleanup_merge:
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    dbvm_reset(vm);
    return rc;
}
1278 :
1279 : // MARK: - Deferred column-batch merge functions -
1280 :
1281 21342 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
1282 21342 : merge_pending_batch *batch = data->pending_batch;
1283 :
1284 : // Store table and PK on first entry
1285 21342 : if (batch->table == NULL) {
1286 7702 : batch->table = table;
1287 7702 : batch->pk = (char *)cloudsync_memory_alloc(pklen);
1288 7702 : if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
1289 7702 : memcpy(batch->pk, pk, pklen);
1290 7702 : batch->pk_len = pklen;
1291 7702 : }
1292 :
1293 : // Ensure capacity
1294 21342 : if (batch->count >= batch->capacity) {
1295 502 : int new_cap = batch->capacity ? batch->capacity * 2 : 8;
1296 502 : merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
1297 502 : if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
1298 502 : batch->entries = new_entries;
1299 502 : batch->capacity = new_cap;
1300 502 : }
1301 :
1302 : // Resolve col_name to a stable pointer from the table context
1303 : // (the incoming col_name may point to VM-owned memory that gets freed on reset)
1304 21342 : int col_idx = -1;
1305 21342 : table_column_lookup(table, col_name, true, &col_idx);
1306 21342 : const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
1307 21342 : if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
1308 :
1309 21342 : merge_pending_entry *e = &batch->entries[batch->count];
1310 21342 : e->col_name = stable_col_name;
1311 21342 : e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
1312 21342 : e->col_version = col_version;
1313 21342 : e->db_version = db_version;
1314 21342 : e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
1315 21342 : memcpy(e->site_id, site_id, e->site_id_len);
1316 21342 : e->seq = seq;
1317 :
1318 21342 : batch->count++;
1319 21342 : return DBRES_OK;
1320 21342 : }
1321 :
1322 18908 : static void merge_pending_free_entries (merge_pending_batch *batch) {
1323 18908 : if (batch->entries) {
1324 35582 : for (int i = 0; i < batch->count; i++) {
1325 21342 : if (batch->entries[i].col_value) {
1326 21342 : database_value_free(batch->entries[i].col_value);
1327 21342 : batch->entries[i].col_value = NULL;
1328 21342 : }
1329 21342 : }
1330 14240 : }
1331 18908 : if (batch->pk) {
1332 7734 : cloudsync_memory_free(batch->pk);
1333 7734 : batch->pk = NULL;
1334 7734 : }
1335 18908 : batch->table = NULL;
1336 18908 : batch->pk_len = 0;
1337 18908 : batch->cl = 0;
1338 18908 : batch->sentinel_pending = false;
1339 18908 : batch->row_exists = false;
1340 18908 : batch->count = 0;
1341 18908 : }
1342 :
// Flushes the pending column batch for one (table, pk) row: applies all
// buffered column changes in a single multi-column UPDATE/UPSERT (reusing a
// cached prepared statement when the column layout matches), then records a
// winner clock per buffered entry. The whole flush runs inside a savepoint;
// the batch is always cleared before returning (merge_pending_free_entries).
static int merge_flush_pending (cloudsync_context *data) {
    merge_pending_batch *batch = data->pending_batch;
    if (!batch) return DBRES_OK;

    int rc = DBRES_OK;
    bool flush_savepoint = false;

    // Nothing to write — handle sentinel-only case or skip
    if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
        goto cleanup;
    }

    // Wrap database operations in a savepoint so that on failure (e.g. RLS
    // denial) the rollback properly releases all executor resources (open
    // relations, snapshots, plan cache) acquired during the failed statement.
    flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);

    if (batch->count == 0) {
        // Sentinel with no winning columns (PK-only row)
        dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
        // bind the decoded primary key(s) to the precompiled sentinel insert
        rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
        if (rc < 0) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
        // SYNCBIT suppresses local change tracking while applying remote changes
        SYNCBIT_SET(data);
        rc = databasevm_step(vm);
        dbvm_reset(vm);
        SYNCBIT_RESET(data);
        if (rc == DBRES_DONE) rc = DBRES_OK;
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            goto cleanup;
        }
        goto cleanup;
    }

    // Check if cached prepared statement can be reused
    cloudsync_table_context *table = batch->table;
    dbvm_t *vm = NULL;
    bool cache_hit = false;

    // a hit requires the same row-exists mode, the same column count, and the
    // exact same column-name pointers (stable pointers owned by the table
    // context, so pointer equality is sufficient)
    if (batch->cached_vm &&
        batch->cached_row_exists == batch->row_exists &&
        batch->cached_col_count == batch->count) {
        cache_hit = true;
        for (int i = 0; i < batch->count; i++) {
            if (batch->cached_col_names[i] != batch->entries[i].col_name) {
                cache_hit = false;
                break;
            }
        }
    }

    if (cache_hit) {
        vm = batch->cached_vm;
        dbvm_reset(vm);
    } else {
        // Invalidate old cache
        if (batch->cached_vm) {
            databasevm_finalize(batch->cached_vm);
            batch->cached_vm = NULL;
        }

        // Build multi-column SQL
        const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
        if (!colnames) {
            rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
            goto cleanup;
        }
        for (int i = 0; i < batch->count; i++) {
            colnames[i] = batch->entries[i].col_name;
        }

        // UPDATE when the row already exists locally, UPSERT otherwise
        char *sql = batch->row_exists
            ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
            : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
        cloudsync_memory_free(colnames);

        if (!sql) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
            goto cleanup;
        }

        rc = databasevm_prepare(data, sql, &vm, 0);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
            goto cleanup;
        }

        // Update cache
        batch->cached_vm = vm;
        batch->cached_row_exists = batch->row_exists;
        batch->cached_col_count = batch->count;
        // Reallocate cached_col_names if needed
        // (a failed realloc is tolerated: the cache simply won't hit next time)
        if (batch->cached_col_count > 0) {
            const char **new_names = (const char **)cloudsync_memory_realloc(
                batch->cached_col_names, batch->count * sizeof(const char *));
            if (new_names) {
                for (int i = 0; i < batch->count; i++) {
                    new_names[i] = batch->entries[i].col_name;
                }
                batch->cached_col_names = new_names;
            }
        }
    }

    // Bind PKs (positions 1..npks)
    int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    if (npks < 0) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        rc = DBRES_ERROR;
        goto cleanup;
    }

    // Bind column values (positions npks+1..npks+count)
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int bind_idx = npks + 1 + i;
        if (e->col_value) {
            rc = databasevm_bind_value(vm, bind_idx, e->col_value);
        } else {
            rc = databasevm_bind_null(vm, bind_idx);
        }
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
    }

    // Execute with SYNCBIT and GOS handling
    // (GOS tables temporarily disable their trigger around the step — see
    // the explanation in merge_insert_col)
    if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (table->algo == table_algo_crdt_gos) table->enabled = 1;

    if (rc != DBRES_DONE) {
        cloudsync_set_dberror(data);
        goto cleanup;
    }
    rc = DBRES_OK;

    // Call merge_set_winner_clock for each buffered entry
    int64_t rowid = 0;
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
                                              e->col_name, e->col_version, e->db_version,
                                              (const char *)e->site_id, e->site_id_len,
                                              e->seq, &rowid);
        if (clock_rc != DBRES_OK) {
            rc = clock_rc;
            goto cleanup;
        }
    }

cleanup:
    // always clear the per-row batch state, then resolve the savepoint
    merge_pending_free_entries(batch);
    if (flush_savepoint) {
        if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
        else database_rollback_savepoint(data, "merge_flush");
    }
    return rc;
}
1513 :
// Applies a single merged column change directly to the real table via the
// precompiled per-column upsert, then records the winner clock in the meta
// table. On success returns DBRES_OK and stores the meta change id in *rowid.
int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    int index;
    dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
    if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);

    // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"

    // bind primary key(s)
    int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    if (rc < 0) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }

    // bind value (always bind all expected parameters for correct prepared statement handling)
    // the same value is bound twice: once for the INSERT list, once for the
    // ON CONFLICT DO UPDATE assignment
    if (col_value) {
        rc = databasevm_bind_value(vm, table->npks+1, col_value);
        if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
    } else {
        rc = databasevm_bind_null(vm, table->npks+1);
        if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
    }
    if (rc != DBRES_OK) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }

    // perform real operation and disable triggers

    // in case of GOS we reused the table->col_merge_stmt statement
    // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
    // the trick is to disable that trigger before executing the statement
    if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (table->algo == table_algo_crdt_gos) table->enabled = 1;

    if (rc != DBRES_DONE) {
        cloudsync_set_dberror(data);
        return rc;
    }

    // record the winner clock for this column change
    return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
}
1564 :
1565 162 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1566 162 : int rc = DBRES_OK;
1567 :
1568 : // reset return value
1569 162 : *rowid = 0;
1570 :
1571 : // bind pk
1572 162 : dbvm_t *vm = table->real_merge_delete_stmt;
1573 162 : rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1574 162 : if (rc < 0) {
1575 0 : rc = cloudsync_set_dberror(data);
1576 0 : dbvm_reset(vm);
1577 0 : return rc;
1578 : }
1579 :
1580 : // perform real operation and disable triggers
1581 162 : SYNCBIT_SET(data);
1582 162 : rc = databasevm_step(vm);
1583 : DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
1584 162 : dbvm_reset(vm);
1585 162 : SYNCBIT_RESET(data);
1586 162 : if (rc == DBRES_DONE) rc = DBRES_OK;
1587 162 : if (rc != DBRES_OK) {
1588 0 : cloudsync_set_dberror(data);
1589 0 : return rc;
1590 : }
1591 :
1592 162 : rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
1593 162 : if (rc != DBRES_OK) return rc;
1594 :
1595 : // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
1596 : // this must never come before `set_winner_clock`
1597 162 : vm = table->meta_merge_delete_drop;
1598 162 : rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1599 162 : if (rc == DBRES_OK) rc = databasevm_step(vm);
1600 162 : dbvm_reset(vm);
1601 :
1602 162 : if (rc == DBRES_DONE) rc = DBRES_OK;
1603 162 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1604 162 : return rc;
1605 162 : }
1606 :
1607 58 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
1608 58 : dbvm_t *vm = table->meta_zero_clock_stmt;
1609 :
1610 58 : int rc = databasevm_bind_int(vm, 1, db_version);
1611 58 : if (rc != DBRES_OK) goto cleanup;
1612 :
1613 58 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1614 58 : if (rc != DBRES_OK) goto cleanup;
1615 :
1616 58 : rc = databasevm_step(vm);
1617 58 : if (rc == DBRES_DONE) rc = DBRES_OK;
1618 :
1619 : cleanup:
1620 58 : if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
1621 58 : dbvm_reset(vm);
1622 58 : return rc;
1623 : }
1624 :
1625 : // executed only if insert_cl == local_cl
// Decide whether an incoming column change beats the locally stored one.
// Resolution order:
//   1. no local metadata row            -> incoming wins
//   2. different col_version            -> higher version wins
//   3. equal versions                   -> compare the actual values
//   4. equal values (only when data->merge_equal_values is set)
//                                       -> memcmp of site_ids breaks the tie
// On success *didwin_flag holds the verdict and DBRES_OK is returned.
int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {

    // a NULL col_name refers to the row sentinel (tombstone) entry
    if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;

    int64_t local_version;
    int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    if (rc == DBRES_DONE) {
        // no rows returned, the incoming change wins if there's nothing there locally
        *didwin_flag = true;
        return DBRES_OK;
    }
    if (rc != DBRES_OK) return rc;

    // rc == DBRES_OK, means that a row with a version exists
    // fast path: a strictly larger column version wins outright
    if (local_version != col_version) {
        if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
        if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    }

    // rc == DBRES_ROW and col_version == local_version, need to compare values

    // retrieve col_value precompiled statement
    // block columns are stored in the side blocks table, regular columns in the base table
    bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    dbvm_t *vm;
    if (is_block_col) {
        // Block column: read value from blocks table (pk + col_name bindings)
        vm = table_block_value_read_stmt(table);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
        rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
        rc = databasevm_bind_text(vm, 2, col_name, -1);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    } else {
        vm = table_column_lookup(table, col_name, false, NULL);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);

        // bind primary key values
        rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
        if (rc < 0) {
            rc = cloudsync_set_dberror(data);
            dbvm_reset(vm);
            return rc;
        }
    }

    // execute vm
    dbvalue_t *local_value;
    rc = databasevm_step(vm);
    if (rc == DBRES_DONE) {
        // meta entry exists but the actual value is missing
        // we should allow the value_compare function to make a decision
        // value_compare has been modified to handle the case where lvalue is NULL
        local_value = NULL;
        rc = DBRES_OK;
    } else if (rc == DBRES_ROW) {
        local_value = database_column_value(vm, 0);
        rc = DBRES_OK;
    } else {
        goto cleanup;
    }

    // compare values
    int ret = dbutils_value_compare(insert_value, local_value);
    // reset after compare, otherwise local value would be deallocated
    dbvm_reset(vm);
    vm = NULL;

    // site_ids are consulted only when values compare equal AND the option is on
    bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    if (!compare_site_id) {
        *didwin_flag = (ret > 0);
        goto cleanup;
    }

    // values are the same and merge_equal_values is true
    // deterministic tie break: the change coming from the larger site_id wins
    vm = table->meta_site_id_stmt;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_bind_text(vm, 2, col_name, -1);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_step(vm);
    if (rc == DBRES_ROW) {
        const void *local_site_id = database_column_blob(vm, 0, NULL);
        if (!local_site_id) {
            dbvm_reset(vm);
            return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
        }
        // NOTE(review): assumes both site_ids are at least site_len bytes — TODO confirm
        ret = memcmp(site_id, local_site_id, site_len);
        *didwin_flag = (ret > 0);
        dbvm_reset(vm);
        return DBRES_OK;
    }

    // handle error condition here
    dbvm_reset(vm);
    return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);

cleanup:
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    dbvm_reset(vm);
    return rc;
}
1729 :
1730 58 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1731 :
1732 : // reset return value
1733 58 : *rowid = 0;
1734 :
1735 58 : if (data->pending_batch == NULL) {
1736 : // Immediate mode: execute base table INSERT
1737 0 : dbvm_t *vm = table->real_merge_sentinel_stmt;
1738 0 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1739 0 : if (rc < 0) {
1740 0 : rc = cloudsync_set_dberror(data);
1741 0 : dbvm_reset(vm);
1742 0 : return rc;
1743 : }
1744 :
1745 0 : SYNCBIT_SET(data);
1746 0 : rc = databasevm_step(vm);
1747 0 : dbvm_reset(vm);
1748 0 : SYNCBIT_RESET(data);
1749 0 : if (rc == DBRES_DONE) rc = DBRES_OK;
1750 0 : if (rc != DBRES_OK) {
1751 0 : cloudsync_set_dberror(data);
1752 0 : return rc;
1753 : }
1754 0 : } else {
1755 : // Batch mode: skip base table INSERT, the batch flush will create the row
1756 58 : merge_pending_batch *batch = data->pending_batch;
1757 58 : batch->sentinel_pending = true;
1758 58 : if (batch->table == NULL) {
1759 32 : batch->table = table;
1760 32 : batch->pk = (char *)cloudsync_memory_alloc(pklen);
1761 32 : if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
1762 32 : memcpy(batch->pk, pk, pklen);
1763 32 : batch->pk_len = pklen;
1764 32 : }
1765 : }
1766 :
1767 : // Metadata operations always execute regardless of batch mode
1768 58 : int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
1769 58 : if (rc != DBRES_OK) return rc;
1770 :
1771 58 : return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
1772 58 : }
1773 :
1774 : // MARK: - Block-level merge helpers -
1775 :
1776 : // Store a block value in the blocks table
1777 379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
1778 379 : dbvm_t *vm = table->block_value_write_stmt;
1779 379 : if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
1780 :
1781 379 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1782 379 : if (rc != DBRES_OK) goto cleanup;
1783 379 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1784 379 : if (rc != DBRES_OK) goto cleanup;
1785 379 : if (col_value) {
1786 379 : rc = databasevm_bind_value(vm, 3, col_value);
1787 379 : } else {
1788 0 : rc = databasevm_bind_null(vm, 3);
1789 : }
1790 379 : if (rc != DBRES_OK) goto cleanup;
1791 :
1792 379 : rc = databasevm_step(vm);
1793 379 : if (rc == DBRES_DONE) rc = DBRES_OK;
1794 :
1795 : cleanup:
1796 379 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1797 379 : databasevm_reset(vm);
1798 379 : return rc;
1799 379 : }
1800 :
1801 : // Delete a block value from the blocks table
1802 74 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
1803 74 : dbvm_t *vm = table->block_value_delete_stmt;
1804 74 : if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
1805 :
1806 74 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1807 74 : if (rc != DBRES_OK) goto cleanup;
1808 74 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1809 74 : if (rc != DBRES_OK) goto cleanup;
1810 :
1811 74 : rc = databasevm_step(vm);
1812 74 : if (rc == DBRES_DONE) rc = DBRES_OK;
1813 :
1814 : cleanup:
1815 74 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1816 74 : databasevm_reset(vm);
1817 74 : return rc;
1818 74 : }
1819 :
// Materialize all alive blocks for a base column into the base table.
// Reads every alive block value for (pk, base_col_name) from the blocks table
// (joined with metadata: odd col_version == alive), joins them with the
// column's delimiter, and writes the result into the base table column via
// the column's merge statement with change-tracking triggers disabled.
// Writes SQL NULL when no alive blocks remain. Returns DBRES_OK on success.
int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);

    // Find column index and delimiter
    int col_idx = -1;
    for (int i = 0; i < table->ncols; i++) {
        if (strcasecmp(table->col_name[i], base_col_name) == 0) {
            col_idx = i;
            break;
        }
    }
    if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;

    // Build the LIKE pattern for block col_names: "base_col\x1F%"
    char *like_pattern = block_build_colname(base_col_name, "%");
    if (!like_pattern) return DBRES_NOMEM;

    // Query alive blocks from blocks table joined with metadata
    // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    //   ON b.pk = m.pk AND b.col_name = m.col_name
    //   WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    //   ORDER BY b.col_name
    dbvm_t *vm = table->block_list_stmt;
    int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    // Bind pk again for the join condition (parameter 3)
    rc = databasevm_bind_blob(vm, 3, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }

    // Collect block values into a doubling dynamic array of owned copies
    const char **block_values = NULL;
    int block_count = 0;
    int block_cap = 0;

    while ((rc = databasevm_step(vm)) == DBRES_ROW) {
        const char *value = database_column_text(vm, 0);
        if (block_count >= block_cap) {
            int new_cap = block_cap ? block_cap * 2 : 16;
            const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
            if (!new_arr) { rc = DBRES_NOMEM; break; }
            block_values = new_arr;
            block_cap = new_cap;
        }
        // NULL column text is normalized to an owned empty string
        // NOTE(review): cloudsync_string_dup result is not NULL-checked here — TODO confirm OOM behavior downstream
        block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
        block_count++;
    }
    databasevm_reset(vm);
    cloudsync_memory_free(like_pattern);

    if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
        // Free collected values
        for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
        if (block_values) cloudsync_memory_free((void *)block_values);
        return cloudsync_set_dberror(data);
    }

    // Materialize text (NULL when no alive blocks)
    char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    if (block_values) cloudsync_memory_free((void *)block_values);
    if (block_count > 0 && !text) return DBRES_NOMEM;

    // Update the base table column via the col_merge_stmt (with triggers disabled)
    dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }

    // Bind PKs
    rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }

    // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    int npks = table->npks;
    if (text) {
        rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    } else {
        rc = databasevm_bind_null(merge_vm, npks + 1);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_null(merge_vm, npks + 2);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    }

    // Execute with triggers disabled so the write is not re-tracked as a local change
    table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(merge_vm);
    databasevm_reset(merge_vm);
    SYNCBIT_RESET(data);
    table->enabled = 1;

    cloudsync_memory_free(text);

    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    return DBRES_OK;
}
1924 :
1925 : // Accessor for has_block_cols flag
1926 1024 : bool table_has_block_cols (cloudsync_table_context *table) {
1927 1024 : return table && table->has_block_cols;
1928 : }
1929 :
1930 : // Get block column algo for a given column index
1931 11587 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
1932 11587 : if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
1933 11587 : return table->col_algo[index];
1934 11587 : }
1935 :
1936 : // Get block delimiter for a given column index
1937 137 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
1938 137 : if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
1939 137 : return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
1940 137 : }
1941 :
1942 : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
1943 1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
1944 506 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
1945 93 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
1946 93 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
1947 :
1948 3 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
1949 3 : if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
1950 3 : if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
1951 3 : table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
1952 3 : }
1953 :
1954 : // Find column index by name
1955 127 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
1956 127 : if (!table || !col_name) return -1;
1957 131 : for (int i = 0; i < table->ncols; i++) {
1958 131 : if (strcasecmp(table->col_name[i], col_name) == 0) return i;
1959 4 : }
1960 0 : return -1;
1961 127 : }
1962 :
// Entry point for applying one incoming (remote) change to this database.
// Implements the Causal-Length Set merge: based on the incoming causal length
// (insert_cl) versus the local one, the change is ignored, applied as a
// delete, applied as a sentinel-only insert, or resolved per-column via
// merge_did_cid_win. A winning column value is routed to the blocks table
// (block-level LWW) or to the base table (batched or immediate).
int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    // Handle DWS and AWS algorithms here
    // Delete-Wins Set (DWS): table_algo_crdt_dws
    // Add-Wins Set (AWS): table_algo_crdt_aws

    // Causal-Length Set (CLS) Algorithm (default)

    // compute the local causal length for the row based on the primary key
    // the causal length is used to determine the order of operations and resolve conflicts.
    int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);

    // if the incoming causal length is older than the local causal length, we can safely ignore it
    // because the local changes are more recent
    if (insert_cl < local_cl) return DBRES_OK;

    // check if the operation is a delete by examining the causal length
    // even causal lengths typically signify delete operations
    bool is_delete = (insert_cl % 2 == 0);
    if (is_delete) {
        // if it's a delete, check if the local state is at the same causal length
        // if it is, no further action is needed
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a delete merge if the causal length is newer than the local one
        int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
                              insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
        return rc;
    }

    // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    if (is_sentinel_only) {
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a sentinel-only insert to track the existence of the row
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
        return rc;
    }

    // from this point I can be sure that insert_name is not sentinel

    // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    // odd causal lengths can "resurrect" rows
    bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    bool row_exists_locally = local_cl != 0;

    // if a resurrection is needed, insert a sentinel to mark the row as alive
    // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    }

    // at this point, we determine whether the incoming change wins based on causal length
    // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    bool flag = false;
    int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);

    // check if the incoming change wins and should be applied
    bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    if (!does_cid_win) return DBRES_OK;

    // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    // bypass the normal base-table write and instead store the value in the blocks table.
    // The base table column will be materialized from all alive blocks.
    if (block_is_block_colname(insert_name) && table->has_block_cols) {
        // Store or delete block value in blocks table depending on tombstone status
        // (even col_version marks a block tombstone, mirroring the row-level convention)
        if (insert_col_version % 2 == 0) {
            // Tombstone: remove from blocks table
            rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
        } else {
            rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
        }
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);

        // Set winner clock in metadata
        rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
                                    insert_col_version, insert_db_version,
                                    insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);

        // Materialize the full column from blocks into the base table
        char *base_col = block_extract_base_colname(insert_name);
        if (base_col) {
            rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
            cloudsync_memory_free(base_col);
            if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
        }

        return DBRES_OK;
    }

    // perform the final column insert or update if the incoming change wins
    if (data->pending_batch) {
        // Propagate row_exists_locally to the batch on the first winning column.
        // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
        // which matters when RLS policies reference columns not in the payload.
        if (data->pending_batch->table == NULL) {
            data->pending_batch->row_exists = row_exists_locally;
        }
        rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    } else {
        rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    }

    return rc;
}
2078 :
2079 : // MARK: - Block column setup -
2080 :
2081 : // Migrate existing tracked rows to block format when block-level LWW is first enabled on a column.
2082 : // Scans the metadata table for alive rows with the plain col_name entry (not yet block entries),
2083 : // reads each row's current value from the base table, splits it into blocks, and inserts
2084 : // the block entries into both the blocks table and the metadata table.
2085 : // Uses INSERT OR IGNORE semantics so the operation is safe to call multiple times.
2086 73 : static int block_migrate_existing_rows (cloudsync_context *data, cloudsync_table_context *table, int col_idx) {
2087 73 : const char *col_name = table->col_name[col_idx];
2088 73 : if (!col_name || !table->meta_ref || !table->blocks_ref) return DBRES_OK;
2089 :
2090 73 : const char *delim = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
2091 73 : int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
2092 :
2093 : // Phase 1: collect all existing PKs that have an alive regular col_name entry
2094 : // AND do not yet have any entries in the blocks table for this column.
2095 : // The NOT IN filter makes this idempotent: rows that were already migrated
2096 : // (or had their blocks created via INSERT) are skipped on subsequent calls.
2097 : // We collect PKs before writing so that writes to the metadata table (Phase 2)
2098 : // do not perturb the read cursor on the same table.
2099 73 : char *like_pattern = block_build_colname(col_name, "%");
2100 73 : if (!like_pattern) return DBRES_NOMEM;
2101 :
2102 73 : char *scan_sql = cloudsync_memory_mprintf(SQL_META_SCAN_COL_FOR_MIGRATION, table->meta_ref, table->blocks_ref);
2103 73 : if (!scan_sql) { cloudsync_memory_free(like_pattern); return DBRES_NOMEM; }
2104 73 : dbvm_t *scan_vm = NULL;
2105 73 : int rc = databasevm_prepare(data, scan_sql, &scan_vm, 0);
2106 73 : cloudsync_memory_free(scan_sql);
2107 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); return rc; }
2108 :
2109 73 : rc = databasevm_bind_text(scan_vm, 1, col_name, -1);
2110 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2111 : // Bind like_pattern as ?2 and keep it alive until after all scan steps complete,
2112 : // because databasevm_bind_text uses SQLITE_STATIC (no copy).
2113 73 : rc = databasevm_bind_text(scan_vm, 2, like_pattern, -1);
2114 73 : if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2115 :
2116 : // Collect pk blobs into a dynamically-grown array of owned copies
2117 73 : void **pks = NULL;
2118 73 : size_t *pklens = NULL;
2119 73 : int pk_count = 0;
2120 73 : int pk_cap = 0;
2121 :
2122 75 : while ((rc = databasevm_step(scan_vm)) == DBRES_ROW) {
2123 2 : size_t pklen = 0;
2124 2 : const void *pk = database_column_blob(scan_vm, 0, &pklen);
2125 2 : if (!pk || pklen == 0) continue;
2126 :
2127 2 : if (pk_count >= pk_cap) {
2128 1 : int new_cap = pk_cap ? pk_cap * 2 : 8;
2129 1 : void **new_pks = (void **)cloudsync_memory_realloc(pks, (uint64_t)(new_cap * sizeof(void *)));
2130 1 : size_t *new_pklens = (size_t *)cloudsync_memory_realloc(pklens, (uint64_t)(new_cap * sizeof(size_t)));
2131 1 : if (!new_pks || !new_pklens) {
2132 0 : cloudsync_memory_free(new_pks ? new_pks : pks);
2133 0 : cloudsync_memory_free(new_pklens ? new_pklens : pklens);
2134 0 : databasevm_finalize(scan_vm);
2135 0 : return DBRES_NOMEM;
2136 : }
2137 1 : pks = new_pks;
2138 1 : pklens = new_pklens;
2139 1 : pk_cap = new_cap;
2140 1 : }
2141 :
2142 2 : pks[pk_count] = cloudsync_memory_alloc((uint64_t)pklen);
2143 2 : if (!pks[pk_count]) { rc = DBRES_NOMEM; break; }
2144 2 : memcpy(pks[pk_count], pk, pklen);
2145 2 : pklens[pk_count] = pklen;
2146 2 : pk_count++;
2147 : }
2148 :
2149 73 : databasevm_finalize(scan_vm);
2150 73 : cloudsync_memory_free(like_pattern); // safe to free after scan_vm is finalized
2151 73 : if (rc != DBRES_DONE && rc != DBRES_OK) {
2152 0 : for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2153 0 : cloudsync_memory_free(pks);
2154 0 : cloudsync_memory_free(pklens);
2155 0 : return rc;
2156 : }
2157 :
2158 73 : if (pk_count == 0) {
2159 72 : cloudsync_memory_free(pks);
2160 72 : cloudsync_memory_free(pklens);
2161 72 : return DBRES_OK;
2162 : }
2163 :
2164 : // Phase 2: for each collected PK, read the column value, split into blocks,
2165 : // and insert into the blocks table + metadata using INSERT OR IGNORE.
2166 :
2167 1 : char *meta_sql = cloudsync_memory_mprintf(SQL_META_INSERT_BLOCK_IGNORE, table->meta_ref);
2168 1 : if (!meta_sql) { rc = DBRES_NOMEM; goto cleanup_pks; }
2169 1 : dbvm_t *meta_vm = NULL;
2170 1 : rc = databasevm_prepare(data, meta_sql, &meta_vm, 0);
2171 1 : cloudsync_memory_free(meta_sql);
2172 1 : if (rc != DBRES_OK) goto cleanup_pks;
2173 :
2174 1 : char *blocks_sql = cloudsync_memory_mprintf(SQL_BLOCKS_INSERT_IGNORE, table->blocks_ref);
2175 1 : if (!blocks_sql) { databasevm_finalize(meta_vm); rc = DBRES_NOMEM; goto cleanup_pks; }
2176 1 : dbvm_t *blocks_vm = NULL;
2177 1 : rc = databasevm_prepare(data, blocks_sql, &blocks_vm, 0);
2178 1 : cloudsync_memory_free(blocks_sql);
2179 1 : if (rc != DBRES_OK) { databasevm_finalize(meta_vm); goto cleanup_pks; }
2180 :
2181 1 : dbvm_t *val_vm = (dbvm_t *)table_column_lookup(table, col_name, false, NULL);
2182 :
2183 3 : for (int p = 0; p < pk_count; p++) {
2184 2 : const void *pk = pks[p];
2185 2 : size_t pklen = pklens[p];
2186 :
2187 2 : if (!val_vm) continue;
2188 :
2189 : // Read current column value from the base table
2190 2 : int bind_rc = pk_decode_prikey((char *)pk, pklen, pk_decode_bind_callback, (void *)val_vm);
2191 2 : if (bind_rc < 0) { databasevm_reset(val_vm); continue; }
2192 :
2193 2 : int step_rc = databasevm_step(val_vm);
2194 2 : const char *text = (step_rc == DBRES_ROW) ? database_column_text(val_vm, 0) : NULL;
2195 : // Make a copy of text before resetting val_vm, as the pointer is only valid until reset
2196 2 : char *text_copy = text ? cloudsync_string_dup(text) : NULL;
2197 2 : databasevm_reset(val_vm);
2198 :
2199 2 : if (!text_copy) continue; // NULL column value: nothing to migrate
2200 :
2201 : // Split text into blocks and store each one
2202 2 : block_list_t *blocks = block_split(text_copy, delim);
2203 2 : cloudsync_memory_free(text_copy);
2204 2 : if (!blocks) continue;
2205 :
2206 2 : char **positions = block_initial_positions(blocks->count);
2207 2 : if (positions) {
2208 7 : for (int b = 0; b < blocks->count; b++) {
2209 5 : char *block_cn = block_build_colname(col_name, positions[b]);
2210 5 : if (block_cn) {
2211 : // Metadata entry (skip if this block position already exists)
2212 5 : databasevm_bind_blob(meta_vm, 1, pk, (int)pklen);
2213 5 : databasevm_bind_text(meta_vm, 2, block_cn, -1);
2214 5 : databasevm_bind_int(meta_vm, 3, 1); // col_version = 1 (alive)
2215 5 : databasevm_bind_int(meta_vm, 4, db_version);
2216 5 : databasevm_bind_int(meta_vm, 5, cloudsync_bumpseq(data));
2217 5 : databasevm_step(meta_vm);
2218 5 : databasevm_reset(meta_vm);
2219 :
2220 : // Block value (skip if this block position already exists)
2221 5 : databasevm_bind_blob(blocks_vm, 1, pk, (int)pklen);
2222 5 : databasevm_bind_text(blocks_vm, 2, block_cn, -1);
2223 5 : databasevm_bind_text(blocks_vm, 3, blocks->entries[b].content, -1);
2224 5 : databasevm_step(blocks_vm);
2225 5 : databasevm_reset(blocks_vm);
2226 :
2227 5 : cloudsync_memory_free(block_cn);
2228 5 : }
2229 5 : cloudsync_memory_free(positions[b]);
2230 5 : }
2231 2 : cloudsync_memory_free(positions);
2232 2 : }
2233 2 : block_list_free(blocks);
2234 2 : }
2235 :
2236 1 : databasevm_finalize(meta_vm);
2237 1 : databasevm_finalize(blocks_vm);
2238 1 : rc = DBRES_OK;
2239 :
2240 : cleanup_pks:
2241 3 : for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2242 1 : cloudsync_memory_free(pks);
2243 1 : cloudsync_memory_free(pklens);
2244 1 : return rc;
2245 73 : }
2246 :
// Configure a column to use the "block" CRDT algorithm: the column value is
// split into delimiter-separated blocks that are tracked (and merged)
// individually instead of last-writer-wins on the whole value.
//
// Parameters:
//   data       - cloudsync context (must be initialized)
//   table_name - name of an already-tracked table
//   col_name   - column to switch to block mode
//   delimiter  - block separator, or NULL for the default
//   persist    - when true, the setting is written to the settings table and
//                existing rows are migrated; false is used by the settings
//                loader to avoid re-feeding rows to an open cursor (see below)
//
// Returns DBRES_OK on success, or a DBRES_* error code (error message is
// recorded in the context where applicable).
int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter, bool persist) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);

    // Find column index
    int col_idx = table_col_index(table, col_name);
    if (col_idx < 0) {
        char buf[1024];
        snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
        return cloudsync_set_error(data, buf, DBRES_ERROR);
    }

    // Set column algo
    table->col_algo[col_idx] = col_algo_block;
    table->has_block_cols = true;

    // Set delimiter (can be NULL for default); replace any previous value
    if (table->col_delimiter[col_idx]) {
        cloudsync_memory_free(table->col_delimiter[col_idx]);
        table->col_delimiter[col_idx] = NULL;
    }
    if (delimiter) {
        table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
    }

    // Create blocks table if not already done (first block column on this table).
    // Note: blocks_ref and the prepared statements are shared by all block
    // columns of the table, so this runs at most once per table.
    if (!table->blocks_ref) {
        table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
        if (!table->blocks_ref) return DBRES_NOMEM;

        // CREATE TABLE IF NOT EXISTS
        char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;

        int rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);

        // Prepare block statements (persistent: reused for the connection lifetime)
        // Write: upsert into blocks (pk, col_name, col_value)
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // List alive blocks for materialization
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;
    }

    // Persist settings (skipped when called from the settings loader, since
    // writing to cloudsync_table_settings while sqlite3_exec is iterating it
    // re-feeds the rewritten row to the cursor and causes an infinite loop).
    if (persist) {
        int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
        if (rc != DBRES_OK) return rc;

        if (delimiter) {
            rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
            if (rc != DBRES_OK) return rc;
        }

        // Migrate any existing tracked rows: populate the blocks table and metadata with
        // block entries derived from the current column value, so that subsequent UPDATE
        // operations can diff against the real existing state instead of treating everything
        // as new, and so this node participates correctly in LWW conflict resolution.
        rc = block_migrate_existing_rows(data, table, col_idx);
        if (rc != DBRES_OK) return rc;
    }

    return DBRES_OK;
}
2337 :
2338 : // MARK: - Private -
2339 :
2340 234 : bool cloudsync_config_exists (cloudsync_context *data) {
2341 234 : return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
2342 : }
2343 :
2344 713 : bool cloudsync_context_is_initialized (cloudsync_context *data) {
2345 : // A fully initialized context has its persistent "is the DB stale" probe
2346 : // prepared. cloudsync_context_init prepares data_version_stmt (via
2347 : // cloudsync_add_dbvms) only after the cloudsync_site_id table exists, so
2348 : // a non-NULL pointer means cloudsync_init has been called at least once
2349 : // on this connection. Used to produce actionable error messages when
2350 : // callers hit a function before calling cloudsync_init.
2351 713 : return data != NULL && data->data_version_stmt != NULL;
2352 : }
2353 :
// Allocate and zero-initialize a new cloudsync context bound to the given
// database handle. Returns NULL on allocation failure. The returned context
// must be released with cloudsync_context_free.
cloudsync_context *cloudsync_context_create (void *db) {
    cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
    if (!data) return NULL;
    DEBUG_SETTINGS("cloudsync_context_create %p", data);

    data->libversion = CLOUDSYNC_VERSION;
    data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
#if CLOUDSYNC_DEBUG
    data->debug = 1;
#endif

    // allocate space for 64 tables (it can grow if needed)
    uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
    data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
    // on failure free the partially built context so nothing leaks
    if (!data->tables) {cloudsync_memory_free(data); return NULL;}

    data->tables_cap = CLOUDSYNC_INIT_NTABLES;
    data->tables_count = 0;
    data->db = db;

    // SQLite exposes col_value as ANY, but other databases require a concrete type.
    // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
    // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
    // It is decoded to the target column type just before applying changes to the base table.
    data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;

    return data;
}
2382 :
2383 249 : void cloudsync_context_free (void *ctx) {
2384 249 : cloudsync_context *data = (cloudsync_context *)ctx;
2385 : DEBUG_SETTINGS("cloudsync_context_free %p", data);
2386 249 : if (!data) return;
2387 :
2388 : // free all table contexts and prepared statements
2389 249 : cloudsync_terminate(data);
2390 :
2391 249 : cloudsync_memory_free(data->tables);
2392 249 : cloudsync_memory_free(data);
2393 249 : }
2394 :
// Lazily initialize the persistent side of the context (settings tables,
// persistent statements, site id, schema hash). Returns the site id on
// success, NULL on failure or when data is NULL. Safe to call repeatedly.
const char *cloudsync_context_init (cloudsync_context *data) {
    if (!data) return NULL;

    // perform init just the first time, if the site_id field is not set.
    // The data->site_id value could exist while the settings tables don't if
    // cloudsync_context_init was previously called in an init transaction that
    // was rolled back because of an error during the init process — hence the
    // second (table existence) check.
    if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
        if (dbutils_settings_init(data) != DBRES_OK) return NULL;
        if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
        if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
        data->schema_hash = database_schema_hash(data);
    }

    return (const char *)data->site_id;
}
2411 :
2412 1370 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
2413 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
2414 :
2415 : // sync data
2416 1370 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
2417 229 : data->schema_version = (int)strtol(value, NULL, 0);
2418 229 : return;
2419 : }
2420 :
2421 1141 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
2422 0 : data->debug = 0;
2423 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
2424 0 : return;
2425 : }
2426 :
2427 1141 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
2428 0 : cloudsync_set_schema(data, value);
2429 0 : return;
2430 : }
2431 1370 : }
2432 :
2433 : #if 0
2434 : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
2435 : DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
2436 : // Unused in this version
2437 : return;
2438 : }
2439 : #endif
2440 :
2441 8316 : int cloudsync_commit_hook (void *ctx) {
2442 8316 : cloudsync_context *data = (cloudsync_context *)ctx;
2443 :
2444 8316 : data->db_version = data->pending_db_version;
2445 8316 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2446 8316 : data->seq = 0;
2447 :
2448 8316 : return DBRES_OK;
2449 : }
2450 :
2451 3 : void cloudsync_rollback_hook (void *ctx) {
2452 3 : cloudsync_context *data = (cloudsync_context *)ctx;
2453 :
2454 3 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2455 3 : data->seq = 0;
2456 3 : }
2457 :
2458 24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
2459 : // init cloudsync_settings
2460 24 : if (cloudsync_context_init(data) == NULL) {
2461 0 : return DBRES_MISUSE;
2462 : }
2463 :
2464 : // lookup table
2465 24 : cloudsync_table_context *table = table_lookup(data, table_name);
2466 24 : if (!table) {
2467 : char buffer[1024];
2468 1 : snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
2469 1 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
2470 : }
2471 :
2472 : // idempotent: if already altering, return OK
2473 23 : if (table->is_altering) return DBRES_OK;
2474 :
2475 : // retrieve primary key(s)
2476 23 : char **names = NULL;
2477 23 : int nrows = 0;
2478 23 : int rc = database_pk_names(data, table_name, &names, &nrows);
2479 23 : if (rc != DBRES_OK) {
2480 : char buffer[1024];
2481 0 : snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
2482 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2483 0 : goto rollback_begin_alter;
2484 : }
2485 :
2486 : // sanity check the number of primary keys
2487 23 : if (nrows != table_count_pks(table)) {
2488 : char buffer[1024];
2489 0 : snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
2490 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2491 0 : goto rollback_begin_alter;
2492 : }
2493 :
2494 : // drop original triggers
2495 23 : rc = database_delete_triggers(data, table_name);
2496 23 : if (rc != DBRES_OK) {
2497 : char buffer[1024];
2498 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
2499 0 : cloudsync_set_error(data, buffer, DBRES_ERROR);
2500 0 : goto rollback_begin_alter;
2501 : }
2502 :
2503 23 : table_set_pknames(table, names);
2504 23 : table->is_altering = true;
2505 23 : return DBRES_OK;
2506 :
2507 : rollback_begin_alter:
2508 0 : if (names) table_pknames_free(names, nrows);
2509 0 : return rc;
2510 24 : }
2511 :
// Reconcile the metadata (clock) table with the base table after an ALTER.
// Called by cloudsync_commit_alter. Returns DBRES_OK or a DBRES_* error.
int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    // check if dbversion needed to be updated
    cloudsync_dbversion_check_uptodate(data);

    // if primary-key columns change, all row identities change.
    // In that case, the clock table must be dropped, recreated,
    // and backfilled. We detect this by comparing the unique index
    // in the lookaside table with the source table's PKs.

    // retrieve primary keys (to check if they changed)
    char **result = NULL;
    int nrows = 0;
    int rc = database_pk_names (data, table->name, &result, &nrows);
    if (rc != DBRES_OK || nrows == 0) {
        // a table with no primary keys at this point is a misuse
        if (nrows == 0) rc = DBRES_MISUSE;
        goto finalize;
    }

    // check if there are differences (count first, then name-by-name)
    bool pk_diff = (nrows != table->npks);
    if (!pk_diff) {
        for (int i = 0; i < nrows; ++i) {
            if (strcmp(table->pk_name[i], result[i]) != 0) {
                pk_diff = true;
                break;
            }
        }
    }

    if (pk_diff) {
        // drop meta-table, it will be recreated
        char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }
    } else {
        // compact meta-table
        // delete entries for removed columns
        const char *schema = table->schema ? table->schema : "";
        char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

        sql = sql_build_pk_qualified_collist_query(schema, table->name);
        if (!sql) {rc = DBRES_NOMEM; goto finalize;}

        // pkclause is the comma-separated qualified PK column list; when the
        // query yields nothing, fall back to rowid
        char *pkclause = NULL;
        rc = database_select_text(data, sql, &pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto finalize;
        char *pkvalues = (pkclause) ? pkclause : "rowid";

        // delete entries related to rows that no longer exist in the original table, but preserve tombstone
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
        rc = database_exec(data, sql);
        if (pkclause) cloudsync_memory_free(pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

    }

    // update key to be later used in cloudsync_dbversion_rebuild
    char buf[256];
    snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);

finalize:
    table_pknames_free(result, nrows);
    return rc;
}
2592 :
// Complete an ALTER started with cloudsync_begin_alter: reconcile metadata,
// drop the stale in-memory table context and re-initialize tracking for the
// (possibly reshaped) table. Idempotent when the table is not being altered.
// Returns DBRES_OK or a DBRES_* error code.
int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    int rc = DBRES_MISUSE;
    cloudsync_table_context *table = NULL;

    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    // lookup table
    table = table_lookup(data, table_name);
    if (!table) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
        cloudsync_set_error(data, buffer, DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    // idempotent: if not altering, return OK
    if (!table->is_altering) return DBRES_OK;

    rc = cloudsync_finalize_alter(data, table);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    // is_altering is reset implicitly because table_free + cloudsync_init_table
    // will reallocate the table context with zero-initialized memory
    table_remove(data, table);
    table_free(table);
    table = NULL;

    // init again cloudsync for the table; prefer a table-specific algo
    // setting, falling back to the wildcard ("*") default
    table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    return DBRES_OK;

rollback_finalize_alter:
    // on failure, undo the begin_alter state so the table is usable again
    if (table) {
        table_set_pknames(table, NULL);
        table->is_altering = false;
    }
    return rc;
}
2640 :
2641 : // MARK: - Filter Rewrite -
2642 :
2643 : // Replace bare column names in a filter expression with prefix-qualified names.
2644 : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
2645 : // Columns must be sorted by length descending by the caller to avoid partial matches.
2646 : // Skips content inside single-quoted string literals.
2647 : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
2648 : // Helper: check if an identifier token matches a column name.
// Helper: true when the (non NUL-terminated) token of length token_len
// exactly matches one of the ncols column names.
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    int idx = 0;
    while (idx < ncols) {
        const char *candidate = columns[idx++];
        if (strlen(candidate) != token_len) continue;
        // lengths match and candidate has no embedded NUL, so memcmp is
        // equivalent to the bounded string compare
        if (memcmp(token, candidate, token_len) == 0) return true;
    }
    return false;
}
2656 :
2657 : // Helper: check if character is part of a SQL identifier.
// Helper: true for characters allowed in an unquoted SQL identifier
// (ASCII letters, decimal digits, underscore). Intentionally not using
// <ctype.h> so the result is locale-independent.
static bool filter_is_ident_char (char c) {
    if (c == '_') return true;
    if (c >= '0' && c <= '9') return true;
    if (c >= 'a' && c <= 'z') return true;
    return (c >= 'A' && c <= 'Z');
}
2662 :
// Replace bare column references in a filter expression with
// PREFIX."column" qualified references (e.g. NEW."user_id"), skipping
// single-quoted string literals. Returns a newly allocated string the
// caller must free with cloudsync_memory_free, or NULL on bad args / OOM.
char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    if (!filter || !prefix || !columns || ncols <= 0) return NULL;

    size_t filter_len = strlen(filter);
    size_t prefix_len = strlen(prefix);

    // Each identifier match grows by at most (prefix_len + 3) bytes.
    // Worst case: the entire filter is one repeated column reference separated by
    // single characters, so up to (filter_len / 2) matches. Use a safe upper bound.
    size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    size_t cap = filter_len + max_growth + 64;
    char *result = (char *)cloudsync_memory_alloc(cap);
    if (!result) return NULL;
    size_t out = 0;

    // Single pass: tokenize into identifiers, quoted strings, and everything else.
    size_t i = 0;
    while (i < filter_len) {
        // Skip single-quoted string literals verbatim (handle '' escape)
        if (filter[i] == '\'') {
            result[out++] = filter[i++];
            while (i < filter_len) {
                if (filter[i] == '\'') {
                    result[out++] = filter[i++];
                    // '' is an escaped quote — keep going
                    if (i < filter_len && filter[i] == '\'') {
                        result[out++] = filter[i++];
                        continue;
                    }
                    break; // single ' ends the literal
                }
                result[out++] = filter[i++];
            }
            continue;
        }

        // Extract identifier token (maximal run of identifier characters)
        if (filter_is_ident_char(filter[i])) {
            size_t start = i;
            while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
            size_t token_len = i - start;

            if (filter_is_column(&filter[start], token_len, columns, ncols)) {
                // Emit PREFIX."column_name"
                memcpy(&result[out], prefix, prefix_len); out += prefix_len;
                result[out++] = '.';
                result[out++] = '"';
                memcpy(&result[out], &filter[start], token_len); out += token_len;
                result[out++] = '"';
            } else {
                // Not a column — copy as-is (SQL keyword, function name, number, ...)
                memcpy(&result[out], &filter[start], token_len); out += token_len;
            }
            continue;
        }

        // Any other character — copy as-is
        result[out++] = filter[i++];
    }

    result[out] = '\0';
    return result;
}
2726 :
2727 24 : int cloudsync_reset_metatable (cloudsync_context *data, const char *table_name) {
2728 24 : cloudsync_table_context *table = table_lookup(data, table_name);
2729 24 : if (!table) return DBRES_ERROR;
2730 :
2731 24 : char *sql = cloudsync_memory_mprintf(SQL_DELETE_ALL_FROM_CLOUDSYNC_TABLE, table->meta_ref);
2732 24 : int rc = database_exec(data, sql);
2733 24 : cloudsync_memory_free(sql);
2734 24 : if (rc != DBRES_OK) return rc;
2735 :
2736 24 : return cloudsync_refill_metatable(data, table_name);
2737 24 : }
2738 :
// Insert missing metadata (clock) entries for rows/columns present in the
// base table but absent from the meta table, honoring an optional row-level
// filter stored in the table settings. Returns DBRES_OK or a DBRES_* error.
int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return DBRES_ERROR;

    dbvm_t *vm = NULL;
    int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);

    // Read row-level filter from settings (if any)
    char filter_buf[2048];
    int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
    const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;

    // PK column list for the table; fall back to rowid when none is returned
    const char *schema = table->schema ? table->schema : "";
    char *sql = sql_build_pk_collist_query(schema, table_name);
    char *pkclause_identifiers = NULL;
    int rc = database_select_text(data, sql, &pkclause_identifiers);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;
    char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";

    // Use database-specific query builder to handle type differences in composite PKs
    sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
    if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    rc = database_exec(data, sql);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;

    // fill missing columns
    // for each non-pk column:
    // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
    // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.

    if (filter) {
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
    } else {
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
    }
    rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;

    // one pass per column: select the PKs missing a meta entry for that
    // column and mark each one alive at (db_version, bumped seq)
    for (int i=0; i<table->ncols; ++i) {
        char *col_name = table->col_name[i];

        rc = databasevm_bind_text(vm, 1, col_name, -1);
        if (rc != DBRES_OK) goto finalize;

        while (1) {
            rc = databasevm_step(vm);
            if (rc == DBRES_ROW) {
                size_t pklen = 0;
                const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
                if (!pk) { rc = DBRES_ERROR; break; }
                rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
            } else if (rc == DBRES_DONE) {
                rc = DBRES_OK;
                break;
            } else {
                break;
            }
        }
        if (rc != DBRES_OK) goto finalize;

        databasevm_reset(vm);
    }

finalize:
    if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
    if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
    if (vm) databasevm_finalize(vm);
    return rc;
}
2811 :
2812 : // MARK: - Local -
2813 :
2814 4 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2815 4 : dbvm_t *vm = table->meta_sentinel_update_stmt;
2816 4 : if (!vm) return -1;
2817 :
2818 4 : int rc = databasevm_bind_int(vm, 1, db_version);
2819 4 : if (rc != DBRES_OK) goto cleanup;
2820 :
2821 4 : rc = databasevm_bind_int(vm, 2, seq);
2822 4 : if (rc != DBRES_OK) goto cleanup;
2823 :
2824 4 : rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
2825 4 : if (rc != DBRES_OK) goto cleanup;
2826 :
2827 4 : rc = databasevm_step(vm);
2828 4 : if (rc == DBRES_DONE) rc = DBRES_OK;
2829 :
2830 : cleanup:
2831 4 : DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
2832 4 : databasevm_reset(vm);
2833 4 : return rc;
2834 4 : }
2835 :
2836 125 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2837 125 : dbvm_t *vm = table->meta_sentinel_insert_stmt;
2838 125 : if (!vm) return -1;
2839 :
2840 125 : int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
2841 125 : if (rc != DBRES_OK) goto cleanup;
2842 :
2843 125 : rc = databasevm_bind_int(vm, 2, db_version);
2844 125 : if (rc != DBRES_OK) goto cleanup;
2845 :
2846 125 : rc = databasevm_bind_int(vm, 3, seq);
2847 125 : if (rc != DBRES_OK) goto cleanup;
2848 :
2849 125 : rc = databasevm_bind_int(vm, 4, db_version);
2850 125 : if (rc != DBRES_OK) goto cleanup;
2851 :
2852 125 : rc = databasevm_bind_int(vm, 5, seq);
2853 125 : if (rc != DBRES_OK) goto cleanup;
2854 :
2855 125 : rc = databasevm_step(vm);
2856 125 : if (rc == DBRES_DONE) rc = DBRES_OK;
2857 :
2858 : cleanup:
2859 125 : DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
2860 125 : databasevm_reset(vm);
2861 125 : return rc;
2862 125 : }
2863 :
2864 11951 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
2865 :
2866 11951 : dbvm_t *vm = table->meta_row_insert_update_stmt;
2867 11951 : if (!vm) return -1;
2868 :
2869 11951 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2870 11951 : if (rc != DBRES_OK) goto cleanup;
2871 :
2872 11951 : rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
2873 11951 : if (rc != DBRES_OK) goto cleanup;
2874 :
2875 11951 : rc = databasevm_bind_int(vm, 3, col_version);
2876 11951 : if (rc != DBRES_OK) goto cleanup;
2877 :
2878 11951 : rc = databasevm_bind_int(vm, 4, db_version);
2879 11951 : if (rc != DBRES_OK) goto cleanup;
2880 :
2881 11951 : rc = databasevm_bind_int(vm, 5, seq);
2882 11951 : if (rc != DBRES_OK) goto cleanup;
2883 :
2884 11951 : rc = databasevm_bind_int(vm, 6, db_version);
2885 11951 : if (rc != DBRES_OK) goto cleanup;
2886 :
2887 11951 : rc = databasevm_bind_int(vm, 7, seq);
2888 11951 : if (rc != DBRES_OK) goto cleanup;
2889 :
2890 11951 : rc = databasevm_step(vm);
2891 11951 : if (rc == DBRES_DONE) rc = DBRES_OK;
2892 :
2893 : cleanup:
2894 11951 : DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
2895 11951 : databasevm_reset(vm);
2896 11951 : return rc;
2897 11951 : }
2898 :
// Mark a column (or the row tombstone when col_name is NULL) as alive:
// col_version = 1 (see "col_version = 1 (alive)" convention used elsewhere).
int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
}
2902 :
// Mark a single block column as deleted in the metadata table.
// Returns the DBRES_* result of the underlying upsert.
int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
    // Mark a block as deleted by setting col_version = 2 (even = deleted)
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
}
2907 :
// Thin wrapper around block_delete_value that accepts a size_t length
// (narrowed to int for the underlying API).
int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
    return block_delete_value(data, table, pk, (int)pklen, block_colname);
}
2911 :
// Mark the whole row as deleted: writes the tombstone entry (NULL col_name)
// with col_version = 2 (even = deleted).
int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
}
2915 :
2916 36 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
2917 36 : dbvm_t *vm = table->meta_row_drop_stmt;
2918 36 : if (!vm) return -1;
2919 :
2920 36 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2921 36 : if (rc != DBRES_OK) goto cleanup;
2922 :
2923 36 : rc = databasevm_step(vm);
2924 36 : if (rc == DBRES_DONE) rc = DBRES_OK;
2925 :
2926 : cleanup:
2927 36 : DEBUG_DBERROR(rc, "local_drop_meta", table->context);
2928 36 : databasevm_reset(vm);
2929 36 : return rc;
2930 36 : }
2931 :
2932 28 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
2933 : /*
2934 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
2935 : * to a new primary key (NEW.pk) when a primary key change occurs.
2936 : *
2937 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
2938 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
2939 : *
2940 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
2941 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
2942 : * may be applied incorrectly, leading to data inconsistency.
2943 : *
2944 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
2945 : * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
2946 : * that generates a unique sequence for each row. The update query should ensure that each row moved
2947 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
2948 : */
2949 :
2950 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2951 : // pk2 is the old pk
2952 :
2953 28 : dbvm_t *vm = table->meta_update_move_stmt;
2954 28 : if (!vm) return -1;
2955 :
2956 : // new primary key
2957 28 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2958 28 : if (rc != DBRES_OK) goto cleanup;
2959 :
2960 : // new db_version
2961 28 : rc = databasevm_bind_int(vm, 2, db_version);
2962 28 : if (rc != DBRES_OK) goto cleanup;
2963 :
2964 : // old primary key
2965 28 : rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
2966 28 : if (rc != DBRES_OK) goto cleanup;
2967 :
2968 28 : rc = databasevm_step(vm);
2969 28 : if (rc == DBRES_DONE) rc = DBRES_OK;
2970 :
2971 : cleanup:
2972 28 : DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
2973 28 : databasevm_reset(vm);
2974 28 : return rc;
2975 28 : }
2976 :
2977 : // MARK: - Payload Encode / Decode -
2978 :
2979 687 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
2980 687 : uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
2981 687 : header->checksum[0] = (uint8_t)(h >> 40);
2982 687 : header->checksum[1] = (uint8_t)(h >> 32);
2983 687 : header->checksum[2] = (uint8_t)(h >> 24);
2984 687 : header->checksum[3] = (uint8_t)(h >> 16);
2985 687 : header->checksum[4] = (uint8_t)(h >> 8);
2986 687 : header->checksum[5] = (uint8_t)(h >> 0);
2987 687 : }
2988 :
2989 682 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
2990 2046 : return ((uint64_t)header->checksum[0] << 40) |
2991 1364 : ((uint64_t)header->checksum[1] << 32) |
2992 1364 : ((uint64_t)header->checksum[2] << 24) |
2993 1364 : ((uint64_t)header->checksum[3] << 16) |
2994 1364 : ((uint64_t)header->checksum[4] << 8) |
2995 682 : ((uint64_t)header->checksum[5] << 0);
2996 : }
2997 :
2998 682 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
2999 682 : uint64_t checksum1 = cloudsync_payload_checksum_load(header);
3000 682 : uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
3001 682 : return (checksum1 == checksum2);
3002 : }
3003 :
3004 49184 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
3005 49184 : if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
3006 :
3007 : // alloc/resize buffer
3008 49184 : if (payload->bused + needed > payload->balloc) {
3009 696 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
3010 696 : size_t balloc = payload->balloc + needed;
3011 :
3012 696 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
3013 696 : if (!buffer) {
3014 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
3015 0 : memset(payload, 0, sizeof(cloudsync_payload_context));
3016 0 : return false;
3017 : }
3018 :
3019 696 : payload->buffer = buffer;
3020 696 : payload->balloc = balloc;
3021 696 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
3022 696 : }
3023 :
3024 49184 : return true;
3025 49184 : }
3026 :
3027 50567 : size_t cloudsync_payload_context_size (size_t *header_size) {
3028 50567 : if (header_size) *header_size = sizeof(cloudsync_payload_header);
3029 50567 : return sizeof(cloudsync_payload_context);
3030 : }
3031 :
3032 687 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
3033 687 : memset(header, 0, sizeof(cloudsync_payload_header));
3034 : assert(sizeof(cloudsync_payload_header)==32);
3035 :
3036 : int major, minor, patch;
3037 687 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
3038 :
3039 687 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
3040 687 : header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
3041 687 : header->libversion[0] = (uint8_t)major;
3042 687 : header->libversion[1] = (uint8_t)minor;
3043 687 : header->libversion[2] = (uint8_t)patch;
3044 687 : header->expanded_size = htonl(expanded_size);
3045 687 : header->ncols = htons(ncols);
3046 687 : header->nrows = htonl(nrows);
3047 687 : header->schema_hash = htonll(hash);
3048 687 : }
3049 :
3050 49184 : int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
3051 : DEBUG_FUNCTION("cloudsync_payload_encode_step");
3052 : // debug_values(argc, argv);
3053 :
3054 : // check if the step function is called for the first time
3055 49184 : if (payload->nrows == 0) payload->ncols = (uint16_t)argc;
3056 :
3057 49184 : size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
3058 49184 : if (cloudsync_payload_encode_check(payload, breq) == false) {
3059 0 : return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
3060 : }
3061 :
3062 49184 : char *buffer = payload->buffer + payload->bused;
3063 49184 : size_t bsize = payload->balloc - payload->bused;
3064 49184 : char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
3065 49184 : if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);
3066 :
3067 : // update buffer
3068 49184 : payload->bused += breq;
3069 :
3070 : // increment row counter
3071 49184 : ++payload->nrows;
3072 :
3073 49184 : return DBRES_OK;
3074 49184 : }
3075 :
// Finalize the encoded payload: LZ4-compress the accumulated rows (when
// beneficial), prepend the payload header, and store the 48-bit checksum.
// On success payload->buffer holds the complete BLOB and payload->bsize its
// length. Returns DBRES_OK or DBRES_ERROR (error message saved in `data`).
int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    DEBUG_FUNCTION("cloudsync_payload_encode_final");

    // no rows were encoded: release any buffer and report an empty payload
    if (payload->nrows == 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        return DBRES_OK;
    }

    // the header stores nrows as a uint32_t, so larger counts cannot be represented
    if (payload->nrows > UINT32_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
        return DBRES_ERROR;
    }

    // sanity check about buffer size
    int header_size = (int)sizeof(cloudsync_payload_header);
    int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    if (buffer_size < 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // LZ4_compress_default takes int sizes, so anything above INT_MAX cannot be compressed
    if (buffer_size > INT_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // try to allocate buffer used for compressed data
    int real_buffer_size = (int)buffer_size;
    int zbound = LZ4_compressBound(real_buffer_size);
    char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression

    // skip the reserved header from the buffer to compress
    char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    // fall back to the raw data when compression failed or did not shrink it
    bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    CHECK_FORCE_UNCOMPRESSED_BUFFER();

    // setup payload header (expanded_size == 0 signals an uncompressed payload)
    cloudsync_payload_header header = {0};
    uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);

    // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    if (use_uncompressed_buffer) {
        if (zbuffer) cloudsync_memory_free(zbuffer);
        zbuffer = payload->buffer;
        zused = real_buffer_size;
    }

    // compute checksum of the buffer (data only, header excluded)
    uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    cloudsync_payload_checksum_store(&header, checksum);

    // copy header and data to SQLite BLOB
    memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    int blob_size = zused + sizeof(cloudsync_payload_header);
    payload->bsize = blob_size;

    // cleanup memory: when the compressed buffer was used, swap it in and
    // release the original accumulation buffer
    if (zbuffer != payload->buffer) {
        cloudsync_memory_free (payload->buffer);
        payload->buffer = zbuffer;
    }

    return DBRES_OK;
}
3151 :
3152 695 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
3153 : DEBUG_FUNCTION("cloudsync_payload_blob");
3154 :
3155 695 : if (blob_size) *blob_size = (int64_t)payload->bsize;
3156 695 : if (nrows) *nrows = (int64_t)payload->nrows;
3157 695 : return payload->buffer;
3158 : }
3159 :
3160 441936 : static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
3161 441936 : cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
3162 441936 : int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
3163 :
3164 441936 : if (rc == DBRES_OK) {
3165 : // the dbversion index is smaller than seq index, so it is processed first
3166 : // when processing the dbversion column: save the value to the tmp_dbversion field
3167 : // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
3168 441936 : switch (index) {
3169 : case CLOUDSYNC_PK_INDEX_TBL:
3170 49104 : if (type == DBTYPE_TEXT) {
3171 49104 : decode_context->tbl = pval;
3172 49104 : decode_context->tbl_len = ival;
3173 49104 : }
3174 49104 : break;
3175 : case CLOUDSYNC_PK_INDEX_PK:
3176 49104 : if (type == DBTYPE_BLOB) {
3177 49104 : decode_context->pk = pval;
3178 49104 : decode_context->pk_len = ival;
3179 49104 : }
3180 49104 : break;
3181 : case CLOUDSYNC_PK_INDEX_COLNAME:
3182 49104 : if (type == DBTYPE_TEXT) {
3183 49104 : decode_context->col_name = pval;
3184 49104 : decode_context->col_name_len = ival;
3185 49104 : }
3186 49104 : break;
3187 : case CLOUDSYNC_PK_INDEX_COLVERSION:
3188 49104 : if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
3189 49104 : break;
3190 : case CLOUDSYNC_PK_INDEX_DBVERSION:
3191 49104 : if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
3192 49104 : break;
3193 : case CLOUDSYNC_PK_INDEX_SITEID:
3194 49104 : if (type == DBTYPE_BLOB) {
3195 49104 : decode_context->site_id = pval;
3196 49104 : decode_context->site_id_len = ival;
3197 49104 : }
3198 49104 : break;
3199 : case CLOUDSYNC_PK_INDEX_CL:
3200 49104 : if (type == DBTYPE_INTEGER) decode_context->cl = ival;
3201 49104 : break;
3202 : case CLOUDSYNC_PK_INDEX_SEQ:
3203 49104 : if (type == DBTYPE_INTEGER) decode_context->seq = ival;
3204 49104 : break;
3205 : }
3206 441936 : }
3207 :
3208 441936 : return rc;
3209 : }
3210 :
3211 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
3212 :
3213 685 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
3214 : // Guard against calling payload_apply before cloudsync_init: without this,
3215 : // the settings lookups at the top of this function would each emit a
3216 : // "no such table: cloudsync_settings" debug line, control would fall
3217 : // through to the meta-table insert, and the function would ultimately
3218 : // return an error with an empty errmsg — SQLite then surfaces that as
3219 : // the confusing "Runtime error: not an error".
3220 685 : if (!cloudsync_context_is_initialized(data)) {
3221 1 : return cloudsync_set_error(data,
3222 : "cloudsync is not initialized: call SELECT cloudsync_init('<table_name>') "
3223 : "to enable sync on a table before calling cloudsync_payload_apply().",
3224 : DBRES_MISUSE);
3225 : }
3226 :
3227 : // sanity check
3228 684 : if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
3229 :
3230 : // decode header
3231 : cloudsync_payload_header header;
3232 684 : memcpy(&header, payload, sizeof(cloudsync_payload_header));
3233 :
3234 684 : header.signature = ntohl(header.signature);
3235 684 : header.expanded_size = ntohl(header.expanded_size);
3236 684 : header.ncols = ntohs(header.ncols);
3237 684 : header.nrows = ntohl(header.nrows);
3238 684 : header.schema_hash = ntohll(header.schema_hash);
3239 :
3240 : // compare schema_hash only if not disabled and if the received payload was created with the current header version
3241 : // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
3242 684 : if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
3243 684 : if (header.schema_hash != data->schema_hash) {
3244 4 : if (!database_check_schema_hash(data, header.schema_hash)) {
3245 : char buffer[1024];
3246 2 : snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
3247 2 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
3248 : }
3249 2 : }
3250 682 : }
3251 :
3252 : // sanity check header
3253 682 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
3254 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
3255 : }
3256 :
3257 682 : const char *buffer = payload + sizeof(cloudsync_payload_header);
3258 682 : size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
3259 :
3260 : // sanity check checksum (only if version is >= 2)
3261 682 : if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
3262 682 : uint64_t checksum = pk_checksum(buffer, buf_len);
3263 682 : if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
3264 1 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
3265 : }
3266 681 : }
3267 :
3268 : // check if payload is compressed
3269 681 : char *clone = NULL;
3270 681 : if (header.expanded_size != 0) {
3271 665 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
3272 665 : if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
3273 :
3274 665 : int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
3275 665 : if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
3276 0 : if (clone) cloudsync_memory_free(clone);
3277 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
3278 : }
3279 :
3280 665 : buffer = (const char *)clone;
3281 665 : buf_len = (size_t)header.expanded_size;
3282 665 : }
3283 :
3284 : // precompile the insert statement
3285 681 : dbvm_t *vm = NULL;
3286 681 : int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
3287 681 : if (rc != DBRES_OK) {
3288 0 : if (clone) cloudsync_memory_free(clone);
3289 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
3290 : }
3291 :
3292 : // process buffer, one row at a time
3293 681 : uint16_t ncols = header.ncols;
3294 681 : uint32_t nrows = header.nrows;
3295 681 : int64_t last_payload_db_version = -1;
3296 681 : int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
3297 681 : int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
3298 681 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
3299 :
3300 : // Initialize deferred column-batch merge
3301 681 : merge_pending_batch batch = {0};
3302 681 : data->pending_batch = &batch;
3303 681 : bool in_savepoint = false;
3304 681 : const void *last_pk = NULL;
3305 681 : int64_t last_pk_len = 0;
3306 681 : const char *last_tbl = NULL;
3307 681 : int64_t last_tbl_len = 0;
3308 :
3309 49785 : for (uint32_t i=0; i<nrows; ++i) {
3310 49104 : size_t seek = 0;
3311 49104 : int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
3312 49104 : if (res == -1) {
3313 0 : merge_flush_pending(data);
3314 0 : data->pending_batch = NULL;
3315 0 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3316 0 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3317 0 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3318 0 : if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
3319 0 : rc = DBRES_ERROR;
3320 0 : goto cleanup;
3321 : }
3322 :
3323 : // Detect PK/table/db_version boundary to flush pending batch
3324 97527 : bool pk_changed = (last_pk != NULL &&
3325 48423 : (last_pk_len != decoded_context.pk_len ||
3326 46497 : memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
3327 97527 : bool tbl_changed = (last_tbl != NULL &&
3328 48423 : (last_tbl_len != decoded_context.tbl_len ||
3329 48351 : memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
3330 49104 : bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
3331 :
3332 : // Flush pending batch before any boundary change
3333 49104 : if (pk_changed || tbl_changed || db_version_changed) {
3334 18227 : int flush_rc = merge_flush_pending(data);
3335 18227 : if (flush_rc != DBRES_OK) {
3336 1 : rc = flush_rc;
3337 : // continue processing remaining rows
3338 1 : }
3339 18227 : }
3340 :
3341 : // Per-db_version savepoints group rows with the same source db_version
3342 : // into one transaction. In SQLite autocommit mode, the RELEASE triggers
3343 : // the commit hook which bumps data->db_version and resets seq, ensuring
3344 : // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
3345 : // database_in_transaction() is always true so this block is inactive —
3346 : // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
3347 49104 : if (in_savepoint && db_version_changed) {
3348 4290 : rc = database_commit_savepoint(data, "cloudsync_payload_apply");
3349 4290 : if (rc != DBRES_OK) {
3350 0 : merge_pending_free_entries(&batch);
3351 0 : data->pending_batch = NULL;
3352 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
3353 0 : goto cleanup;
3354 : }
3355 4290 : in_savepoint = false;
3356 4290 : }
3357 :
3358 49104 : if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
3359 4971 : rc = database_begin_savepoint(data, "cloudsync_payload_apply");
3360 4971 : if (rc != DBRES_OK) {
3361 0 : merge_pending_free_entries(&batch);
3362 0 : data->pending_batch = NULL;
3363 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
3364 0 : goto cleanup;
3365 : }
3366 4971 : in_savepoint = true;
3367 4971 : }
3368 :
3369 : // Track db_version for batch-flush boundary detection
3370 49104 : if (db_version_changed) {
3371 4971 : last_payload_db_version = decoded_context.db_version;
3372 4971 : }
3373 :
3374 : // Update PK/table tracking
3375 49104 : last_pk = decoded_context.pk;
3376 49104 : last_pk_len = decoded_context.pk_len;
3377 49104 : last_tbl = decoded_context.tbl;
3378 49104 : last_tbl_len = decoded_context.tbl_len;
3379 :
3380 49104 : rc = databasevm_step(vm);
3381 49104 : if (rc != DBRES_DONE) {
3382 : // don't "break;", the error can be due to a RLS policy.
3383 : // in case of error we try to apply the following changes
3384 2 : }
3385 :
3386 49104 : buffer += seek;
3387 49104 : buf_len -= seek;
3388 49104 : dbvm_reset(vm);
3389 49104 : }
3390 :
3391 : // Final flush after loop
3392 : {
3393 681 : int flush_rc = merge_flush_pending(data);
3394 681 : if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
3395 : }
3396 681 : data->pending_batch = NULL;
3397 :
3398 681 : if (in_savepoint) {
3399 681 : int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
3400 681 : if (rc1 != DBRES_OK) rc = rc1;
3401 681 : }
3402 :
3403 : // save last error (unused if function returns OK)
3404 681 : if (rc != DBRES_OK && rc != DBRES_DONE) {
3405 1 : cloudsync_set_dberror(data);
3406 1 : }
3407 :
3408 681 : if (rc == DBRES_DONE) rc = DBRES_OK;
3409 1361 : if (rc == DBRES_OK) {
3410 : char buf[256];
3411 680 : if (decoded_context.db_version >= dbversion) {
3412 546 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
3413 546 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
3414 :
3415 546 : if (decoded_context.seq != seq) {
3416 338 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
3417 338 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
3418 338 : }
3419 546 : }
3420 680 : }
3421 :
3422 : cleanup:
3423 : // cleanup merge_pending_batch
3424 681 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3425 681 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3426 681 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3427 :
3428 : // cleanup vm
3429 681 : if (vm) databasevm_finalize(vm);
3430 :
3431 : // cleanup memory
3432 681 : if (clone) cloudsync_memory_free(clone);
3433 :
3434 : // error already saved in (save last error)
3435 681 : if (rc != DBRES_OK) return rc;
3436 :
3437 : // return the number of processed rows
3438 680 : if (pnrows) *pnrows = nrows;
3439 680 : return DBRES_OK;
3440 685 : }
3441 :
3442 : // MARK: - Payload load/store -
3443 :
3444 0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
3445 : // retrieve current db_version and seq
3446 0 : *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
3447 0 : if (*db_version < 0) return DBRES_ERROR;
3448 :
3449 : // retrieve BLOB
3450 : char sql[1024];
3451 0 : snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
3452 : "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
3453 :
3454 0 : int64_t len = 0;
3455 0 : int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
3456 0 : *blob_size = (int)len;
3457 0 : if (rc != DBRES_OK) return rc;
3458 :
3459 : // exit if there is no data to send
3460 0 : if (*blob == NULL || *blob_size == 0) return DBRES_OK;
3461 0 : return rc;
3462 0 : }
3463 :
3464 : #ifdef CLOUDSYNC_DESKTOP_OS
3465 0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
3466 : DEBUG_FUNCTION("cloudsync_payload_save");
3467 :
3468 : // silently delete any other payload with the same name
3469 0 : cloudsync_file_delete(payload_path);
3470 :
3471 : // retrieve payload
3472 0 : char *blob = NULL;
3473 0 : int blob_size = 0, db_version = 0;
3474 0 : int64_t new_db_version = 0;
3475 0 : int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
3476 0 : if (rc != DBRES_OK) {
3477 0 : if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
3478 0 : return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
3479 : }
3480 :
3481 : // exit if there is no data to save
3482 0 : if (blob == NULL || blob_size == 0) {
3483 0 : if (size) *size = 0;
3484 0 : return DBRES_OK;
3485 : }
3486 :
3487 : // write payload to file
3488 0 : bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
3489 0 : cloudsync_memory_free(blob);
3490 0 : if (res == false) {
3491 0 : return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
3492 : }
3493 :
3494 : // returns blob size
3495 0 : if (size) *size = blob_size;
3496 0 : return DBRES_OK;
3497 0 : }
3498 : #endif
3499 :
3500 : // MARK: - Core -
3501 :
// Validate that `name` identifies a table that can safely be augmented for
// CRDT replication. Checks (some skippable via init_flags): table existence,
// name length, primary-key count/type, and NOT NULL columns without defaults.
// Returns DBRES_OK when the table passes, otherwise an error code with the
// message stored in `data`.
int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, CLOUDSYNC_INIT_FLAG init_flags) {
    DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    char buffer[2048];

    // sanity check table name
    if (name == NULL) {
        return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    }

    // avoid allocating heap memory for SQL statements by setting a maximum length of 512 characters
    // for table names. This limit is reasonable and helps prevent memory management issues.
    const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    if (strlen(name) > maxlen) {
        snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }

    // check if already initialized (an existing context means checks already passed)
    cloudsync_table_context *table = table_lookup(data, name);
    if (table) return DBRES_OK;

    // check if table exists
    if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
        snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }

    // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
    int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    if (npri_keys < 0) return cloudsync_set_dberror(data);
    if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);

#if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    if (npri_keys == 0) {
        snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
#endif

    // single-column INTEGER primary keys are rejected by default: they are not
    // globally unique across replicas (can be skipped via init_flags)
    bool skip_int_pk_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK) != 0;
    if (!skip_int_pk_check) {
        if (npri_keys == 1) {
            // the affinity of a column is determined by the declared type of the column,
            // according to the following rules in the order shown:
            // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
            int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
            if (npri_keys_int < 0) return cloudsync_set_dberror(data);
            if (npri_keys == npri_keys_int) {
                snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
                return cloudsync_set_error(data, buffer, DBRES_ERROR);
            }

        }
    }

    // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
#if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    bool skip_notnull_prikeys_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_PRIKEYS_CHECK) != 0;
    if (!skip_notnull_prikeys_check) {
        if (npri_keys > 0) {
            int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
            if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
            if (npri_keys != npri_keys_notnull) {
                snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
                return cloudsync_set_error(data, buffer, DBRES_ERROR);
            }
        }
    }
#endif

    // check for columns declared as NOT NULL without a DEFAULT value.
    // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    bool skip_notnull_default_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_DEFAULT_CHECK) != 0;
    if (!skip_notnull_default_check) {
        int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
        if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
        if (n_notnull_nodefault > 0) {
            snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
            return cloudsync_set_error(data, buffer, DBRES_ERROR);
        }
    }

    return DBRES_OK;
}
3587 :
3588 4 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
3589 4 : if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
3590 :
3591 : // drop meta-table
3592 4 : const char *table_name = table->name;
3593 4 : char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
3594 4 : int rc = database_exec(data, sql);
3595 4 : cloudsync_memory_free(sql);
3596 4 : if (rc != DBRES_OK) {
3597 : char buffer[1024];
3598 0 : snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
3599 0 : return cloudsync_set_error(data, buffer, rc);
3600 : }
3601 :
3602 : // drop blocks table if this table has block LWW columns
3603 4 : if (table->blocks_ref) {
3604 1 : sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->blocks_ref);
3605 1 : rc = database_exec(data, sql);
3606 1 : cloudsync_memory_free(sql);
3607 1 : if (rc != DBRES_OK) {
3608 : char buffer[1024];
3609 0 : snprintf(buffer, sizeof(buffer), "Unable to drop blocks table %s_cloudsync_blocks in cloudsync_cleanup", table_name);
3610 0 : return cloudsync_set_error(data, buffer, rc);
3611 : }
3612 1 : }
3613 :
3614 : // drop original triggers
3615 4 : rc = database_delete_triggers(data, table_name);
3616 4 : if (rc != DBRES_OK) {
3617 : char buffer[1024];
3618 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
3619 0 : return cloudsync_set_error(data, buffer, rc);
3620 : }
3621 :
3622 : // remove all table related settings
3623 4 : dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
3624 4 : return DBRES_OK;
3625 4 : }
3626 :
3627 4 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
3628 4 : cloudsync_table_context *table = table_lookup(data, table_name);
3629 4 : if (!table) return DBRES_OK;
3630 :
3631 : // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
3632 :
3633 4 : int rc = cloudsync_cleanup_internal(data, table);
3634 4 : if (rc != DBRES_OK) return rc;
3635 :
3636 4 : int counter = table_remove(data, table);
3637 4 : table_free(table);
3638 :
3639 4 : if (counter == 0) {
3640 : // cleanup database on last table
3641 2 : cloudsync_reset_siteid(data);
3642 2 : dbutils_settings_cleanup(data);
3643 2 : } else {
3644 2 : if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
3645 2 : cloudsync_update_schema_hash(data);
3646 2 : }
3647 : }
3648 :
3649 4 : return DBRES_OK;
3650 4 : }
3651 :
// Delegates a full cleanup of all cloudsync artifacts to the database
// layer and returns its result code directly.
int cloudsync_cleanup_all (cloudsync_context *data) {
    return database_cleanup(data);
}
3655 :
3656 484 : int cloudsync_terminate (cloudsync_context *data) {
3657 : // can't use for/loop here because data->tables_count is changed by table_remove
3658 740 : while (data->tables_count > 0) {
3659 256 : cloudsync_table_context *t = data->tables[data->tables_count - 1];
3660 256 : table_remove(data, t);
3661 256 : table_free(t);
3662 : }
3663 :
3664 484 : if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
3665 484 : if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
3666 484 : if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
3667 484 : if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
3668 484 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
3669 :
3670 484 : data->schema_version_stmt = NULL;
3671 484 : data->data_version_stmt = NULL;
3672 484 : data->db_version_stmt = NULL;
3673 484 : data->getset_siteid_stmt = NULL;
3674 484 : data->current_schema = NULL;
3675 :
3676 : // reset the site_id so the cloudsync_context_init will be executed again
3677 : // if any other cloudsync function is called after terminate
3678 484 : data->site_id[0] = 0;
3679 :
3680 484 : return 1;
3681 : }
3682 :
3683 289 : int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, CLOUDSYNC_INIT_FLAG init_flags) {
3684 : // sanity check table and its primary key(s)
3685 289 : int rc = cloudsync_table_sanity_check(data, table_name, init_flags);
3686 289 : if (rc != DBRES_OK) return rc;
3687 :
3688 : // init cloudsync_settings
3689 287 : if (cloudsync_context_init(data) == NULL) {
3690 0 : return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
3691 : }
3692 :
3693 : // sanity check algo name (if exists)
3694 287 : table_algo algo_new = table_algo_none;
3695 287 : if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
3696 :
3697 287 : algo_new = cloudsync_algo_from_name(algo_name);
3698 287 : if (algo_new == table_algo_none) {
3699 : char buffer[1024];
3700 1 : snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
3701 1 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3702 : }
3703 :
3704 : // DWS and AWS algorithms are not yet implemented in the merge logic
3705 286 : if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
3706 : char buffer[1024];
3707 2 : snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
3708 2 : return cloudsync_set_error(data, buffer, DBRES_ERROR);
3709 : }
3710 :
3711 : // check if table name was already augmented
3712 284 : table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
3713 :
3714 : // sanity check algorithm
3715 284 : if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
3716 : // if table algorithms and the same and not none, do nothing
3717 284 : } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
3718 : // nothing is written into settings because the default table_algo_crdt_cls will be used
3719 0 : algo_new = algo_current = table_algo_crdt_cls;
3720 256 : } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
3721 : // algo is already written into settins so just use it
3722 0 : algo_new = algo_current;
3723 256 : } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
3724 : // write table algo name in settings
3725 256 : dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
3726 256 : } else {
3727 : // error condition
3728 0 : return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
3729 : }
3730 :
3731 : // Run the following function even if table was already augmented.
3732 : // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
3733 : // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
3734 :
3735 : // sync algo with table (unused in this version)
3736 : // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
3737 :
3738 : // read row-level filter from settings (if any)
3739 : char init_filter_buf[2048];
3740 284 : int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
3741 284 : const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
3742 :
3743 : // check triggers
3744 284 : rc = database_create_triggers(data, table_name, algo_new, init_filter);
3745 284 : if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
3746 :
3747 : // check meta-table
3748 284 : rc = database_create_metatable(data, table_name);
3749 284 : if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
3750 :
3751 : // add prepared statements
3752 284 : if (cloudsync_add_dbvms(data) != DBRES_OK) {
3753 0 : return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
3754 : }
3755 :
3756 : // add table to in-memory data context
3757 284 : if (table_add_to_context(data, algo_new, table_name) == false) {
3758 : char buffer[1024];
3759 0 : snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
3760 0 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
3761 : }
3762 :
3763 284 : if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
3764 0 : return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
3765 : }
3766 :
3767 284 : return DBRES_OK;
3768 289 : }
|