Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "lz4.h"
21 : #include "pk.h"
22 : #include "sql.h"
23 : #include "utils.h"
24 : #include "dbutils.h"
25 : #include "block.h"
26 :
27 : #ifdef _WIN32
28 : #include <winsock2.h>
29 : #include <ws2tcpip.h>
30 : #else
31 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
32 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
33 : #endif
34 :
35 : #ifndef htonll
36 : #if __BIG_ENDIAN__
37 : #define htonll(x) (x)
38 : #define ntohll(x) (x)
39 : #else
40 : #ifndef htobe64
41 : #define htonll(x) ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
42 : #define ntohll(x) ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
43 : #else
44 : #define htonll(x) htobe64(x)
45 : #define ntohll(x) be64toh(x)
46 : #endif
47 : #endif
48 : #endif
49 :
50 : #define CLOUDSYNC_INIT_NTABLES 64
51 : #define CLOUDSYNC_MIN_DB_VERSION 0
52 :
53 : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
54 : #define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
55 : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
56 : #define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
57 : #define CLOUDSYNC_PAYLOAD_VERSION_2 2
58 : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
59 : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2
60 :
61 : #ifndef MAX
62 : #define MAX(a, b) (((a)>(b))?(a):(b))
63 : #endif
64 :
65 : #define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
66 :
// Column positions used when binding/decoding an encoded change record
// (see cloudsync_pk_decode_bind_context for the matching fields).
// NOTE(review): presumably mirrors the wire/meta-table column order — confirm
// against the pk encode/decode routines.
typedef enum {
    CLOUDSYNC_PK_INDEX_TBL = 0,
    CLOUDSYNC_PK_INDEX_PK = 1,
    CLOUDSYNC_PK_INDEX_COLNAME = 2,
    CLOUDSYNC_PK_INDEX_COLVALUE = 3,
    CLOUDSYNC_PK_INDEX_COLVERSION = 4,
    CLOUDSYNC_PK_INDEX_DBVERSION = 5,
    CLOUDSYNC_PK_INDEX_SITEID = 6,
    CLOUDSYNC_PK_INDEX_CL = 7,
    CLOUDSYNC_PK_INDEX_SEQ = 8
} CLOUDSYNC_PK_INDEX;

// Tri-state result of dbvm_execute: error, value unchanged since the last
// execution, or value changed (the default when no comparison applies).
typedef enum {
    DBVM_VALUE_ERROR = -1,
    DBVM_VALUE_UNCHANGED = 0,
    DBVM_VALUE_CHANGED = 1,
} DBVM_VALUE;
84 :
85 : #define SYNCBIT_SET(_data) _data->insync = 1
86 : #define SYNCBIT_RESET(_data) _data->insync = 0
87 :
88 : // MARK: - Deferred column-batch merge -
89 :
// One buffered column change waiting to be applied for the batch's primary key.
typedef struct {
    const char *col_name;           // pointer into table_context->col_name[idx] (stable)
    dbvalue_t *col_value;           // duplicated via database_value_dup (owned)
    int64_t col_version;
    int64_t db_version;
    uint8_t site_id[UUID_LEN];
    int site_id_len;                // number of valid bytes in site_id
    int64_t seq;
} merge_pending_entry;

// Accumulates per-column changes for a single primary key so they can be
// flushed together (deferred column-batch merge, active during payload_apply).
typedef struct {
    cloudsync_table_context *table;
    char *pk;                       // malloc'd copy, freed on flush
    int pk_len;
    int64_t cl;
    bool sentinel_pending;
    bool row_exists;                // true when the PK already exists locally
    int count;                      // entries currently buffered
    int capacity;                   // allocated size of entries
    merge_pending_entry *entries;

    // Statement cache — reuse the prepared statement when the column
    // combination and row_exists flag match between consecutive PK flushes.
    dbvm_t *cached_vm;
    bool cached_row_exists;
    int cached_col_count;
    const char **cached_col_names;  // array of pointers into table_context (not owned)
} merge_pending_batch;
118 :
119 : // MARK: -
120 :
// Holds the fields decoded from an encoded primary-key change record, plus the
// statement they are bound to. Pointers reference the decode buffer; lengths
// accompany every variable-size field. Accessed via the cloudsync_pk_context_*
// getters below.
struct cloudsync_pk_decode_bind_context {
    dbvm_t *vm;             // statement the decoded values are bound to
    char *tbl;              // table name
    int64_t tbl_len;
    const void *pk;         // encoded primary key blob
    int64_t pk_len;
    char *col_name;
    int64_t col_name_len;
    int64_t col_version;
    int64_t db_version;
    const void *site_id;    // originating site identifier
    int64_t site_id_len;
    int64_t cl;             // causal length
    int64_t seq;            // intra-transaction ordering
};
136 :
// Per-connection cloudsync state: database handle, error state, cached
// prepared statements, version counters and the registered table contexts.
struct cloudsync_context {
    void *db;                   // underlying database handle
    char errmsg[1024];          // last error message (see cloudsync_set_error)
    int errcode;                // last error code

    char *libversion;
    uint8_t site_id[UUID_LEN];  // this site's identifier (lazily loaded)
    int insync;                 // non-zero while a merge/apply is in progress
    int debug;
    bool merge_equal_values;
    void *aux_data;             // opaque user data (cloudsync_set_auxdata)

    // stmts and context values
    dbvm_t *schema_version_stmt;
    dbvm_t *data_version_stmt;
    dbvm_t *db_version_stmt;
    dbvm_t *getset_siteid_stmt;
    int data_version;
    int schema_version;
    uint64_t schema_hash;

    // set at transaction start and reset on commit/rollback
    int64_t db_version;
    // version the DB would have if the transaction committed now
    int64_t pending_db_version;
    // used to set an order inside each transaction
    int seq;

    // optional schema_name to be set in the cloudsync_table_context
    char *current_schema;

    // augmented tables are stored in-memory so we do not need to retrieve information about
    // col_names and cid from the disk each time a write statement is performed
    // we do also not need to use an hash map here because for few tables the direct
    // in-memory comparison with table name is faster
    cloudsync_table_context **tables;   // dense vector: [0..tables_count-1] are valid
    int tables_count;                   // size
    int tables_cap;                     // capacity

    int skip_decode_idx;                // -1 in sqlite, col_value index in postgresql

    // deferred column-batch merge (active during payload_apply)
    merge_pending_batch *pending_batch;
};
181 :
// Per-table cloudsync state: column metadata and the precompiled statements
// used to read/write the base table, its *_cloudsync meta table and the
// optional blocks table. Created by table_create, destroyed by table_free.
struct cloudsync_table_context {
    table_algo algo;                    // CRDT algorithm associated with the table
    char *name;                         // table name
    char *schema;                       // table schema
    char *meta_ref;                     // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
    char *base_ref;                     // schema-qualified base table name (e.g. "schema"."name")
    char **col_name;                    // array of column names
    dbvm_t **col_merge_stmt;            // array of merge insert stmt (indexed by col_name)
    dbvm_t **col_value_stmt;            // array of column value stmt (indexed by col_name)
    int *col_id;                        // array of column id
    col_algo_t *col_algo;               // per-column algorithm (normal or block)
    char **col_delimiter;               // per-column delimiter for block splitting (NULL = default "\n")
    bool has_block_cols;                // quick check: does this table have any block columns?
    dbvm_t *block_value_read_stmt;      // SELECT col_value FROM blocks table
    dbvm_t *block_value_write_stmt;     // INSERT OR REPLACE into blocks table
    dbvm_t *block_value_delete_stmt;    // DELETE from blocks table
    dbvm_t *block_list_stmt;            // SELECT block entries for materialization
    char *blocks_ref;                   // schema-qualified blocks table name
    int ncols;                          // number of non primary key cols
    int npks;                           // number of primary key cols
    bool enabled;                       // flag to check if a table is enabled or disabled
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    bool rowid_only;                    // a table with no primary keys other than the implicit rowid
#endif

    char **pk_name;                     // array of primary key names

    // precompiled statements
    dbvm_t *meta_pkexists_stmt;             // check if a primary key already exist in the augmented table
    dbvm_t *meta_sentinel_update_stmt;      // update a local sentinel row
    dbvm_t *meta_sentinel_insert_stmt;      // insert a local sentinel row
    dbvm_t *meta_row_insert_update_stmt;    // insert/update a local row
    dbvm_t *meta_row_drop_stmt;             // delete rows from meta
    dbvm_t *meta_update_move_stmt;          // update rows in meta when pk changes
    dbvm_t *meta_local_cl_stmt;             // compute local cl value
    dbvm_t *meta_winner_clock_stmt;         // get the rowid of the last inserted/updated row in the meta table
    dbvm_t *meta_merge_delete_drop;
    dbvm_t *meta_zero_clock_stmt;
    dbvm_t *meta_col_version_stmt;
    dbvm_t *meta_site_id_stmt;

    dbvm_t *real_col_values_stmt;           // retrieve all column values based on pk
    dbvm_t *real_merge_delete_stmt;
    dbvm_t *real_merge_sentinel_stmt;

    // context
    cloudsync_context *context;             // back-pointer to the owning connection context
};
230 :
// Growable buffer used while building an outgoing payload, plus running
// row/column counters for the header.
struct cloudsync_payload_context {
    char *buffer;       // payload bytes being accumulated
    size_t bsize;
    size_t balloc;      // allocated capacity of buffer
    size_t bused;       // bytes written so far
    uint64_t nrows;     // rows encoded so far
    uint16_t ncols;     // columns per row
};
239 :
240 : #ifdef _MSC_VER
241 : #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
242 : #define PACKED
243 : #else
244 : #define PACKED __attribute__((__packed__))
245 : #endif
246 :
// On-the-wire payload header. Packed to 1-byte alignment so the struct layout
// matches the serialized format exactly (32 bytes total).
typedef struct PACKED {
    uint32_t signature;     // 'CLSY' (CLOUDSYNC_PAYLOAD_SIGNATURE)
    uint8_t version;        // protocol version
    uint8_t libversion[3];  // major.minor.patch
    uint32_t expanded_size; // uncompressed payload size
    uint16_t ncols;
    uint32_t nrows;
    uint64_t schema_hash;
    uint8_t checksum[6];    // 48 bits checksum (to ensure struct is 32 bytes)
} cloudsync_payload_header;
257 :
258 : #ifdef _MSC_VER
259 : #pragma pack(pop)
260 : #endif
261 :
262 : #if CLOUDSYNC_UNITTEST
263 : bool force_uncompressed_blob = false;
264 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER() if (force_uncompressed_blob) use_uncompressed_buffer = true
265 : #else
266 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
267 : #endif
268 :
269 : // Internal prototypes
270 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
271 :
272 : // MARK: - CRDT algos -
273 :
274 513 : table_algo cloudsync_algo_from_name (const char *algo_name) {
275 513 : if (algo_name == NULL) return table_algo_none;
276 :
277 513 : if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
278 224 : if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
279 217 : if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
280 217 : if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
281 :
282 : // if nothing is found
283 217 : return table_algo_none;
284 513 : }
285 :
286 110 : const char *cloudsync_algo_name (table_algo algo) {
287 110 : switch (algo) {
288 105 : case table_algo_crdt_cls: return "cls";
289 1 : case table_algo_crdt_gos: return "gos";
290 1 : case table_algo_crdt_dws: return "dws";
291 1 : case table_algo_crdt_aws: return "aws";
292 1 : case table_algo_none: return NULL;
293 : }
294 1 : return NULL;
295 110 : }
296 :
297 : // MARK: - DBVM Utils -
298 :
299 31455 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
300 31455 : if (!stmt) return DBVM_VALUE_ERROR;
301 :
302 31455 : int rc = databasevm_step(stmt);
303 31455 : if (rc != DBRES_ROW && rc != DBRES_DONE) {
304 2 : if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
305 2 : databasevm_reset(stmt);
306 2 : return DBVM_VALUE_ERROR;
307 : }
308 :
309 31453 : DBVM_VALUE result = DBVM_VALUE_CHANGED;
310 31453 : if (stmt == data->data_version_stmt) {
311 29319 : int version = (int)database_column_int(stmt, 0);
312 29319 : if (version != data->data_version) {
313 185 : data->data_version = version;
314 185 : } else {
315 29134 : result = DBVM_VALUE_UNCHANGED;
316 : }
317 31453 : } else if (stmt == data->schema_version_stmt) {
318 1067 : int version = (int)database_column_int(stmt, 0);
319 1067 : if (version > data->schema_version) {
320 313 : data->schema_version = version;
321 313 : } else {
322 754 : result = DBVM_VALUE_UNCHANGED;
323 : }
324 :
325 2134 : } else if (stmt == data->db_version_stmt) {
326 1067 : data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
327 1067 : }
328 :
329 31453 : databasevm_reset(stmt);
330 31453 : return result;
331 31455 : }
332 :
333 3514 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
334 3514 : int result = -1;
335 3514 : int rc = DBRES_OK;
336 :
337 3514 : if (value) {
338 3513 : rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
339 3513 : if (rc != DBRES_OK) goto cleanup;
340 3513 : }
341 :
342 3514 : rc = databasevm_step(stmt);
343 7028 : if (rc == DBRES_DONE) {
344 1 : result = 0;
345 1 : rc = DBRES_OK;
346 3514 : } else if (rc == DBRES_ROW) {
347 3513 : result = (int)database_column_int(stmt, 0);
348 3513 : rc = DBRES_OK;
349 3513 : }
350 :
351 : cleanup:
352 3514 : databasevm_reset(stmt);
353 3514 : return result;
354 : }
355 :
356 254869 : void dbvm_reset (dbvm_t *stmt) {
357 254869 : if (!stmt) return;
358 231934 : databasevm_clear_bindings(stmt);
359 231934 : databasevm_reset(stmt);
360 254869 : }
361 :
362 : // MARK: - Database Version -
363 :
364 557 : char *cloudsync_dbversion_build_query (cloudsync_context *data) {
365 : // this function must be manually called each time tables changes
366 : // because the query plan changes too and it must be re-prepared
367 : // unfortunately there is no other way
368 :
369 : // we need to execute a query like:
370 : /*
371 : SELECT max(version) as version FROM (
372 : SELECT max(db_version) as version FROM "table1_cloudsync"
373 : UNION ALL
374 : SELECT max(db_version) as version FROM "table2_cloudsync"
375 : UNION ALL
376 : SELECT max(db_version) as version FROM "table3_cloudsync"
377 : UNION
378 : SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
379 : )
380 : */
381 :
382 : // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
383 :
384 557 : char *value = NULL;
385 557 : int rc = database_select_text(data, SQL_DBVERSION_BUILD_QUERY, &value);
386 557 : return (rc == DBRES_OK) ? value : NULL;
387 : }
388 :
389 743 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
390 743 : if (data->db_version_stmt) {
391 371 : databasevm_finalize(data->db_version_stmt);
392 371 : data->db_version_stmt = NULL;
393 371 : }
394 :
395 743 : int64_t count = dbutils_table_settings_count_tables(data);
396 743 : if (count == 0) return DBRES_OK;
397 557 : else if (count == -1) return cloudsync_set_dberror(data);
398 :
399 557 : char *sql = cloudsync_dbversion_build_query(data);
400 557 : if (!sql) return DBRES_NOMEM;
401 : DEBUG_SQL("db_version_stmt: %s", sql);
402 :
403 557 : int rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
404 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
405 557 : cloudsync_memory_free(sql);
406 557 : return rc;
407 743 : }
408 :
409 1067 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
410 1067 : DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
411 1067 : if (schema_changed == DBVM_VALUE_ERROR) return -1;
412 :
413 1067 : if (schema_changed == DBVM_VALUE_CHANGED) {
414 313 : int rc = cloudsync_dbversion_rebuild(data);
415 313 : if (rc != DBRES_OK) return -1;
416 313 : }
417 :
418 1067 : if (!data->db_version_stmt) {
419 0 : data->db_version = CLOUDSYNC_MIN_DB_VERSION;
420 0 : return 0;
421 : }
422 :
423 1067 : DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
424 1067 : if (rc == DBVM_VALUE_ERROR) return -1;
425 1067 : return 0;
426 1067 : }
427 :
428 29319 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
429 : // perform a PRAGMA data_version to check if some other process write any data
430 29319 : DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
431 29319 : if (rc == DBVM_VALUE_ERROR) return -1;
432 :
433 : // db_version is already set and there is no need to update it
434 29319 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
435 :
436 1067 : return cloudsync_dbversion_rerun(data);
437 29319 : }
438 :
439 29295 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
440 29295 : int rc = cloudsync_dbversion_check_uptodate(data);
441 29295 : if (rc != DBRES_OK) return -1;
442 :
443 29295 : int64_t result = data->db_version + 1;
444 29295 : if (result < data->pending_db_version) result = data->pending_db_version;
445 29295 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
446 29295 : data->pending_db_version = result;
447 :
448 29295 : return result;
449 29295 : }
450 :
451 : // MARK: - PK Context -
452 :
453 0 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
454 0 : *tbl_len = ctx->tbl_len;
455 0 : return ctx->tbl;
456 : }
457 :
458 0 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
459 0 : *pk_len = ctx->pk_len;
460 0 : return (void *)ctx->pk;
461 : }
462 :
463 0 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
464 0 : *colname_len = ctx->col_name_len;
465 0 : return ctx->col_name;
466 : }
467 :
468 0 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
469 0 : return ctx->cl;
470 : }
471 :
472 0 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
473 0 : return ctx->db_version;
474 : }
475 :
476 : // MARK: - CloudSync Context -
477 :
// Non-zero while a merge/apply is in progress (set via SYNCBIT_SET/RESET).
int cloudsync_insync (cloudsync_context *data) {
    return data->insync;
}

// Pointer to the context's UUID_LEN-byte site identifier buffer (not owned by caller).
void *cloudsync_siteid (cloudsync_context *data) {
    return (void *)data->site_id;
}

// Zero the cached site_id so the next cloudsync_load_siteid reloads it from the db.
void cloudsync_reset_siteid (cloudsync_context *data) {
    memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
}
489 :
490 187 : int cloudsync_load_siteid (cloudsync_context *data) {
491 : // check if site_id was already loaded
492 187 : if (data->site_id[0] != 0) return DBRES_OK;
493 :
494 : // load site_id
495 186 : char *buffer = NULL;
496 186 : int64_t size = 0;
497 186 : int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
498 186 : if (rc != DBRES_OK) return rc;
499 186 : if (!buffer || size != UUID_LEN) {
500 0 : if (buffer) cloudsync_memory_free(buffer);
501 0 : return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
502 : }
503 :
504 186 : memcpy(data->site_id, buffer, UUID_LEN);
505 186 : cloudsync_memory_free(buffer);
506 :
507 186 : return DBRES_OK;
508 187 : }
509 :
// Last committed db_version cached in the context (set at transaction start).
int64_t cloudsync_dbversion (cloudsync_context *data) {
    return data->db_version;
}

// Return the current per-transaction sequence number, then advance it.
int cloudsync_bumpseq (cloudsync_context *data) {
    int value = data->seq;
    data->seq += 1;
    return value;
}

// Recompute and cache the hash of the current database schema.
void cloudsync_update_schema_hash (cloudsync_context *data) {
    database_update_schema_hash(data, &data->schema_hash);
}

// Raw database handle stored in the context.
void *cloudsync_db (cloudsync_context *data) {
    return data->db;
}
527 :
// Lazily prepare the context's persistent statements (data_version,
// schema_version, site_id get/set) — each only if not already prepared —
// then (re)build the db_version statement. Returns a DBRES_* code; on
// failure, statements prepared so far are kept for a later retry.
int cloudsync_add_dbvms (cloudsync_context *data) {
    DEBUG_DBFUNCTION("cloudsync_add_stmts");

    if (data->data_version_stmt == NULL) {
        int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
    }

    if (data->schema_version_stmt == NULL) {
        int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
    }

    if (data->getset_siteid_stmt == NULL) {
        // get and set index of the site_id
        // in SQLite, we can't directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
        // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
        int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
    }

    return cloudsync_dbversion_rebuild(data);
}
557 :
// Record an error in the context and return its code.
//
// err_user: optional user-supplied message; when NULL the database's own
//           message is used. When both exist and the database reports an
//           error, the messages are combined as "user (db)".
// err_code: forced to a non-OK value (falls back to the database's code,
//           then DBRES_ERROR).
int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
    // force err_code to be something different than OK
    if (err_code == DBRES_OK) err_code = database_errcode(data);
    if (err_code == DBRES_OK) err_code = DBRES_ERROR;

    // compute a meaningful error message
    if (err_user == NULL) {
        snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
    } else {
        const char *db_error = database_errmsg(data);
        char db_error_copy[sizeof(data->errmsg)];
        int rc = database_errcode(data);
        if (rc == DBRES_OK) {
            snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
        } else {
            // database_errmsg may return data->errmsg itself: copy it first,
            // because snprintf with overlapping src/dst is undefined behavior
            if (db_error == data->errmsg) {
                snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
                db_error = db_error_copy;
            }
            snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
        }
    }

    data->errcode = err_code;
    return err_code;
}
584 :
// Record the database's own error message/code in the context.
int cloudsync_set_dberror (cloudsync_context *data) {
    return cloudsync_set_error(data, NULL, DBRES_OK);
}

// Last error message stored in the context.
const char *cloudsync_errmsg (cloudsync_context *data) {
    return data->errmsg;
}

// Last error code stored in the context.
int cloudsync_errcode (cloudsync_context *data) {
    return data->errcode;
}

// Clear any stored error message/code.
void cloudsync_reset_error (cloudsync_context *data) {
    data->errmsg[0] = 0;
    data->errcode = DBRES_OK;
}

// Opaque user data attached to the context (see cloudsync_set_auxdata).
void *cloudsync_auxdata (cloudsync_context *data) {
    return data->aux_data;
}

// Attach opaque user data to the context; ownership stays with the caller.
void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
    data->aux_data = xdata;
}
609 :
610 6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
611 6 : if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
612 5 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
613 5 : data->current_schema = NULL;
614 5 : if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
615 6 : }
616 :
617 1079 : const char *cloudsync_schema (cloudsync_context *data) {
618 1079 : return data->current_schema;
619 : }
620 :
621 4 : const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
622 4 : cloudsync_table_context *table = table_lookup(data, table_name);
623 4 : if (!table) return NULL;
624 :
625 1 : return table->schema;
626 4 : }
627 :
628 : // MARK: - Table Utils -
629 :
// Release an array of nrows primary-key name strings plus the array itself.
// NULL-safe: a NULL array is ignored.
void table_pknames_free (char **names, int nrows) {
    if (names == NULL) return;

    for (int idx = 0; idx < nrows; ++idx) {
        cloudsync_memory_free(names[idx]);
    }
    cloudsync_memory_free(names);
}
635 :
// Build the DELETE statement used when merging a remote delete.
// Returns newly allocated SQL (caller frees) or NULL on allocation failure.
char *table_build_mergedelete_sql (cloudsync_table_context *table) {
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    // rowid-only tables are addressed directly by rowid
    if (table->rowid_only) {
        char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
        return sql;
    }
#endif

    return sql_build_delete_by_pk(table->context, table->name, table->schema);
}
646 :
// Build the INSERT/UPSERT statement used when merging a remote change.
// colname == NULL means a sentinel insert (row creation only); otherwise the
// statement upserts the row and sets that single column.
// Returns newly allocated SQL (caller frees) or NULL on allocation failure.
char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
    char *sql = NULL;

#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        if (colname == NULL) {
            // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
            sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
        } else {
            // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
            sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
        }
        return sql;
    }
#endif

    if (colname == NULL) {
        // is sentinel insert
        sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
    } else {
        sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
    }
    return sql;
}
671 :
// Build the SELECT statement that reads a single column value for a row
// identified by its primary key (or rowid for rowid-only tables).
// Returns newly allocated SQL (caller frees) or NULL on allocation failure.
char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    if (table->rowid_only) {
        char *colnamequote = "\"";
        char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
        return sql;
    }
#endif

    // SELECT age FROM customers WHERE first_name=? AND last_name=?;
    return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
}
684 :
685 240 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
686 : DEBUG_DBFUNCTION("table_create %s", name);
687 :
688 240 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
689 240 : if (!table) return NULL;
690 :
691 240 : table->context = data;
692 240 : table->algo = algo;
693 240 : table->name = cloudsync_string_dup_lowercase(name);
694 :
695 : // Detect schema from metadata table location. If metadata table doesn't
696 : // exist yet (during initialization), fall back to cloudsync_schema() which
697 : // returns the explicitly set schema or current_schema().
698 240 : table->schema = database_table_schema(name);
699 240 : if (!table->schema) {
700 240 : const char *fallback_schema = cloudsync_schema(data);
701 240 : if (fallback_schema) {
702 0 : table->schema = cloudsync_string_dup(fallback_schema);
703 0 : }
704 240 : }
705 :
706 240 : if (!table->name) {
707 0 : cloudsync_memory_free(table);
708 0 : return NULL;
709 : }
710 240 : table->meta_ref = database_build_meta_ref(table->schema, table->name);
711 240 : table->base_ref = database_build_base_ref(table->schema, table->name);
712 240 : table->enabled = true;
713 :
714 240 : return table;
715 240 : }
716 :
// Release a table context: per-column arrays and statements, block-table
// statements, name/schema strings, all precompiled meta/real statements,
// and finally the context itself. NULL-safe.
void table_free (cloudsync_table_context *table) {
    DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
    if (!table) return;

    // per-column arrays only exist when the table has non-pk columns
    if (table->ncols > 0) {
        if (table->col_name) {
            for (int i=0; i<table->ncols; ++i) {
                cloudsync_memory_free(table->col_name[i]);
            }
            cloudsync_memory_free(table->col_name);
        }
        if (table->col_merge_stmt) {
            for (int i=0; i<table->ncols; ++i) {
                databasevm_finalize(table->col_merge_stmt[i]);
            }
            cloudsync_memory_free(table->col_merge_stmt);
        }
        if (table->col_value_stmt) {
            for (int i=0; i<table->ncols; ++i) {
                databasevm_finalize(table->col_value_stmt[i]);
            }
            cloudsync_memory_free(table->col_value_stmt);
        }
        if (table->col_id) {
            cloudsync_memory_free(table->col_id);
        }
        if (table->col_algo) {
            cloudsync_memory_free(table->col_algo);
        }
        if (table->col_delimiter) {
            for (int i=0; i<table->ncols; ++i) {
                if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
            }
            cloudsync_memory_free(table->col_delimiter);
        }
    }

    // block-table statements and reference
    if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
    if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
    if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
    if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
    if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);

    // names, references and primary-key metadata
    if (table->name) cloudsync_memory_free(table->name);
    if (table->schema) cloudsync_memory_free(table->schema);
    if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
    if (table->base_ref) cloudsync_memory_free(table->base_ref);
    if (table->pk_name) table_pknames_free(table->pk_name, table->npks);

    // meta-table statements
    if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
    if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
    if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
    if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
    if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
    if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
    if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
    if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
    if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
    if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
    if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
    if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);

    // base-table statements
    if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
    if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
    if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);

    cloudsync_memory_free(table);
}
784 :
785 240 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
786 240 : int rc = DBRES_OK;
787 240 : char *sql = NULL;
788 240 : cloudsync_context *data = table->context;
789 :
790 : // META TABLE statements
791 :
792 : // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
793 :
794 : // precompile the pk exists statement
795 : // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
796 : // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
797 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
798 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
799 : DEBUG_SQL("meta_pkexists_stmt: %s", sql);
800 :
801 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
802 240 : cloudsync_memory_free(sql);
803 240 : if (rc != DBRES_OK) goto cleanup;
804 :
805 : // precompile the update local sentinel statement
806 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
807 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
808 : DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
809 :
810 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
811 240 : cloudsync_memory_free(sql);
812 240 : if (rc != DBRES_OK) goto cleanup;
813 :
814 : // precompile the insert local sentinel statement
815 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
816 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
817 : DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
818 :
819 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
820 240 : cloudsync_memory_free(sql);
821 240 : if (rc != DBRES_OK) goto cleanup;
822 :
823 : // precompile the insert/update local row statement
824 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
825 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
826 : DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
827 :
828 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
829 240 : cloudsync_memory_free(sql);
830 240 : if (rc != DBRES_OK) goto cleanup;
831 :
832 : // precompile the delete rows from meta
833 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
834 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
835 : DEBUG_SQL("meta_row_drop_stmt: %s", sql);
836 :
837 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
838 240 : cloudsync_memory_free(sql);
839 240 : if (rc != DBRES_OK) goto cleanup;
840 :
841 : // precompile the update rows from meta when pk changes
842 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
843 240 : sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
844 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
845 : DEBUG_SQL("meta_update_move_stmt: %s", sql);
846 :
847 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
848 240 : cloudsync_memory_free(sql);
849 240 : if (rc != DBRES_OK) goto cleanup;
850 :
851 : // local cl
852 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
853 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
854 : DEBUG_SQL("meta_local_cl_stmt: %s", sql);
855 :
856 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
857 240 : cloudsync_memory_free(sql);
858 240 : if (rc != DBRES_OK) goto cleanup;
859 :
860 : // rowid of the last inserted/updated row in the meta table
861 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
862 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
863 : DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
864 :
865 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
866 240 : cloudsync_memory_free(sql);
867 240 : if (rc != DBRES_OK) goto cleanup;
868 :
869 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
870 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
871 : DEBUG_SQL("meta_merge_delete_drop: %s", sql);
872 :
873 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
874 240 : cloudsync_memory_free(sql);
875 240 : if (rc != DBRES_OK) goto cleanup;
876 :
877 : // zero clock
878 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
879 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
880 : DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
881 :
882 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
883 240 : cloudsync_memory_free(sql);
884 240 : if (rc != DBRES_OK) goto cleanup;
885 :
886 : // col_version
887 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
888 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
889 : DEBUG_SQL("meta_col_version_stmt: %s", sql);
890 :
891 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
892 240 : cloudsync_memory_free(sql);
893 240 : if (rc != DBRES_OK) goto cleanup;
894 :
895 : // site_id
896 240 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
897 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
898 : DEBUG_SQL("meta_site_id_stmt: %s", sql);
899 :
900 240 : rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
901 240 : cloudsync_memory_free(sql);
902 240 : if (rc != DBRES_OK) goto cleanup;
903 :
904 : // REAL TABLE statements
905 :
906 : // precompile the get column value statement
907 240 : if (ncols > 0) {
908 201 : sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
909 201 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
910 : DEBUG_SQL("real_col_values_stmt: %s", sql);
911 :
912 201 : rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
913 201 : cloudsync_memory_free(sql);
914 201 : if (rc != DBRES_OK) goto cleanup;
915 201 : }
916 :
917 240 : sql = table_build_mergedelete_sql(table);
918 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
919 : DEBUG_SQL("real_merge_delete: %s", sql);
920 :
921 240 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
922 240 : cloudsync_memory_free(sql);
923 240 : if (rc != DBRES_OK) goto cleanup;
924 :
925 240 : sql = table_build_mergeinsert_sql(table, NULL);
926 240 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
927 : DEBUG_SQL("real_merge_sentinel: %s", sql);
928 :
929 240 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
930 240 : cloudsync_memory_free(sql);
931 240 : if (rc != DBRES_OK) goto cleanup;
932 :
933 : cleanup:
934 240 : if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
935 240 : return rc;
936 : }
937 :
938 183416 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
939 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
940 :
941 183416 : if (table_name) {
942 184448 : for (int i=0; i<data->tables_count; ++i) {
943 183955 : if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
944 1032 : }
945 493 : }
946 :
947 493 : return NULL;
948 183416 : }
949 :
950 168954 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
951 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
952 :
953 314989 : for (int i=0; i<table->ncols; ++i) {
954 314989 : if (strcasecmp(table->col_name[i], col_name) == 0) {
955 168954 : if (index) *index = i;
956 168954 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
957 : }
958 146035 : }
959 :
960 0 : if (index) *index = -1;
961 0 : return NULL;
962 168954 : }
963 :
964 240 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
965 240 : const char *table_name = table->name;
966 : DEBUG_DBFUNCTION("table_remove %s", table_name);
967 :
968 282 : for (int i = 0; i < data->tables_count; ++i) {
969 282 : cloudsync_table_context *t = data->tables[i];
970 :
971 : // pointer compare is fastest but fallback to strcasecmp if not same pointer
972 282 : if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
973 240 : int last = data->tables_count - 1;
974 240 : data->tables[i] = data->tables[last]; // move last into the hole (keeps array dense)
975 240 : data->tables[last] = NULL; // NULLify tail (as an extra security measure)
976 240 : data->tables_count--;
977 240 : return data->tables_count;
978 : }
979 42 : }
980 :
981 0 : return -1;
982 240 : }
983 :
// database_exec_callback handler that registers one non-pk column in the
// table context. Each invocation is expected to deliver one (name, cid)
// value pair; xdata is the cloudsync_table_context being populated.
// Returns 0 on success, 1 to abort the enclosing exec (surfaces as DBRES_ABORT).
int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
    cloudsync_table_context *table = (cloudsync_table_context *)xdata;
    cloudsync_context *data = table->context;

    // append at the next free slot; with ncols == 2 the loop body runs exactly once
    int index = table->ncols;
    for (int i=0; i<ncols; i+=2) {
        const char *name = values[i];
        // base 0: accepts decimal, hex (0x...), or octal representations of the cid
        int cid = (int)strtol(values[i+1], NULL, 0);

        table->col_id[index] = cid;
        // store a lowercase copy so later lookups can be done case-insensitively
        table->col_name[index] = cloudsync_string_dup_lowercase(name);
        if (!table->col_name[index]) goto error;

        // precompile the per-column merge (upsert) statement
        char *sql = table_build_mergeinsert_sql(table, name);
        if (!sql) goto error;
        DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);

        int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto error;
        if (!table->col_merge_stmt[index]) goto error;

        // precompile the per-column value-extraction statement
        sql = table_build_value_sql(table, name);
        if (!sql) goto error;
        DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);

        rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto error;
        if (!table->col_value_stmt[index]) goto error;
    }
    // NOTE(review): if a callback ever delivered ncols > 2, the loop above
    // would overwrite slot `index` each iteration yet ncols is bumped by only
    // one — this relies on exactly one (name, cid) pair per invocation;
    // confirm against SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID.
    table->ncols += 1;

    return 0;

error:
    // clean up partially-initialized entry at index
    if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
    if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
    if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
    return 1;
}
1026 :
1027 240 : bool table_ensure_capacity (cloudsync_context *data) {
1028 240 : if (data->tables_count < data->tables_cap) return true;
1029 :
1030 0 : int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
1031 0 : size_t bytes = (size_t)new_cap * sizeof(*data->tables);
1032 0 : void *p = cloudsync_memory_realloc(data->tables, bytes);
1033 0 : if (!p) return false;
1034 :
1035 0 : data->tables = (cloudsync_table_context **)p;
1036 0 : data->tables_cap = new_cap;
1037 0 : return true;
1038 240 : }
1039 :
// Creates and registers a table context for `table_name` with the given CRDT
// algorithm: counts pks/non-pk columns, precompiles all statements, allocates
// the per-column arrays, and populates them via table_add_to_context_cb.
// Returns true on success or if the table is already registered; false on any
// failure (the partially-built table is freed via table_free).
bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
    DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);

    // Check if table already initialized in this connection's context.
    // Note: This prevents same-connection duplicate initialization.
    // SQLite clients cannot distinguish schemas, so having 'public.users'
    // and 'auth.users' would cause sync ambiguity. Users should avoid
    // initializing tables with the same name in different schemas.
    // If two concurrent connections initialize tables with the same name
    // in different schemas, the behavior is undefined.
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (table) return true;

    // check for space availability
    if (!table_ensure_capacity(data)) return false;

    // setup a new table
    table = table_create(data, table_name, algo);
    if (!table) return false;

    // fill remaining metadata in the table
    int count = database_count_pk(data, table_name, false, table->schema);
    if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    table->npks = count;
    if (table->npks == 0) {
        // tables without an explicit primary key either abort or fall back to rowid,
        // depending on the build-time CLOUDSYNC_DISABLE_ROWIDONLY_TABLES setting
        #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
        goto abort_add_table;
        #else
        table->rowid_only = true;
        table->npks = 1; // rowid
        #endif
    }

    int ncols = database_count_nonpk(data, table_name, table->schema);
    if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    int rc = table_add_stmts(table, ncols);
    if (rc != DBRES_OK) goto abort_add_table;

    // a table with only pk(s) is totally legal
    if (ncols > 0) {
        // per-column parallel arrays, indexed 0..ncols-1
        table->col_name = (char **)cloudsync_memory_alloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_name) goto abort_add_table;

        table->col_id = (int *)cloudsync_memory_alloc((uint64_t)(sizeof(int) * ncols));
        if (!table->col_id) goto abort_add_table;

        table->col_merge_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_merge_stmt) goto abort_add_table;

        table->col_value_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_value_stmt) goto abort_add_table;

        // zero-allocated: col_algo/col_delimiter entries default to "unset"
        table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
        if (!table->col_algo) goto abort_add_table;

        table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_delimiter) goto abort_add_table;

        // Pass empty string when schema is NULL; SQL will fall back to current_schema()
        const char *schema = table->schema ? table->schema : "";
        char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
                                             table_name, schema, table_name, schema);
        if (!sql) goto abort_add_table;
        // fills col_id/col_name/col_merge_stmt/col_value_stmt one row at a time
        rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
        cloudsync_memory_free(sql);
        if (rc == DBRES_ABORT) goto abort_add_table;
    }

    // append newly created table
    data->tables[data->tables_count++] = table;
    return true;

abort_add_table:
    // table_free releases whatever was allocated so far (stmts, arrays, names)
    table_free(table);
    return false;
}
1116 :
1117 0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
1118 0 : dbvm_t *vm = NULL;
1119 0 : *persistent = false;
1120 :
1121 0 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1122 0 : if (table) {
1123 0 : char *col_name = NULL;
1124 0 : if (table->ncols > 0) {
1125 0 : col_name = table->col_name[0];
1126 : // retrieve col_value precompiled statement
1127 0 : vm = table_column_lookup(table, col_name, false, NULL);
1128 0 : *persistent = true;
1129 0 : } else {
1130 0 : char *sql = table_build_value_sql(table, "*");
1131 0 : databasevm_prepare(data, sql, (void **)&vm, 0);
1132 0 : cloudsync_memory_free(sql);
1133 0 : *persistent = false;
1134 : }
1135 0 : }
1136 :
1137 0 : return vm;
1138 : }
1139 :
// Returns whether sync processing is currently enabled for this table.
bool table_enabled (cloudsync_table_context *table) {
    return table->enabled;
}
1143 :
// Enables or disables sync processing for this table.
void table_set_enabled (cloudsync_table_context *table, bool value) {
    table->enabled = value;
}
1147 :
// Returns the number of non-primary-key columns tracked for this table.
int table_count_cols (cloudsync_table_context *table) {
    return table->ncols;
}
1151 :
// Returns the number of primary-key columns (1 for rowid-only tables).
int table_count_pks (cloudsync_table_context *table) {
    return table->npks;
}
1155 :
// Returns the (lowercased) name of the non-pk column at `index`.
// No bounds check: callers must pass 0 <= index < table->ncols.
const char *table_colname (cloudsync_table_context *table, int index) {
    return table->col_name[index];
}
1159 :
// Returns true when a metadata row with this encoded primary key exists.
bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
    // check if a row with the same primary key already exists
    // if so, this means the row might have been previously deleted (sentinel)
    return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
}
1165 :
// Returns the array of primary-key column names (owned by the table context).
char **table_pknames (cloudsync_table_context *table) {
    return table->pk_name;
}
1169 :
// Replaces the table's primary-key name array, freeing the previous one.
// Ownership of `pknames` transfers to the table context.
void table_set_pknames (cloudsync_table_context *table, char **pknames) {
    table_pknames_free(table->pk_name, table->npks);
    table->pk_name = pknames;
}
1174 :
// Returns true when the table uses the grow-only-set (GOS) CRDT algorithm.
bool table_algo_isgos (cloudsync_table_context *table) {
    return (table->algo == table_algo_crdt_gos);
}
1178 :
// Returns the table's schema name, or NULL when none was specified.
const char *table_schema (cloudsync_table_context *table) {
    return table->schema;
}
1182 :
1183 : // MARK: - Merge Insert -
1184 :
1185 46004 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
1186 46004 : dbvm_t *vm = table->meta_local_cl_stmt;
1187 46004 : int64_t result = -1;
1188 :
1189 46004 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1190 46004 : if (rc != DBRES_OK) goto cleanup;
1191 :
1192 46004 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1193 46004 : if (rc != DBRES_OK) goto cleanup;
1194 :
1195 46004 : rc = databasevm_step(vm);
1196 46004 : if (rc == DBRES_ROW) result = database_column_int(vm, 0);
1197 0 : else if (rc == DBRES_DONE) result = 0;
1198 :
1199 : cleanup:
1200 46004 : if (result == -1) cloudsync_set_dberror(table->context);
1201 46004 : dbvm_reset(vm);
1202 46004 : return result;
1203 : }
1204 :
1205 44985 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
1206 44985 : dbvm_t *vm = table->meta_col_version_stmt;
1207 :
1208 44985 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1209 44985 : if (rc != DBRES_OK) goto cleanup;
1210 :
1211 44985 : rc = databasevm_bind_text(vm, 2, col_name, -1);
1212 44985 : if (rc != DBRES_OK) goto cleanup;
1213 :
1214 44985 : rc = databasevm_step(vm);
1215 69889 : if (rc == DBRES_ROW) {
1216 24904 : *version = database_column_int(vm, 0);
1217 24904 : rc = DBRES_OK;
1218 24904 : }
1219 :
1220 : cleanup:
1221 44985 : if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
1222 44985 : dbvm_reset(vm);
1223 44985 : return rc;
1224 : }
1225 :
// Records the winning clock for a merged change: resolves site_id to its
// ordinal via the shared getset_siteid statement, then upserts the
// (pk, col, col_version, db_version, seq, site-ord) tuple into the meta table
// and returns the affected change rowid through *rowid.
// A NULL colname records the tombstone sentinel instead of a real column.
// Returns DBRES_OK on success; on failure the context error is set.
int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {

    // get/set site_id: maps the raw site_id blob to a stable integer ordinal
    dbvm_t *vm = data->getset_siteid_stmt;
    int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_step(vm);
    if (rc != DBRES_ROW) goto cleanup_merge; // statement must return the ordinal row

    int64_t ord = database_column_int(vm, 0);
    dbvm_reset(vm);

    // switch to the per-table winner-clock upsert; `vm` now points at it so the
    // shared cleanup below resets whichever statement was last in use
    vm = table->meta_winner_clock_stmt;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 3, col_version);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 4, db_version);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 5, seq);
    if (rc != DBRES_OK) goto cleanup_merge;

    rc = databasevm_bind_int(vm, 6, ord);
    if (rc != DBRES_OK) goto cleanup_merge;

    // RETURNING-style statement: a row carries the change rowid
    rc = databasevm_step(vm);
    if (rc == DBRES_ROW) {
        *rowid = database_column_int(vm, 0);
        rc = DBRES_OK;
    }

cleanup_merge:
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    dbvm_reset(vm);
    return rc;
}
1269 :
1270 : // MARK: - Deferred column-batch merge functions -
1271 :
1272 21278 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
1273 21278 : merge_pending_batch *batch = data->pending_batch;
1274 :
1275 : // Store table and PK on first entry
1276 21278 : if (batch->table == NULL) {
1277 7658 : batch->table = table;
1278 7658 : batch->pk = (char *)cloudsync_memory_alloc(pklen);
1279 7658 : if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
1280 7658 : memcpy(batch->pk, pk, pklen);
1281 7658 : batch->pk_len = pklen;
1282 7658 : }
1283 :
1284 : // Ensure capacity
1285 21278 : if (batch->count >= batch->capacity) {
1286 476 : int new_cap = batch->capacity ? batch->capacity * 2 : 8;
1287 476 : merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
1288 476 : if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
1289 476 : batch->entries = new_entries;
1290 476 : batch->capacity = new_cap;
1291 476 : }
1292 :
1293 : // Resolve col_name to a stable pointer from the table context
1294 : // (the incoming col_name may point to VM-owned memory that gets freed on reset)
1295 21278 : int col_idx = -1;
1296 21278 : table_column_lookup(table, col_name, true, &col_idx);
1297 21278 : const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
1298 21278 : if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
1299 :
1300 21278 : merge_pending_entry *e = &batch->entries[batch->count];
1301 21278 : e->col_name = stable_col_name;
1302 21278 : e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
1303 21278 : e->col_version = col_version;
1304 21278 : e->db_version = db_version;
1305 21278 : e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
1306 21278 : memcpy(e->site_id, site_id, e->site_id_len);
1307 21278 : e->seq = seq;
1308 :
1309 21278 : batch->count++;
1310 21278 : return DBRES_OK;
1311 21278 : }
1312 :
1313 18801 : static void merge_pending_free_entries (merge_pending_batch *batch) {
1314 18801 : if (batch->entries) {
1315 35474 : for (int i = 0; i < batch->count; i++) {
1316 21278 : if (batch->entries[i].col_value) {
1317 21278 : database_value_free(batch->entries[i].col_value);
1318 21278 : batch->entries[i].col_value = NULL;
1319 21278 : }
1320 21278 : }
1321 14196 : }
1322 18801 : if (batch->pk) {
1323 7688 : cloudsync_memory_free(batch->pk);
1324 7688 : batch->pk = NULL;
1325 7688 : }
1326 18801 : batch->table = NULL;
1327 18801 : batch->pk_len = 0;
1328 18801 : batch->cl = 0;
1329 18801 : batch->sentinel_pending = false;
1330 18801 : batch->row_exists = false;
1331 18801 : batch->count = 0;
1332 18801 : }
1333 :
// Flushes the current pending batch to the real table: applies all buffered
// column changes for one row with a single (cached) multi-column statement,
// then records a winner clock per entry. A batch with no column entries but a
// pending sentinel inserts a pk-only row instead. The whole flush runs inside
// a savepoint so a mid-flight failure (e.g. RLS denial) rolls back cleanly.
// Returns DBRES_OK on success; the batch is always emptied before returning.
static int merge_flush_pending (cloudsync_context *data) {
    merge_pending_batch *batch = data->pending_batch;
    if (!batch) return DBRES_OK;

    int rc = DBRES_OK;
    bool flush_savepoint = false;

    // Nothing to write — handle sentinel-only case or skip
    if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
        goto cleanup;
    }

    // Wrap database operations in a savepoint so that on failure (e.g. RLS
    // denial) the rollback properly releases all executor resources (open
    // relations, snapshots, plan cache) acquired during the failed statement.
    flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);

    if (batch->count == 0) {
        // Sentinel with no winning columns (PK-only row)
        dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
        // decode the encoded pk and bind each component to the statement
        rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
        if (rc < 0) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
        // SYNCBIT suppresses the sync triggers while we write the real table
        SYNCBIT_SET(data);
        rc = databasevm_step(vm);
        dbvm_reset(vm);
        SYNCBIT_RESET(data);
        if (rc == DBRES_DONE) rc = DBRES_OK;
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            goto cleanup;
        }
        goto cleanup;
    }

    // Check if cached prepared statement can be reused: it matches only when
    // the insert/update shape and the exact ordered column set are identical
    cloudsync_table_context *table = batch->table;
    dbvm_t *vm = NULL;
    bool cache_hit = false;

    if (batch->cached_vm &&
        batch->cached_row_exists == batch->row_exists &&
        batch->cached_col_count == batch->count) {
        cache_hit = true;
        for (int i = 0; i < batch->count; i++) {
            // pointer compare is sufficient: col_name is always the table
            // context's stable string (see merge_pending_add)
            if (batch->cached_col_names[i] != batch->entries[i].col_name) {
                cache_hit = false;
                break;
            }
        }
    }

    if (cache_hit) {
        vm = batch->cached_vm;
        dbvm_reset(vm);
    } else {
        // Invalidate old cache
        if (batch->cached_vm) {
            databasevm_finalize(batch->cached_vm);
            batch->cached_vm = NULL;
        }

        // Build multi-column SQL
        const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
        if (!colnames) {
            rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
            goto cleanup;
        }
        for (int i = 0; i < batch->count; i++) {
            colnames[i] = batch->entries[i].col_name;
        }

        // UPDATE when the row already exists locally, UPSERT otherwise
        char *sql = batch->row_exists
            ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
            : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
        cloudsync_memory_free(colnames);

        if (!sql) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
            goto cleanup;
        }

        rc = databasevm_prepare(data, sql, &vm, 0);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
            goto cleanup;
        }

        // Update cache
        batch->cached_vm = vm;
        batch->cached_row_exists = batch->row_exists;
        batch->cached_col_count = batch->count;
        // Reallocate cached_col_names if needed
        if (batch->cached_col_count > 0) {
            const char **new_names = (const char **)cloudsync_memory_realloc(
                batch->cached_col_names, batch->count * sizeof(const char *));
            if (new_names) {
                for (int i = 0; i < batch->count; i++) {
                    new_names[i] = batch->entries[i].col_name;
                }
                batch->cached_col_names = new_names;
            }
            // on realloc failure the stale name list simply prevents future
            // cache hits; the statement itself is still valid
        }
    }

    // Bind PKs (positions 1..npks)
    int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    if (npks < 0) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        rc = DBRES_ERROR;
        goto cleanup;
    }

    // Bind column values (positions npks+1..npks+count)
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int bind_idx = npks + 1 + i;
        if (e->col_value) {
            rc = databasevm_bind_value(vm, bind_idx, e->col_value);
        } else {
            rc = databasevm_bind_null(vm, bind_idx);
        }
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
    }

    // Execute with SYNCBIT and GOS handling: GOS tables keep a trigger that
    // rejects updates, so it is temporarily disabled around the step
    if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (table->algo == table_algo_crdt_gos) table->enabled = 1;

    if (rc != DBRES_DONE) {
        cloudsync_set_dberror(data);
        goto cleanup;
    }
    rc = DBRES_OK;

    // Call merge_set_winner_clock for each buffered entry
    int64_t rowid = 0;
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
                                              e->col_name, e->col_version, e->db_version,
                                              (const char *)e->site_id, e->site_id_len,
                                              e->seq, &rowid);
        if (clock_rc != DBRES_OK) {
            rc = clock_rc;
            goto cleanup;
        }
    }

cleanup:
    merge_pending_free_entries(batch);
    if (flush_savepoint) {
        if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
        else database_rollback_savepoint(data, "merge_flush");
    }
    return rc;
}
1504 :
// Applies a single winning column change directly to the real table via the
// per-column merge statement, then records the winner clock in the meta table
// (returning the change rowid through *rowid).
// Returns DBRES_OK on success; errors are also recorded on the context.
int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    int index;
    dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
    if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);

    // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"

    // bind primary key(s): decodes the encoded pk and binds each component
    int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    if (rc < 0) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }

    // bind value (always bind all expected parameters for correct prepared statement handling)
    // the value appears twice in the SQL: once in VALUES and once in the
    // ON CONFLICT UPDATE clause, hence the double bind at npks+1 and npks+2
    if (col_value) {
        rc = databasevm_bind_value(vm, table->npks+1, col_value);
        if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
    } else {
        rc = databasevm_bind_null(vm, table->npks+1);
        if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
    }
    if (rc != DBRES_OK) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }

    // perform real operation and disable triggers

    // in case of GOS we reused the table->col_merge_stmt statement
    // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
    // the trick is to disable that trigger before executing the statement
    if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (table->algo == table_algo_crdt_gos) table->enabled = 1;

    if (rc != DBRES_DONE) {
        cloudsync_set_dberror(data);
        return rc;
    }

    // record which change won; *rowid receives the meta change id
    return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
}
1555 :
// Applies a remote delete: removes the row from the real table (triggers
// suppressed via SYNCBIT), records the winner clock, and finally drops the
// row's per-column clocks from the meta table.
// *rowid receives the winner-clock change id (0 on failure paths before it
// is set). Returns DBRES_OK on success; errors are recorded on the context.
int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    int rc = DBRES_OK;

    // reset return value
    *rowid = 0;

    // bind pk: decode the encoded primary key into the delete statement
    dbvm_t *vm = table->real_merge_delete_stmt;
    rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    if (rc < 0) {
        rc = cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }

    // perform real operation and disable triggers
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) {
        cloudsync_set_dberror(data);
        return rc;
    }

    rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
    if (rc != DBRES_OK) return rc;

    // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
    // this must never come before `set_winner_clock`
    vm = table->meta_merge_delete_drop;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    if (rc == DBRES_OK) rc = databasevm_step(vm);
    dbvm_reset(vm);

    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    return rc;
}
1597 :
// Zero the column clocks for a resurrected row (a row that was deleted and is
// now alive again) so that every column will be re-synchronized starting from
// db_version. On failure the db error is recorded in the table's owning
// context. Resets the statement on every path and returns DBRES_OK or an
// error code.
int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
    dbvm_t *vm = table->meta_zero_clock_stmt;

    // NOTE(review): db_version is int64_t but is bound through
    // databasevm_bind_int; confirm the wrapper accepts a 64-bit value,
    // otherwise large versions would silently truncate
    int rc = databasevm_bind_int(vm, 1, db_version);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_step(vm);
    if (rc == DBRES_DONE) rc = DBRES_OK;

cleanup:
    if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
    dbvm_reset(vm);
    return rc;
}
1615 :
// executed only if insert_cl == local_cl
// Decide whether an incoming column change (insert_value at col_version from
// site_id) beats the locally stored value for the same pk/column.
// Resolution ladder:
//   1) no local metadata row          -> incoming wins
//   2) higher col_version             -> that side wins
//   3) equal versions                 -> compare the actual values
//   4) equal values (only when data->merge_equal_values is set)
//                                     -> memcmp of site_ids breaks the tie
// The verdict is written to *didwin_flag; returns DBRES_OK or an error code.
int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {

    // a NULL column name means the row-level tombstone sentinel
    if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;

    int64_t local_version;
    int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    if (rc == DBRES_DONE) {
        // no rows returned, the incoming change wins if there's nothing there locally
        *didwin_flag = true;
        return DBRES_OK;
    }
    if (rc != DBRES_OK) return rc;

    // rc == DBRES_OK, means that a row with a version exists
    if (local_version != col_version) {
        if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
        if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    }

    // rc == DBRES_ROW and col_version == local_version, need to compare values

    // retrieve col_value precompiled statement
    bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    dbvm_t *vm;
    if (is_block_col) {
        // Block column: read value from blocks table (pk + col_name bindings)
        vm = table_block_value_read_stmt(table);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
        rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
        rc = databasevm_bind_text(vm, 2, col_name, -1);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    } else {
        // Normal column: per-column SELECT statement keyed on the primary key
        vm = table_column_lookup(table, col_name, false, NULL);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);

        // bind primary key values
        rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
        if (rc < 0) {
            rc = cloudsync_set_dberror(data);
            dbvm_reset(vm);
            return rc;
        }
    }

    // execute vm
    dbvalue_t *local_value;
    rc = databasevm_step(vm);
    if (rc == DBRES_DONE) {
        // meta entry exists but the actual value is missing
        // we should allow the value_compare function to make a decision
        // value_compare has been modified to handle the case where lvalue is NULL
        local_value = NULL;
        rc = DBRES_OK;
    } else if (rc == DBRES_ROW) {
        local_value = database_column_value(vm, 0);
        rc = DBRES_OK;
    } else {
        goto cleanup;
    }

    // compare values
    int ret = dbutils_value_compare(insert_value, local_value);
    // reset after compare, otherwise local value would be deallocated
    dbvm_reset(vm);
    vm = NULL;

    // a tie (ret == 0) escalates to the site_id tie-breaker only when the
    // merge_equal_values option is enabled
    bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    if (!compare_site_id) {
        *didwin_flag = (ret > 0);
        goto cleanup;
    }

    // values are the same and merge_equal_values is true
    vm = table->meta_site_id_stmt;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_bind_text(vm, 2, col_name, -1);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_step(vm);
    if (rc == DBRES_ROW) {
        const void *local_site_id = database_column_blob(vm, 0, NULL);
        if (!local_site_id) {
            dbvm_reset(vm);
            return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
        }
        // deterministic tie-breaker: lexicographic comparison of site_ids
        // NOTE(review): compares site_len bytes without checking the stored
        // blob's length — assumes all site_ids share the same fixed size
        ret = memcmp(site_id, local_site_id, site_len);
        *didwin_flag = (ret > 0);
        dbvm_reset(vm);
        return DBRES_OK;
    }

    // handle error condition here
    dbvm_reset(vm);
    return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);

cleanup:
    // NOTE(review): vm can be NULL here (cleared after the value compare);
    // assumes dbvm_reset(NULL) is a safe no-op — confirm in the db wrapper
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    dbvm_reset(vm);
    return rc;
}
1720 :
// Handle a sentinel-only insert: a change that only marks a row as existing
// (newly created or resurrected) without carrying any column value.
// Immediate mode (no pending batch): run the base-table INSERT right away
// with change-capture triggers disabled. Batch mode: defer row creation to
// the batch flush and remember the pk for the batch's first sentinel.
// Metadata updates (zeroed clocks + winner clock) run in both modes.
// On success *rowid receives the metadata rowid.
int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {

    // reset return value
    *rowid = 0;

    if (data->pending_batch == NULL) {
        // Immediate mode: execute base table INSERT
        dbvm_t *vm = table->real_merge_sentinel_stmt;
        int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
        if (rc < 0) {
            rc = cloudsync_set_dberror(data);
            dbvm_reset(vm);
            return rc;
        }

        // SYNCBIT disables the capture triggers while applying the remote change
        SYNCBIT_SET(data);
        rc = databasevm_step(vm);
        dbvm_reset(vm);
        SYNCBIT_RESET(data);
        if (rc == DBRES_DONE) rc = DBRES_OK;
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            return rc;
        }
    } else {
        // Batch mode: skip base table INSERT, the batch flush will create the row
        merge_pending_batch *batch = data->pending_batch;
        batch->sentinel_pending = true;
        // only the first sentinel of the batch records table/pk (one row per batch)
        if (batch->table == NULL) {
            batch->table = table;
            batch->pk = (char *)cloudsync_memory_alloc(pklen);
            if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
            memcpy(batch->pk, pk, pklen);
            batch->pk_len = pklen;
        }
    }

    // Metadata operations always execute regardless of batch mode
    int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
    if (rc != DBRES_OK) return rc;

    return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
}
1764 :
1765 : // MARK: - Block-level merge helpers -
1766 :
1767 : // Store a block value in the blocks table
1768 379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
1769 379 : dbvm_t *vm = table->block_value_write_stmt;
1770 379 : if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
1771 :
1772 379 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1773 379 : if (rc != DBRES_OK) goto cleanup;
1774 379 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1775 379 : if (rc != DBRES_OK) goto cleanup;
1776 379 : if (col_value) {
1777 379 : rc = databasevm_bind_value(vm, 3, col_value);
1778 379 : } else {
1779 0 : rc = databasevm_bind_null(vm, 3);
1780 : }
1781 379 : if (rc != DBRES_OK) goto cleanup;
1782 :
1783 379 : rc = databasevm_step(vm);
1784 379 : if (rc == DBRES_DONE) rc = DBRES_OK;
1785 :
1786 : cleanup:
1787 379 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1788 379 : databasevm_reset(vm);
1789 379 : return rc;
1790 379 : }
1791 :
1792 : // Delete a block value from the blocks table
1793 72 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
1794 72 : dbvm_t *vm = table->block_value_delete_stmt;
1795 72 : if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
1796 :
1797 72 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1798 72 : if (rc != DBRES_OK) goto cleanup;
1799 72 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1800 72 : if (rc != DBRES_OK) goto cleanup;
1801 :
1802 72 : rc = databasevm_step(vm);
1803 72 : if (rc == DBRES_DONE) rc = DBRES_OK;
1804 :
1805 : cleanup:
1806 72 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1807 72 : databasevm_reset(vm);
1808 72 : return rc;
1809 72 : }
1810 :
// Materialize all alive blocks for a base column into the base table.
// Rebuilds base_col_name for row `pk` by collecting every alive block
// (metadata col_version odd) in block-name order, joining them with the
// column's delimiter, and writing the joined text back through the column
// merge statement with triggers disabled. With no alive blocks the base
// column is set to NULL. Returns DBRES_OK or an error code.
int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);

    // Find column index and delimiter
    int col_idx = -1;
    for (int i = 0; i < table->ncols; i++) {
        if (strcasecmp(table->col_name[i], base_col_name) == 0) {
            col_idx = i;
            break;
        }
    }
    if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;

    // Build the LIKE pattern for block col_names: "base_col\x1F%"
    char *like_pattern = block_build_colname(base_col_name, "%");
    if (!like_pattern) return DBRES_NOMEM;

    // Query alive blocks from blocks table joined with metadata
    // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    //                  ON b.pk = m.pk AND b.col_name = m.col_name
    //                  WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    //                  ORDER BY b.col_name
    dbvm_t *vm = table->block_list_stmt;
    int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    // Bind pk again for the join condition (parameter 3)
    rc = databasevm_bind_blob(vm, 3, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }

    // Collect block values into a geometrically-grown array (start 16, double)
    const char **block_values = NULL;
    int block_count = 0;
    int block_cap = 0;

    while ((rc = databasevm_step(vm)) == DBRES_ROW) {
        const char *value = database_column_text(vm, 0);
        if (block_count >= block_cap) {
            int new_cap = block_cap ? block_cap * 2 : 16;
            const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
            if (!new_arr) { rc = DBRES_NOMEM; break; }
            block_values = new_arr;
            block_cap = new_cap;
        }
        // NOTE(review): cloudsync_string_dup may return NULL on OOM; a NULL
        // entry would reach block_materialize_text — confirm it tolerates that
        block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
        block_count++;
    }
    databasevm_reset(vm);
    cloudsync_memory_free(like_pattern);

    if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
        // Free collected values
        for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
        if (block_values) cloudsync_memory_free((void *)block_values);
        return cloudsync_set_dberror(data);
    }

    // Materialize text (NULL when no alive blocks)
    char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    if (block_values) cloudsync_memory_free((void *)block_values);
    if (block_count > 0 && !text) return DBRES_NOMEM;

    // Update the base table column via the col_merge_stmt (with triggers disabled)
    dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }

    // Bind PKs
    rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }

    // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    int npks = table->npks;
    if (text) {
        rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    } else {
        rc = databasevm_bind_null(merge_vm, npks + 1);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_null(merge_vm, npks + 2);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    }

    // Execute with triggers disabled (table->enabled gates the trigger bodies)
    table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(merge_vm);
    databasevm_reset(merge_vm);
    SYNCBIT_RESET(data);
    table->enabled = 1;

    cloudsync_memory_free(text);

    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    return DBRES_OK;
}
1915 :
1916 : // Accessor for has_block_cols flag
1917 1024 : bool table_has_block_cols (cloudsync_table_context *table) {
1918 1024 : return table && table->has_block_cols;
1919 : }
1920 :
1921 : // Get block column algo for a given column index
1922 11382 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
1923 11382 : if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
1924 11382 : return table->col_algo[index];
1925 11382 : }
1926 :
1927 : // Get block delimiter for a given column index
1928 133 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
1929 133 : if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
1930 133 : return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
1931 133 : }
1932 :
1933 : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
1934 1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
1935 496 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
1936 91 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
1937 91 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
1938 :
1939 2 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
1940 2 : if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
1941 2 : if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
1942 2 : table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
1943 2 : }
1944 :
1945 : // Find column index by name
1946 121 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
1947 121 : if (!table || !col_name) return -1;
1948 125 : for (int i = 0; i < table->ncols; i++) {
1949 125 : if (strcasecmp(table->col_name[i], col_name) == 0) return i;
1950 4 : }
1951 0 : return -1;
1952 121 : }
1953 :
// Apply one incoming change (a single pk + column cell) using the
// Causal-Length Set (CLS) conflict-resolution algorithm:
// - changes older than the local causal length are ignored,
// - even causal lengths are deletes, odd ones are inserts/updates,
// - sentinel-only changes merely (re)create the row,
// - same-length conflicts are resolved per column via merge_did_cid_win,
// - block columns (names containing \x1F) are written to the blocks table
//   and the base column is re-materialized from all alive blocks,
// - in batch mode the winning column write is queued instead of applied.
// On success *rowid receives the metadata rowid of the applied change.
int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    // Handle DWS and AWS algorithms here
    // Delete-Wins Set (DWS): table_algo_crdt_dws
    // Add-Wins Set (AWS): table_algo_crdt_aws

    // Causal-Length Set (CLS) Algorithm (default)

    // compute the local causal length for the row based on the primary key
    // the causal length is used to determine the order of operations and resolve conflicts.
    int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);

    // if the incoming causal length is older than the local causal length, we can safely ignore it
    // because the local changes are more recent
    if (insert_cl < local_cl) return DBRES_OK;

    // check if the operation is a delete by examining the causal length
    // even causal lengths typically signify delete operations
    bool is_delete = (insert_cl % 2 == 0);
    if (is_delete) {
        // if it's a delete, check if the local state is at the same causal length
        // if it is, no further action is needed
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a delete merge if the causal length is newer than the local one
        int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
                              insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
        return rc;
    }

    // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    if (is_sentinel_only) {
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a sentinel-only insert to track the existence of the row
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
        return rc;
    }

    // from this point I can be sure that insert_name is not sentinel

    // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    // odd causal lengths can "resurrect" rows
    bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    bool row_exists_locally = local_cl != 0;

    // if a resurrection is needed, insert a sentinel to mark the row as alive
    // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    }

    // at this point, we determine whether the incoming change wins based on causal length
    // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    bool flag = false;
    int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);

    // check if the incoming change wins and should be applied
    bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    if (!does_cid_win) return DBRES_OK;

    // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    // bypass the normal base-table write and instead store the value in the blocks table.
    // The base table column will be materialized from all alive blocks.
    if (block_is_block_colname(insert_name) && table->has_block_cols) {
        // Store or delete block value in blocks table depending on tombstone status
        // (even col_version == tombstone, matching the causal-length convention)
        if (insert_col_version % 2 == 0) {
            // Tombstone: remove from blocks table
            rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
        } else {
            rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
        }
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);

        // Set winner clock in metadata
        rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
                                    insert_col_version, insert_db_version,
                                    insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);

        // Materialize the full column from blocks into the base table
        char *base_col = block_extract_base_colname(insert_name);
        if (base_col) {
            rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
            cloudsync_memory_free(base_col);
            if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
        }

        return DBRES_OK;
    }

    // perform the final column insert or update if the incoming change wins
    if (data->pending_batch) {
        // Propagate row_exists_locally to the batch on the first winning column.
        // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
        // which matters when RLS policies reference columns not in the payload.
        if (data->pending_batch->table == NULL) {
            data->pending_batch->row_exists = row_exists_locally;
        }
        rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    } else {
        rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    }

    return rc;
}
2069 :
2070 : // MARK: - Block column setup -
2071 :
// Configure a table column to use the block CRDT algorithm: marks the column,
// stores an optional custom delimiter, lazily creates the per-table blocks
// table plus its four prepared statements (upsert/select/delete/list-alive),
// and persists the settings. Returns DBRES_OK or an error code.
int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);

    // Find column index
    int col_idx = table_col_index(table, col_name);
    if (col_idx < 0) {
        char buf[1024];
        snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
        return cloudsync_set_error(data, buf, DBRES_ERROR);
    }

    // Set column algo
    table->col_algo[col_idx] = col_algo_block;
    table->has_block_cols = true;

    // Set delimiter (can be NULL for default)
    if (table->col_delimiter[col_idx]) {
        cloudsync_memory_free(table->col_delimiter[col_idx]);
        table->col_delimiter[col_idx] = NULL;
    }
    if (delimiter) {
        table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
    }

    // Create blocks table if not already done (blocks_ref doubles as the
    // "already initialized" marker)
    // NOTE(review): if a prepare below fails, blocks_ref stays set so a retry
    // skips re-preparing and some block statements remain NULL — confirm
    // callers treat a non-OK return as fatal for this table
    if (!table->blocks_ref) {
        table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
        if (!table->blocks_ref) return DBRES_NOMEM;

        // CREATE TABLE IF NOT EXISTS
        char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;

        int rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);

        // Prepare block statements
        // Write: upsert into blocks (pk, col_name, col_value)
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // List alive blocks for materialization
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;
    }

    // Persist settings so the configuration survives reconnects
    int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
    if (rc != DBRES_OK) return rc;

    if (delimiter) {
        rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
        if (rc != DBRES_OK) return rc;
    }

    return DBRES_OK;
}
2151 :
2152 : // MARK: - Private -
2153 :
2154 193 : bool cloudsync_config_exists (cloudsync_context *data) {
2155 193 : return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
2156 : }
2157 :
2158 207 : cloudsync_context *cloudsync_context_create (void *db) {
2159 207 : cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
2160 207 : if (!data) return NULL;
2161 : DEBUG_SETTINGS("cloudsync_context_create %p", data);
2162 :
2163 207 : data->libversion = CLOUDSYNC_VERSION;
2164 207 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2165 : #if CLOUDSYNC_DEBUG
2166 : data->debug = 1;
2167 : #endif
2168 :
2169 : // allocate space for 64 tables (it can grow if needed)
2170 207 : uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
2171 207 : data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
2172 207 : if (!data->tables) {cloudsync_memory_free(data); return NULL;}
2173 :
2174 207 : data->tables_cap = CLOUDSYNC_INIT_NTABLES;
2175 207 : data->tables_count = 0;
2176 207 : data->db = db;
2177 :
2178 : // SQLite exposes col_value as ANY, but other databases require a concrete type.
2179 : // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
2180 : // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
2181 : // It is decoded to the target column type just before applying changes to the base table.
2182 207 : data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
2183 :
2184 207 : return data;
2185 207 : }
2186 :
2187 207 : void cloudsync_context_free (void *ctx) {
2188 207 : cloudsync_context *data = (cloudsync_context *)ctx;
2189 : DEBUG_SETTINGS("cloudsync_context_free %p", data);
2190 207 : if (!data) return;
2191 :
2192 : // free all table contexts and prepared statements
2193 207 : cloudsync_terminate(data);
2194 :
2195 207 : cloudsync_memory_free(data->tables);
2196 207 : cloudsync_memory_free(data);
2197 207 : }
2198 :
// Lazily initialize the per-connection cloudsync state: settings tables,
// precompiled statements, site_id and schema hash. Safe to call repeatedly;
// the heavy work runs only when site_id is unset or the settings table is
// missing. Returns the site_id buffer on success, NULL on failure.
const char *cloudsync_context_init (cloudsync_context *data) {
    if (!data) return NULL;

    // perform init just the first time, if the site_id field is not set.
    // The data->site_id value could exists while settings tables don't exists if the
    // cloudsync_context_init was previously called in init transaction that was rolled back
    // because of an error during the init process.
    if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
        if (dbutils_settings_init(data) != DBRES_OK) return NULL;
        if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
        if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
        // cache the schema hash so later schema changes can be detected
        data->schema_hash = database_schema_hash(data);
    }

    return (const char *)data->site_id;
}
2215 :
2216 1235 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
2217 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
2218 :
2219 : // sync data
2220 1235 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
2221 187 : data->schema_version = (int)strtol(value, NULL, 0);
2222 187 : return;
2223 : }
2224 :
2225 1048 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
2226 0 : data->debug = 0;
2227 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
2228 0 : return;
2229 : }
2230 :
2231 1048 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
2232 0 : cloudsync_set_schema(data, value);
2233 0 : return;
2234 : }
2235 1235 : }
2236 :
2237 : #if 0
2238 : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
2239 : DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
2240 : // Unused in this version
2241 : return;
2242 : }
2243 : #endif
2244 :
2245 7686 : int cloudsync_commit_hook (void *ctx) {
2246 7686 : cloudsync_context *data = (cloudsync_context *)ctx;
2247 :
2248 7686 : data->db_version = data->pending_db_version;
2249 7686 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2250 7686 : data->seq = 0;
2251 :
2252 7686 : return DBRES_OK;
2253 : }
2254 :
2255 3 : void cloudsync_rollback_hook (void *ctx) {
2256 3 : cloudsync_context *data = (cloudsync_context *)ctx;
2257 :
2258 3 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2259 3 : data->seq = 0;
2260 3 : }
2261 :
2262 24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
2263 : // init cloudsync_settings
2264 24 : if (cloudsync_context_init(data) == NULL) {
2265 0 : return DBRES_MISUSE;
2266 : }
2267 :
2268 : // lookup table
2269 24 : cloudsync_table_context *table = table_lookup(data, table_name);
2270 24 : if (!table) {
2271 : char buffer[1024];
2272 1 : snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
2273 1 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
2274 : }
2275 :
2276 : // create a savepoint to manage the alter operations as a transaction
2277 23 : int rc = database_begin_savepoint(data, "cloudsync_alter");
2278 23 : if (rc != DBRES_OK) {
2279 0 : return cloudsync_set_error(data, "Unable to create cloudsync_begin_alter savepoint", DBRES_MISUSE);
2280 : }
2281 :
2282 : // retrieve primary key(s)
2283 23 : char **names = NULL;
2284 23 : int nrows = 0;
2285 23 : rc = database_pk_names(data, table_name, &names, &nrows);
2286 23 : if (rc != DBRES_OK) {
2287 : char buffer[1024];
2288 0 : snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
2289 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2290 0 : goto rollback_begin_alter;
2291 : }
2292 :
2293 : // sanity check the number of primary keys
2294 23 : if (nrows != table_count_pks(table)) {
2295 : char buffer[1024];
2296 0 : snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
2297 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2298 0 : goto rollback_begin_alter;
2299 : }
2300 :
2301 : // drop original triggers
2302 23 : rc = database_delete_triggers(data, table_name);
2303 23 : if (rc != DBRES_OK) {
2304 : char buffer[1024];
2305 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
2306 0 : cloudsync_set_error(data, buffer, DBRES_ERROR);
2307 0 : goto rollback_begin_alter;
2308 : }
2309 :
2310 23 : table_set_pknames(table, names);
2311 23 : return DBRES_OK;
2312 :
2313 : rollback_begin_alter:
2314 0 : database_rollback_savepoint(data, "cloudsync_alter");
2315 0 : if (names) table_pknames_free(names, nrows);
2316 0 : return rc;
2317 24 : }
2318 :
// Reconcile the meta (clock) table with the base table after an ALTER:
// if the primary-key set changed, the meta table is dropped (it will be
// recreated/backfilled later); otherwise it is compacted in place.
// Returns DBRES_OK on success; the caller handles savepoint rollback.
int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    // check if dbversion needed to be updated
    cloudsync_dbversion_check_uptodate(data);

    // if primary-key columns change, all row identities change.
    // In that case, the clock table must be dropped, recreated,
    // and backfilled. We detect this by comparing the unique index
    // in the lookaside table with the source table's PKs.

    // retrieve primary keys (to check if they changed)
    char **result = NULL;
    int nrows = 0;
    int rc = database_pk_names (data, table->name, &result, &nrows);
    if (rc != DBRES_OK || nrows == 0) {
        // a table with zero PKs at this point is a misuse, not a db error
        if (nrows == 0) rc = DBRES_MISUSE;
        goto finalize;
    }

    // check if there are differences (count first, then name-by-name)
    bool pk_diff = (nrows != table->npks);
    if (!pk_diff) {
        for (int i = 0; i < nrows; ++i) {
            if (strcmp(table->pk_name[i], result[i]) != 0) {
                pk_diff = true;
                break;
            }
        }
    }

    if (pk_diff) {
        // drop meta-table, it will be recreated
        char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }
    } else {
        // compact meta-table
        // delete entries for removed columns
        const char *schema = table->schema ? table->schema : "";
        char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

        // build the comma-separated, qualified PK column list used to join
        // the base table against the meta table
        sql = sql_build_pk_qualified_collist_query(schema, table->name);
        if (!sql) {rc = DBRES_NOMEM; goto finalize;}

        char *pkclause = NULL;
        rc = database_select_text(data, sql, &pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto finalize;
        // no explicit PK columns means the implicit rowid is the key
        char *pkvalues = (pkclause) ? pkclause : "rowid";

        // delete entries related to rows that no longer exist in the original table, but preserve tombstone
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
        rc = database_exec(data, sql);
        if (pkclause) cloudsync_memory_free(pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

    }

    // update key to be later used in cloudsync_dbversion_rebuild
    char buf[256];
    snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);

finalize:
    table_pknames_free(result, nrows);
    return rc;
}
2399 :
// Complete an ALTER started with cloudsync_begin_alter: reconcile the meta
// table, re-register the table (its cached context is stale), release the
// "cloudsync_alter" savepoint and refresh the schema hash. Any failure rolls
// the savepoint back. Returns DBRES_OK on success, a DBRES_* code otherwise.
int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    int rc = DBRES_MISUSE;
    cloudsync_table_context *table = NULL;

    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    // lookup table
    table = table_lookup(data, table_name);
    if (!table) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
        cloudsync_set_error(data, buffer, DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    rc = cloudsync_finalize_alter(data, table);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    table_remove(data, table);
    table_free(table);
    table = NULL;

    // init again cloudsync for the table
    // the per-table algorithm setting wins; fall back to the global "*" setting
    table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), true);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    // release savepoint
    rc = database_commit_savepoint(data, "cloudsync_alter");
    if (rc != DBRES_OK) {
        cloudsync_set_dberror(data);
        goto rollback_finalize_alter;
    }

    cloudsync_update_schema_hash(data);
    return DBRES_OK;

rollback_finalize_alter:
    database_rollback_savepoint(data, "cloudsync_alter");
    // table is non-NULL only when we failed before table_remove/table_free:
    // clear the pk-names snapshot installed by cloudsync_begin_alter
    if (table) table_set_pknames(table, NULL);
    return rc;
}
2448 :
2449 : // MARK: - Filter Rewrite -
2450 :
2451 : // Replace bare column names in a filter expression with prefix-qualified names.
2452 : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
// Column matching is done on whole identifier tokens with exact lengths, so the
// order of `columns` does not matter and partial matches cannot occur.
2454 : // Skips content inside single-quoted string literals.
2455 : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
2456 : // Helper: check if an identifier token matches a column name.
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    // Return true when the token (token_len bytes, not NUL-terminated)
    // exactly equals one of the column names.
    for (int idx = 0; idx < ncols; ++idx) {
        const char *candidate = columns[idx];
        if (strlen(candidate) != token_len) continue;
        if (memcmp(token, candidate, token_len) == 0) return true;
    }
    return false;
}
2464 :
2465 : // Helper: check if character is part of a SQL identifier.
static bool filter_is_ident_char (char c) {
    // SQL identifier characters: ASCII letters, digits, and underscore.
    if (c == '_') return true;
    if (c >= '0' && c <= '9') return true;
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
}
2470 :
// Rewrite `filter` so every bare reference to a name in `columns` becomes
// PREFIX."column" (e.g. "user_id = 42" -> "NEW.\"user_id\" = 42").
// Single-quoted string literals are copied verbatim ('' escapes handled).
// Returns a newly allocated string (free with cloudsync_memory_free),
// or NULL on invalid arguments or allocation failure.
// NOTE(review): double-quoted identifiers ("col") are not skipped, so an
// already-qualified filter would be re-tokenized — confirm callers never
// pass pre-quoted column references.
char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    if (!filter || !prefix || !columns || ncols <= 0) return NULL;

    size_t filter_len = strlen(filter);
    size_t prefix_len = strlen(prefix);

    // Each identifier match grows by at most (prefix_len + 3) bytes.
    // Worst case: the entire filter is one repeated column reference separated by
    // single characters, so up to (filter_len / 2) matches. Use a safe upper bound.
    size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    size_t cap = filter_len + max_growth + 64;
    char *result = (char *)cloudsync_memory_alloc(cap);
    if (!result) return NULL;
    size_t out = 0;

    // Single pass: tokenize into identifiers, quoted strings, and everything else.
    size_t i = 0;
    while (i < filter_len) {
        // Skip single-quoted string literals verbatim (handle '' escape)
        if (filter[i] == '\'') {
            result[out++] = filter[i++];
            while (i < filter_len) {
                if (filter[i] == '\'') {
                    result[out++] = filter[i++];
                    // '' is an escaped quote — keep going
                    if (i < filter_len && filter[i] == '\'') {
                        result[out++] = filter[i++];
                        continue;
                    }
                    break; // single ' ends the literal
                }
                result[out++] = filter[i++];
            }
            // an unterminated literal simply ends the scan at filter_len
            continue;
        }

        // Extract identifier token (maximal run of identifier characters)
        if (filter_is_ident_char(filter[i])) {
            size_t start = i;
            while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
            size_t token_len = i - start;

            if (filter_is_column(&filter[start], token_len, columns, ncols)) {
                // Emit PREFIX."column_name"
                memcpy(&result[out], prefix, prefix_len); out += prefix_len;
                result[out++] = '.';
                result[out++] = '"';
                memcpy(&result[out], &filter[start], token_len); out += token_len;
                result[out++] = '"';
            } else {
                // Not a column — copy as-is
                memcpy(&result[out], &filter[start], token_len); out += token_len;
            }
            continue;
        }

        // Any other character — copy as-is
        result[out++] = filter[i++];
    }

    result[out] = '\0';
    return result;
}
2534 :
// Backfill the meta (clock) table for `table_name`: insert sentinel rows for
// base-table PKs missing from the meta table, then add per-column entries for
// every (row, column) pair not yet tracked. An optional row-level filter from
// the table settings restricts which base rows are considered.
// Returns DBRES_OK on success, a DBRES_* code otherwise.
int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return DBRES_ERROR;

    dbvm_t *vm = NULL;
    int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);

    // Read row-level filter from settings (if any)
    char filter_buf[2048];
    int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
    const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;

    // comma-separated PK column list; NULL result falls back to rowid below
    const char *schema = table->schema ? table->schema : "";
    char *sql = sql_build_pk_collist_query(schema, table_name);
    char *pkclause_identifiers = NULL;
    int rc = database_select_text(data, sql, &pkclause_identifiers);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;
    char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";

    // Use database-specific query builder to handle type differences in composite PKs
    sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
    if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    rc = database_exec(data, sql);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;

    // fill missing columns
    // for each non-pk column:
    // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
    // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.

    if (filter) {
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
    } else {
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
    }
    // persistent statement: it is re-bound and re-run once per column below
    rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
    cloudsync_memory_free(sql);
    if (rc != DBRES_OK) goto finalize;

    for (int i=0; i<table->ncols; ++i) {
        char *col_name = table->col_name[i];

        rc = databasevm_bind_text(vm, 1, col_name, -1);
        if (rc != DBRES_OK) goto finalize;

        // step through every PK missing a meta entry for this column
        while (1) {
            rc = databasevm_step(vm);
            if (rc == DBRES_ROW) {
                size_t pklen = 0;
                const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
                if (!pk) { rc = DBRES_ERROR; break; }
                rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
            } else if (rc == DBRES_DONE) {
                rc = DBRES_OK;
                break;
            } else {
                break;
            }
        }
        if (rc != DBRES_OK) goto finalize;

        databasevm_reset(vm);
    }

finalize:
    if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
    if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
    if (vm) databasevm_finalize(vm);
    return rc;
}
2607 :
2608 : // MARK: - Local -
2609 :
2610 3 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2611 3 : dbvm_t *vm = table->meta_sentinel_update_stmt;
2612 3 : if (!vm) return -1;
2613 :
2614 3 : int rc = databasevm_bind_int(vm, 1, db_version);
2615 3 : if (rc != DBRES_OK) goto cleanup;
2616 :
2617 3 : rc = databasevm_bind_int(vm, 2, seq);
2618 3 : if (rc != DBRES_OK) goto cleanup;
2619 :
2620 3 : rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
2621 3 : if (rc != DBRES_OK) goto cleanup;
2622 :
2623 3 : rc = databasevm_step(vm);
2624 3 : if (rc == DBRES_DONE) rc = DBRES_OK;
2625 :
2626 : cleanup:
2627 3 : DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
2628 3 : databasevm_reset(vm);
2629 3 : return rc;
2630 3 : }
2631 :
2632 128 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2633 128 : dbvm_t *vm = table->meta_sentinel_insert_stmt;
2634 128 : if (!vm) return -1;
2635 :
2636 128 : int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
2637 128 : if (rc != DBRES_OK) goto cleanup;
2638 :
2639 128 : rc = databasevm_bind_int(vm, 2, db_version);
2640 128 : if (rc != DBRES_OK) goto cleanup;
2641 :
2642 128 : rc = databasevm_bind_int(vm, 3, seq);
2643 128 : if (rc != DBRES_OK) goto cleanup;
2644 :
2645 128 : rc = databasevm_bind_int(vm, 4, db_version);
2646 128 : if (rc != DBRES_OK) goto cleanup;
2647 :
2648 128 : rc = databasevm_bind_int(vm, 5, seq);
2649 128 : if (rc != DBRES_OK) goto cleanup;
2650 :
2651 128 : rc = databasevm_step(vm);
2652 128 : if (rc == DBRES_DONE) rc = DBRES_OK;
2653 :
2654 : cleanup:
2655 128 : DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
2656 128 : databasevm_reset(vm);
2657 128 : return rc;
2658 128 : }
2659 :
2660 11742 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
2661 :
2662 11742 : dbvm_t *vm = table->meta_row_insert_update_stmt;
2663 11742 : if (!vm) return -1;
2664 :
2665 11742 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2666 11742 : if (rc != DBRES_OK) goto cleanup;
2667 :
2668 11742 : rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
2669 11742 : if (rc != DBRES_OK) goto cleanup;
2670 :
2671 11742 : rc = databasevm_bind_int(vm, 3, col_version);
2672 11742 : if (rc != DBRES_OK) goto cleanup;
2673 :
2674 11742 : rc = databasevm_bind_int(vm, 4, db_version);
2675 11742 : if (rc != DBRES_OK) goto cleanup;
2676 :
2677 11742 : rc = databasevm_bind_int(vm, 5, seq);
2678 11742 : if (rc != DBRES_OK) goto cleanup;
2679 :
2680 11742 : rc = databasevm_bind_int(vm, 6, db_version);
2681 11742 : if (rc != DBRES_OK) goto cleanup;
2682 :
2683 11742 : rc = databasevm_bind_int(vm, 7, seq);
2684 11742 : if (rc != DBRES_OK) goto cleanup;
2685 :
2686 11742 : rc = databasevm_step(vm);
2687 11742 : if (rc == DBRES_DONE) rc = DBRES_OK;
2688 :
2689 : cleanup:
2690 11742 : DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
2691 11742 : databasevm_reset(vm);
2692 11742 : return rc;
2693 11742 : }
2694 :
2695 11638 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
2696 11638 : return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
2697 : }
2698 :
2699 39 : int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
2700 : // Mark a block as deleted by setting col_version = 2 (even = deleted)
2701 39 : return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
2702 : }
2703 :
2704 39 : int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
2705 39 : return block_delete_value(data, table, pk, (int)pklen, block_colname);
2706 : }
2707 :
2708 65 : int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2709 65 : return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
2710 : }
2711 :
2712 34 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
2713 34 : dbvm_t *vm = table->meta_row_drop_stmt;
2714 34 : if (!vm) return -1;
2715 :
2716 34 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2717 34 : if (rc != DBRES_OK) goto cleanup;
2718 :
2719 34 : rc = databasevm_step(vm);
2720 34 : if (rc == DBRES_DONE) rc = DBRES_OK;
2721 :
2722 : cleanup:
2723 34 : DEBUG_DBERROR(rc, "local_drop_meta", table->context);
2724 34 : databasevm_reset(vm);
2725 34 : return rc;
2726 34 : }
2727 :
2728 31 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
2729 : /*
2730 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
2731 : * to a new primary key (NEW.pk) when a primary key change occurs.
2732 : *
2733 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
2734 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
2735 : *
2736 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
2737 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
2738 : * may be applied incorrectly, leading to data inconsistency.
2739 : *
2740 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
2741 : * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
2742 : * that generates a unique sequence for each row. The update query should ensure that each row moved
2743 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
2744 : */
2745 :
2746 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2747 : // pk2 is the old pk
2748 :
2749 31 : dbvm_t *vm = table->meta_update_move_stmt;
2750 31 : if (!vm) return -1;
2751 :
2752 : // new primary key
2753 31 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2754 31 : if (rc != DBRES_OK) goto cleanup;
2755 :
2756 : // new db_version
2757 31 : rc = databasevm_bind_int(vm, 2, db_version);
2758 31 : if (rc != DBRES_OK) goto cleanup;
2759 :
2760 : // old primary key
2761 31 : rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
2762 31 : if (rc != DBRES_OK) goto cleanup;
2763 :
2764 31 : rc = databasevm_step(vm);
2765 31 : if (rc == DBRES_DONE) rc = DBRES_OK;
2766 :
2767 : cleanup:
2768 31 : DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
2769 31 : databasevm_reset(vm);
2770 31 : return rc;
2771 31 : }
2772 :
2773 : // MARK: - Payload Encode / Decode -
2774 :
2775 649 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
2776 649 : uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
2777 649 : header->checksum[0] = (uint8_t)(h >> 40);
2778 649 : header->checksum[1] = (uint8_t)(h >> 32);
2779 649 : header->checksum[2] = (uint8_t)(h >> 24);
2780 649 : header->checksum[3] = (uint8_t)(h >> 16);
2781 649 : header->checksum[4] = (uint8_t)(h >> 8);
2782 649 : header->checksum[5] = (uint8_t)(h >> 0);
2783 649 : }
2784 :
2785 643 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
2786 1929 : return ((uint64_t)header->checksum[0] << 40) |
2787 1286 : ((uint64_t)header->checksum[1] << 32) |
2788 1286 : ((uint64_t)header->checksum[2] << 24) |
2789 1286 : ((uint64_t)header->checksum[3] << 16) |
2790 1286 : ((uint64_t)header->checksum[4] << 8) |
2791 643 : ((uint64_t)header->checksum[5] << 0);
2792 : }
2793 :
2794 643 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
2795 643 : uint64_t checksum1 = cloudsync_payload_checksum_load(header);
2796 643 : uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
2797 643 : return (checksum1 == checksum2);
2798 : }
2799 :
2800 49096 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
2801 49096 : if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
2802 :
2803 : // alloc/resize buffer
2804 49096 : if (payload->bused + needed > payload->balloc) {
2805 658 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
2806 658 : size_t balloc = payload->balloc + needed;
2807 :
2808 658 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
2809 658 : if (!buffer) {
2810 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
2811 0 : memset(payload, 0, sizeof(cloudsync_payload_context));
2812 0 : return false;
2813 : }
2814 :
2815 658 : payload->buffer = buffer;
2816 658 : payload->balloc = balloc;
2817 658 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
2818 658 : }
2819 :
2820 49096 : return true;
2821 49096 : }
2822 :
2823 50398 : size_t cloudsync_payload_context_size (size_t *header_size) {
2824 50398 : if (header_size) *header_size = sizeof(cloudsync_payload_header);
2825 50398 : return sizeof(cloudsync_payload_context);
2826 : }
2827 :
2828 649 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
2829 649 : memset(header, 0, sizeof(cloudsync_payload_header));
2830 : assert(sizeof(cloudsync_payload_header)==32);
2831 :
2832 : int major, minor, patch;
2833 649 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
2834 :
2835 649 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
2836 649 : header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
2837 649 : header->libversion[0] = (uint8_t)major;
2838 649 : header->libversion[1] = (uint8_t)minor;
2839 649 : header->libversion[2] = (uint8_t)patch;
2840 649 : header->expanded_size = htonl(expanded_size);
2841 649 : header->ncols = htons(ncols);
2842 649 : header->nrows = htonl(nrows);
2843 649 : header->schema_hash = htonll(hash);
2844 649 : }
2845 :
// Append one pk-encoded row (the argc values in argv) to the payload buffer,
// growing it as needed. Returns DBRES_OK, or an error set on `data`.
int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
    DEBUG_FUNCTION("cloudsync_payload_encode_step");
    // debug_values(argc, argv);

    // check if the step function is called for the first time
    if (payload->nrows == 0) payload->ncols = (uint16_t)argc;

    // bytes required to pk-encode this row (skip_decode_idx: column passed
    // through as pre-encoded bytes — see comment near its initialization)
    size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
    if (cloudsync_payload_encode_check(payload, breq) == false) {
        return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
    }

    // encode directly into the free tail of the buffer
    char *buffer = payload->buffer + payload->bused;
    size_t bsize = payload->balloc - payload->bused;
    char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
    if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);

    // update buffer
    payload->bused += breq;

    // increment row counter
    ++payload->nrows;

    return DBRES_OK;
}
2871 :
// Finalize the payload: LZ4-compress the encoded rows (falling back to the
// uncompressed buffer when compression fails or does not help), prepend the
// header with sizes and 48-bit checksum, and leave the finished blob in
// payload->buffer/bsize. Returns DBRES_OK or DBRES_ERROR (error set on data).
int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    DEBUG_FUNCTION("cloudsync_payload_encode_final");

    // empty payload: release everything and report success with a NULL blob
    if (payload->nrows == 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        return DBRES_OK;
    }

    // nrows must fit the 32-bit header field
    if (payload->nrows > UINT32_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
        return DBRES_ERROR;
    }

    // sanity check about buffer size
    int header_size = (int)sizeof(cloudsync_payload_header);
    int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    if (buffer_size < 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // LZ4 takes int sizes, so the body must fit in INT_MAX
    if (buffer_size > INT_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // try to allocate buffer used for compressed data
    int real_buffer_size = (int)buffer_size;
    int zbound = LZ4_compressBound(real_buffer_size);
    char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression

    // skip the reserved header from the buffer to compress
    char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    CHECK_FORCE_UNCOMPRESSED_BUFFER();

    // setup payload header (expanded_size == 0 signals "not compressed")
    cloudsync_payload_header header = {0};
    uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);

    // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    if (use_uncompressed_buffer) {
        if (zbuffer) cloudsync_memory_free(zbuffer);
        zbuffer = payload->buffer;   // NOTE: zbuffer now aliases payload->buffer
        zused = real_buffer_size;
    }

    // compute checksum of the (possibly compressed) body only
    uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    cloudsync_payload_checksum_store(&header, checksum);

    // copy header and data to SQLite BLOB
    memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    int blob_size = zused + sizeof(cloudsync_payload_header);
    payload->bsize = blob_size;

    // cleanup memory: swap in the compressed buffer unless it aliases the original
    if (zbuffer != payload->buffer) {
        cloudsync_memory_free (payload->buffer);
        payload->buffer = zbuffer;
    }

    return DBRES_OK;
}
2947 :
2948 657 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
2949 : DEBUG_FUNCTION("cloudsync_payload_blob");
2950 :
2951 657 : if (blob_size) *blob_size = (int64_t)payload->bsize;
2952 657 : if (nrows) *nrows = (int64_t)payload->nrows;
2953 657 : return payload->buffer;
2954 : }
2955 :
// pk_decode per-field callback: first binds the decoded value into the target
// statement, then mirrors selected fields into the decode context so the apply
// logic can inspect them. For TEXT/BLOB fields, `pval` points at the value and
// `ival` carries its length. Returns the bind result.
static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
    cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
    int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);

    if (rc == DBRES_OK) {
        // cache each well-known field into the decode context; only record a
        // value when the decoded type matches the expected one for that index
        // NOTE(review): the original comment here referenced a tmp_dbversion
        // field and a max-dbversion comparison not present in this switch —
        // it appears stale; verify against the apply logic.
        switch (index) {
            case CLOUDSYNC_PK_INDEX_TBL:
                if (type == DBTYPE_TEXT) {
                    decode_context->tbl = pval;
                    decode_context->tbl_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_PK:
                if (type == DBTYPE_BLOB) {
                    decode_context->pk = pval;
                    decode_context->pk_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_COLNAME:
                if (type == DBTYPE_TEXT) {
                    decode_context->col_name = pval;
                    decode_context->col_name_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_COLVERSION:
                if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
                break;
            case CLOUDSYNC_PK_INDEX_DBVERSION:
                if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
                break;
            case CLOUDSYNC_PK_INDEX_SITEID:
                if (type == DBTYPE_BLOB) {
                    decode_context->site_id = pval;
                    decode_context->site_id_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_CL:
                if (type == DBTYPE_INTEGER) decode_context->cl = ival;
                break;
            case CLOUDSYNC_PK_INDEX_SEQ:
                if (type == DBTYPE_INTEGER) decode_context->seq = ival;
                break;
        }
    }

    return rc;
}
3006 :
3007 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
3008 :
3009 645 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
3010 : // sanity check
3011 645 : if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
3012 :
3013 : // decode header
3014 : cloudsync_payload_header header;
3015 645 : memcpy(&header, payload, sizeof(cloudsync_payload_header));
3016 :
3017 645 : header.signature = ntohl(header.signature);
3018 645 : header.expanded_size = ntohl(header.expanded_size);
3019 645 : header.ncols = ntohs(header.ncols);
3020 645 : header.nrows = ntohl(header.nrows);
3021 645 : header.schema_hash = ntohll(header.schema_hash);
3022 :
3023 : // compare schema_hash only if not disabled and if the received payload was created with the current header version
3024 : // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
3025 645 : if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
3026 645 : if (header.schema_hash != data->schema_hash) {
3027 3 : if (!database_check_schema_hash(data, header.schema_hash)) {
3028 : char buffer[1024];
3029 2 : snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
3030 2 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
3031 : }
3032 1 : }
3033 643 : }
3034 :
3035 : // sanity check header
3036 643 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
3037 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
3038 : }
3039 :
3040 643 : const char *buffer = payload + sizeof(cloudsync_payload_header);
3041 643 : size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
3042 :
3043 : // sanity check checksum (only if version is >= 2)
3044 643 : if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
3045 643 : uint64_t checksum = pk_checksum(buffer, buf_len);
3046 643 : if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
3047 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
3048 : }
3049 643 : }
3050 :
3051 : // check if payload is compressed
3052 643 : char *clone = NULL;
3053 643 : if (header.expanded_size != 0) {
3054 641 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
3055 641 : if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
3056 :
3057 641 : int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
3058 641 : if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
3059 0 : if (clone) cloudsync_memory_free(clone);
3060 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
3061 : }
3062 :
3063 641 : buffer = (const char *)clone;
3064 641 : buf_len = (size_t)header.expanded_size;
3065 641 : }
3066 :
3067 : // precompile the insert statement
3068 643 : dbvm_t *vm = NULL;
3069 643 : int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
3070 643 : if (rc != DBRES_OK) {
3071 0 : if (clone) cloudsync_memory_free(clone);
3072 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
3073 : }
3074 :
3075 : // process buffer, one row at a time
3076 643 : uint16_t ncols = header.ncols;
3077 643 : uint32_t nrows = header.nrows;
3078 643 : int64_t last_payload_db_version = -1;
3079 643 : int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
3080 643 : int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
3081 643 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
3082 :
3083 : // Initialize deferred column-batch merge
3084 643 : merge_pending_batch batch = {0};
3085 643 : data->pending_batch = &batch;
3086 643 : bool in_savepoint = false;
3087 643 : const void *last_pk = NULL;
3088 643 : int64_t last_pk_len = 0;
3089 643 : const char *last_tbl = NULL;
3090 643 : int64_t last_tbl_len = 0;
3091 :
3092 49659 : for (uint32_t i=0; i<nrows; ++i) {
3093 49016 : size_t seek = 0;
3094 49016 : int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
3095 49016 : if (res == -1) {
3096 0 : merge_flush_pending(data);
3097 0 : data->pending_batch = NULL;
3098 0 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3099 0 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3100 0 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3101 0 : if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
3102 0 : rc = DBRES_ERROR;
3103 0 : goto cleanup;
3104 : }
3105 :
3106 : // Detect PK/table/db_version boundary to flush pending batch
3107 97389 : bool pk_changed = (last_pk != NULL &&
3108 48373 : (last_pk_len != decoded_context.pk_len ||
3109 46452 : memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
3110 97389 : bool tbl_changed = (last_tbl != NULL &&
3111 48373 : (last_tbl_len != decoded_context.tbl_len ||
3112 48303 : memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
3113 49016 : bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
3114 :
3115 : // Flush pending batch before any boundary change
3116 49016 : if (pk_changed || tbl_changed || db_version_changed) {
3117 18158 : int flush_rc = merge_flush_pending(data);
3118 18158 : if (flush_rc != DBRES_OK) {
3119 1 : rc = flush_rc;
3120 : // continue processing remaining rows
3121 1 : }
3122 18158 : }
3123 :
3124 : // Per-db_version savepoints group rows with the same source db_version
3125 : // into one transaction. In SQLite autocommit mode, the RELEASE triggers
3126 : // the commit hook which bumps data->db_version and resets seq, ensuring
3127 : // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
3128 : // database_in_transaction() is always true so this block is inactive —
3129 : // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
3130 49016 : if (in_savepoint && db_version_changed) {
3131 4255 : rc = database_commit_savepoint(data, "cloudsync_payload_apply");
3132 4255 : if (rc != DBRES_OK) {
3133 0 : merge_pending_free_entries(&batch);
3134 0 : data->pending_batch = NULL;
3135 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
3136 0 : goto cleanup;
3137 : }
3138 4255 : in_savepoint = false;
3139 4255 : }
3140 :
3141 49016 : if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
3142 4898 : rc = database_begin_savepoint(data, "cloudsync_payload_apply");
3143 4898 : if (rc != DBRES_OK) {
3144 0 : merge_pending_free_entries(&batch);
3145 0 : data->pending_batch = NULL;
3146 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
3147 0 : goto cleanup;
3148 : }
3149 4898 : in_savepoint = true;
3150 4898 : }
3151 :
3152 : // Track db_version for batch-flush boundary detection
3153 49016 : if (db_version_changed) {
3154 4898 : last_payload_db_version = decoded_context.db_version;
3155 4898 : }
3156 :
3157 : // Update PK/table tracking
3158 49016 : last_pk = decoded_context.pk;
3159 49016 : last_pk_len = decoded_context.pk_len;
3160 49016 : last_tbl = decoded_context.tbl;
3161 49016 : last_tbl_len = decoded_context.tbl_len;
3162 :
3163 49016 : rc = databasevm_step(vm);
3164 49016 : if (rc != DBRES_DONE) {
3165 : // don't "break;", the error can be due to a RLS policy.
3166 : // in case of error we try to apply the following changes
3167 0 : }
3168 :
3169 49016 : buffer += seek;
3170 49016 : buf_len -= seek;
3171 49016 : dbvm_reset(vm);
3172 49016 : }
3173 :
3174 : // Final flush after loop
3175 : {
3176 643 : int flush_rc = merge_flush_pending(data);
3177 643 : if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
3178 : }
3179 643 : data->pending_batch = NULL;
3180 :
3181 643 : if (in_savepoint) {
3182 643 : int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
3183 643 : if (rc1 != DBRES_OK) rc = rc1;
3184 643 : }
3185 :
3186 : // save last error (unused if function returns OK)
3187 643 : if (rc != DBRES_OK && rc != DBRES_DONE) {
3188 0 : cloudsync_set_dberror(data);
3189 0 : }
3190 :
3191 643 : if (rc == DBRES_DONE) rc = DBRES_OK;
3192 1286 : if (rc == DBRES_OK) {
3193 : char buf[256];
3194 643 : if (decoded_context.db_version >= dbversion) {
3195 510 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
3196 510 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
3197 :
3198 510 : if (decoded_context.seq != seq) {
3199 327 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
3200 327 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
3201 327 : }
3202 510 : }
3203 643 : }
3204 :
3205 : cleanup:
3206 : // cleanup merge_pending_batch
3207 643 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3208 643 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3209 643 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3210 :
3211 : // cleanup vm
3212 643 : if (vm) databasevm_finalize(vm);
3213 :
3214 : // cleanup memory
3215 643 : if (clone) cloudsync_memory_free(clone);
3216 :
3217 : // error already saved in (save last error)
3218 643 : if (rc != DBRES_OK) return rc;
3219 :
3220 : // return the number of processed rows
3221 643 : if (pnrows) *pnrows = nrows;
3222 643 : return DBRES_OK;
3223 645 : }
3224 :
3225 : // MARK: - Payload load/store -
3226 :
3227 0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
3228 : // retrieve current db_version and seq
3229 0 : *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
3230 0 : if (*db_version < 0) return DBRES_ERROR;
3231 :
3232 : // retrieve BLOB
3233 : char sql[1024];
3234 0 : snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
3235 : "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
3236 :
3237 0 : int64_t len = 0;
3238 0 : int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
3239 0 : *blob_size = (int)len;
3240 0 : if (rc != DBRES_OK) return rc;
3241 :
3242 : // exit if there is no data to send
3243 0 : if (*blob == NULL || *blob_size == 0) return DBRES_OK;
3244 0 : return rc;
3245 0 : }
3246 :
3247 : #ifdef CLOUDSYNC_DESKTOP_OS
3248 0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
3249 : DEBUG_FUNCTION("cloudsync_payload_save");
3250 :
3251 : // silently delete any other payload with the same name
3252 0 : cloudsync_file_delete(payload_path);
3253 :
3254 : // retrieve payload
3255 0 : char *blob = NULL;
3256 0 : int blob_size = 0, db_version = 0;
3257 0 : int64_t new_db_version = 0;
3258 0 : int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
3259 0 : if (rc != DBRES_OK) {
3260 0 : if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
3261 0 : return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
3262 : }
3263 :
3264 : // exit if there is no data to save
3265 0 : if (blob == NULL || blob_size == 0) {
3266 0 : if (size) *size = 0;
3267 0 : return DBRES_OK;
3268 : }
3269 :
3270 : // write payload to file
3271 0 : bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
3272 0 : cloudsync_memory_free(blob);
3273 0 : if (res == false) {
3274 0 : return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
3275 : }
3276 :
3277 : // returns blob size
3278 0 : if (size) *size = blob_size;
3279 0 : return DBRES_OK;
3280 0 : }
3281 : #endif
3282 :
3283 : // MARK: - Core -
3284 :
// Validate that a user table is suitable for CRDT augmentation.
// Checks (in order): non-NULL name, name length, table existence, composite
// primary-key column count, rowid-only tables (optional), single-column
// INTEGER primary keys (optional, bypassed by skip_int_pk_check), NOT NULL
// primary keys (optional) and NOT NULL non-pk columns without a DEFAULT.
// Returns DBRES_OK when the table passes (or is already initialized),
// otherwise an error code with a message stored in the context.
int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, bool skip_int_pk_check) {
    DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    char buffer[2048];
    
    // sanity check table name
    if (name == NULL) {
        return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    }
    
    // avoid allocating heap memory for SQL statements by limiting table names
    // to CLOUDSYNC_MAX_TABLENAME_LEN characters. This limit is reasonable and
    // helps prevent memory management issues.
    const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    if (strlen(name) > maxlen) {
        snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // check if already initialized: an augmented table needs no further checks
    cloudsync_table_context *table = table_lookup(data, name);
    if (table) return DBRES_OK;
    
    // check if table exists
    if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
        snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
    int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    if (npri_keys < 0) return cloudsync_set_dberror(data);
    if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
    
#if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    if (npri_keys == 0) {
        snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
#endif
    
    // reject a lone INTEGER primary key unless the caller explicitly opted out:
    // auto-incrementing integers are not globally unique across replicas
    if (!skip_int_pk_check) {
        if (npri_keys == 1) {
            // the affinity of a column is determined by the declared type of the column,
            // according to the following rules in the order shown:
            // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
            int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
            if (npri_keys_int < 0) return cloudsync_set_dberror(data);
            if (npri_keys == npri_keys_int) {
                snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
                return cloudsync_set_error(data, buffer, DBRES_ERROR);
            }
            
        }
    }
    
    // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
#if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    if (npri_keys > 0) {
        int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
        if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
        if (npri_keys != npri_keys_notnull) {
            snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
            return cloudsync_set_error(data, buffer, DBRES_ERROR);
        }
    }
#endif
    
    // check for columns declared as NOT NULL without a DEFAULT value.
    // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
    if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
    if (n_notnull_nodefault > 0) {
        snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    return DBRES_OK;
}
3363 :
3364 3 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
3365 3 : if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
3366 :
3367 : // drop meta-table
3368 3 : const char *table_name = table->name;
3369 3 : char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
3370 3 : int rc = database_exec(data, sql);
3371 3 : cloudsync_memory_free(sql);
3372 3 : if (rc != DBRES_OK) {
3373 : char buffer[1024];
3374 0 : snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
3375 0 : return cloudsync_set_error(data, buffer, rc);
3376 : }
3377 :
3378 : // drop original triggers
3379 3 : rc = database_delete_triggers(data, table_name);
3380 3 : if (rc != DBRES_OK) {
3381 : char buffer[1024];
3382 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
3383 0 : return cloudsync_set_error(data, buffer, rc);
3384 : }
3385 :
3386 : // remove all table related settings
3387 3 : dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
3388 3 : return DBRES_OK;
3389 3 : }
3390 :
3391 3 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
3392 3 : cloudsync_table_context *table = table_lookup(data, table_name);
3393 3 : if (!table) return DBRES_OK;
3394 :
3395 : // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
3396 :
3397 3 : int rc = cloudsync_cleanup_internal(data, table);
3398 3 : if (rc != DBRES_OK) return rc;
3399 :
3400 3 : int counter = table_remove(data, table);
3401 3 : table_free(table);
3402 :
3403 3 : if (counter == 0) {
3404 : // cleanup database on last table
3405 1 : cloudsync_reset_siteid(data);
3406 1 : dbutils_settings_cleanup(data);
3407 1 : } else {
3408 2 : if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
3409 2 : cloudsync_update_schema_hash(data);
3410 2 : }
3411 : }
3412 :
3413 3 : return DBRES_OK;
3414 3 : }
3415 :
// Remove every cloudsync artifact from the database by delegating to the
// backend-specific database_cleanup; returns its result code.
int cloudsync_cleanup_all (cloudsync_context *data) {
    return database_cleanup(data);
}
3419 :
3420 402 : int cloudsync_terminate (cloudsync_context *data) {
3421 : // can't use for/loop here because data->tables_count is changed by table_remove
3422 616 : while (data->tables_count > 0) {
3423 214 : cloudsync_table_context *t = data->tables[data->tables_count - 1];
3424 214 : table_remove(data, t);
3425 214 : table_free(t);
3426 : }
3427 :
3428 402 : if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
3429 402 : if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
3430 402 : if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
3431 402 : if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
3432 402 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
3433 :
3434 402 : data->schema_version_stmt = NULL;
3435 402 : data->data_version_stmt = NULL;
3436 402 : data->db_version_stmt = NULL;
3437 402 : data->getset_siteid_stmt = NULL;
3438 402 : data->current_schema = NULL;
3439 :
3440 : // reset the site_id so the cloudsync_context_init will be executed again
3441 : // if any other cloudsync function is called after terminate
3442 402 : data->site_id[0] = 0;
3443 :
3444 402 : return 1;
3445 : }
3446 :
// Augment a table for CRDT synchronization: validates the table and the
// requested algorithm, reconciles it with any algorithm already stored in
// settings, then (re)creates triggers, the meta-table, the prepared
// statements and the in-memory table context, and finally refills the
// meta-table. Safe to call on an already-augmented table.
// Parameters:
//   data              - cloudsync context
//   table_name        - table to augment (must exist)
//   algo_name         - CRDT algorithm name, or NULL for CLOUDSYNC_DEFAULT_ALGO
//   skip_int_pk_check - bypass the single-column INTEGER primary key check
// Returns DBRES_OK on success or an error code with a stored message.
int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, bool skip_int_pk_check) {
    // sanity check table and its primary key(s)
    int rc = cloudsync_table_sanity_check(data, table_name, skip_int_pk_check);
    if (rc != DBRES_OK) return rc;
    
    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    }
    
    // sanity check algo name (if exists)
    table_algo algo_new = table_algo_none;
    if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
    
    algo_new = cloudsync_algo_from_name(algo_name);
    if (algo_new == table_algo_none) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // DWS and AWS algorithms are not yet implemented in the merge logic
    if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // check if table name was already augmented
    table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    
    // reconcile the requested algorithm with the one stored in settings
    if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
        // requested and stored algorithms are the same and not none: do nothing
    } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
        // nothing is written into settings because the default table_algo_crdt_cls will be used
        algo_new = algo_current = table_algo_crdt_cls;
    } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
        // algo is already written into settings so just use it
        algo_new = algo_current;
    } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
        // write table algo name in settings
        dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
    } else {
        // error condition: changing algorithm requires an explicit cleanup first
        return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
    }
    
    // Run the following function even if table was already augmented.
    // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
    // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
    
    // sync algo with table (unused in this version)
    // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
    
    // read row-level filter from settings (if any)
    char init_filter_buf[2048];
    int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
    const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
    
    // check triggers
    rc = database_create_triggers(data, table_name, algo_new, init_filter);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
    
    // check meta-table
    rc = database_create_metatable(data, table_name);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
    
    // add prepared statements
    if (cloudsync_add_dbvms(data) != DBRES_OK) {
        return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
    }
    
    // add table to in-memory data context
    if (table_add_to_context(data, algo_new, table_name) == false) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
        return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    }
    
    // populate the meta-table with the current rows of the augmented table
    if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
        return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
    }
    
    return DBRES_OK;
}
|