Line data Source code
1 : //
2 : // cloudsync.c
3 : // cloudsync
4 : //
5 : // Created by Marco Bambini on 16/05/24.
6 : //
7 :
8 : #include <inttypes.h>
9 : #include <stdbool.h>
10 : #include <limits.h>
11 : #include <stdint.h>
12 : #include <stdlib.h>
13 : #include <string.h>
14 : #include <assert.h>
15 : #include <stdio.h>
16 : #include <errno.h>
17 : #include <math.h>
18 :
19 : #include "cloudsync.h"
20 : #include "lz4.h"
21 : #include "pk.h"
22 : #include "sql.h"
23 : #include "utils.h"
24 : #include "dbutils.h"
25 : #include "block.h"
26 :
27 : #ifdef _WIN32
28 : #include <winsock2.h>
29 : #include <ws2tcpip.h>
30 : #else
31 : #include <arpa/inet.h> // for htonl, htons, ntohl, ntohs
32 : #include <netinet/in.h> // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
33 : #endif
34 :
35 : #ifndef htonll
36 : #if __BIG_ENDIAN__
37 : #define htonll(x) (x)
38 : #define ntohll(x) (x)
39 : #else
40 : #ifndef htobe64
41 : #define htonll(x) ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
42 : #define ntohll(x) ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
43 : #else
44 : #define htonll(x) htobe64(x)
45 : #define ntohll(x) be64toh(x)
46 : #endif
47 : #endif
48 : #endif
49 :
50 : #define CLOUDSYNC_INIT_NTABLES 64
51 : #define CLOUDSYNC_MIN_DB_VERSION 0
52 :
53 : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
54 : #define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
55 : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
56 : #define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
57 : #define CLOUDSYNC_PAYLOAD_VERSION_2 2
58 : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
59 : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2
60 :
61 : #ifndef MAX
62 : #define MAX(a, b) (((a)>(b))?(a):(b))
63 : #endif
64 :
65 : #define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
66 :
// Positional indices of the fields that make up an encoded change record.
// NOTE(review): the order mirrors the fields of
// cloudsync_pk_decode_bind_context below — confirm against the encoder.
typedef enum {
    CLOUDSYNC_PK_INDEX_TBL = 0,
    CLOUDSYNC_PK_INDEX_PK = 1,
    CLOUDSYNC_PK_INDEX_COLNAME = 2,
    CLOUDSYNC_PK_INDEX_COLVALUE = 3,
    CLOUDSYNC_PK_INDEX_COLVERSION = 4,
    CLOUDSYNC_PK_INDEX_DBVERSION = 5,
    CLOUDSYNC_PK_INDEX_SITEID = 6,
    CLOUDSYNC_PK_INDEX_CL = 7,
    CLOUDSYNC_PK_INDEX_SEQ = 8
} CLOUDSYNC_PK_INDEX;

// Tri-state result of dbvm_execute: step failed, or the tracked value
// (data_version / schema_version) did or did not change since last run.
typedef enum {
    DBVM_VALUE_ERROR = -1,
    DBVM_VALUE_UNCHANGED = 0,
    DBVM_VALUE_CHANGED = 1,
} DBVM_VALUE;

// Toggle the in-sync flag on a cloudsync_context (read back via cloudsync_insync).
#define SYNCBIT_SET(_data) _data->insync = 1
#define SYNCBIT_RESET(_data) _data->insync = 0

// MARK: - Deferred column-batch merge -

// One buffered column change waiting to be applied for a given PK.
typedef struct {
    const char *col_name; // pointer into table_context->col_name[idx] (stable)
    dbvalue_t *col_value; // duplicated via database_value_dup (owned)
    int64_t col_version;
    int64_t db_version;
    uint8_t site_id[UUID_LEN];
    int site_id_len;
    int64_t seq;
} merge_pending_entry;
99 :
// Accumulates all column changes for a single PK so they can be applied
// together when the batch is flushed (active during payload apply).
typedef struct {
    cloudsync_table_context *table;
    char *pk; // malloc'd copy, freed on flush
    int pk_len;
    int64_t cl;
    bool sentinel_pending;
    bool row_exists; // true when the PK already exists locally
    int count;       // number of valid entries
    int capacity;    // allocated size of entries
    merge_pending_entry *entries;

    // Statement cache — reuse the prepared statement when the column
    // combination and row_exists flag match between consecutive PK flushes.
    dbvm_t *cached_vm;
    bool cached_row_exists;
    int cached_col_count;
    const char **cached_col_names; // array of pointers into table_context (not owned)
} merge_pending_batch;

// MARK: -

// Decoded fields of a single change record used for statement binding.
// NOTE(review): pointer members (pk, site_id, tbl, col_name) appear to alias
// the decode buffer rather than own their memory — confirm ownership.
struct cloudsync_pk_decode_bind_context {
    dbvm_t *vm;
    char *tbl;
    int64_t tbl_len;
    const void *pk;
    int64_t pk_len;
    char *col_name;
    int64_t col_name_len;
    int64_t col_version;
    int64_t db_version;
    const void *site_id;
    int64_t site_id_len;
    int64_t cl;
    int64_t seq;
};
136 :
// Per-connection cloudsync state: database handle, last error, cached
// persistent statements, version counters and the in-memory table registry.
struct cloudsync_context {
    void *db;            // opaque database handle (see cloudsync_db)
    char errmsg[1024];   // last error message (see cloudsync_set_error)
    int errcode;         // last error code

    char *libversion;
    uint8_t site_id[UUID_LEN]; // unique site identifier, lazily loaded (see cloudsync_load_siteid)
    int insync;                // nonzero while a sync/merge is in progress (SYNCBIT_SET/RESET)
    int debug;
    bool merge_equal_values;
    void *aux_data;            // caller-owned opaque pointer (cloudsync_set_auxdata)

    // stmts and context values
    dbvm_t *schema_version_stmt;
    dbvm_t *data_version_stmt;
    dbvm_t *db_version_stmt;   // rebuilt whenever the set of synced tables changes
    dbvm_t *getset_siteid_stmt;
    int data_version;          // cached PRAGMA data_version result
    int schema_version;        // cached schema version
    uint64_t schema_hash;

    // set at transaction start and reset on commit/rollback
    int64_t db_version;
    // version the DB would have if the transaction committed now
    int64_t pending_db_version;
    // used to set an order inside each transaction
    int seq;

    // optional schema_name to be set in the cloudsync_table_context
    char *current_schema;

    // augmented tables are stored in-memory so we do not need to retrieve information about
    // col_names and cid from the disk each time a write statement is performed
    // we do also not need to use an hash map here because for few tables the direct
    // in-memory comparison with table name is faster
    cloudsync_table_context **tables; // dense vector: [0..tables_count-1] are valid
    int tables_count;                 // size
    int tables_cap;                   // capacity
    
    int skip_decode_idx; // -1 in sqlite, col_value index in postgresql

    // deferred column-batch merge (active during payload_apply)
    merge_pending_batch *pending_batch;
};
181 :
// Per-table cloudsync state: CRDT algorithm, column metadata, and the full
// set of precompiled statements that operate on the base table, its
// "_cloudsync" meta table, and (optionally) its blocks table.
// Created by table_create, populated by table_add_stmts, freed by table_free.
struct cloudsync_table_context {
    table_algo algo;         // CRDT algoritm associated to the table
    char *name;              // table name (stored lowercase)
    char *schema;            // table schema
    char *meta_ref;          // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
    char *base_ref;          // schema-qualified base table name (e.g. "schema"."name")
    char **col_name;         // array of column names
    dbvm_t **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
    dbvm_t **col_value_stmt; // array of column value stmt (indexed by col_name)
    int *col_id;             // array of column id
    col_algo_t *col_algo;    // per-column algorithm (normal or block)
    char **col_delimiter;    // per-column delimiter for block splitting (NULL = default "\n")
    bool has_block_cols;     // quick check: does this table have any block columns?
    dbvm_t *block_value_read_stmt;   // SELECT col_value FROM blocks table
    dbvm_t *block_value_write_stmt;  // INSERT OR REPLACE into blocks table
    dbvm_t *block_value_delete_stmt; // DELETE from blocks table
    dbvm_t *block_list_stmt;         // SELECT block entries for materialization
    char *blocks_ref;                // schema-qualified blocks table name
    int ncols;               // number of non primary key cols
    int npks;                // number of primary key cols
    bool enabled;            // flag to check if a table is enabled or disabled
#if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    bool rowid_only;         // a table with no primary keys other than the implicit rowid
#endif

    char **pk_name;          // array of primary key names (npks entries)

    // precompiled statements (all target the meta table unless noted)
    dbvm_t *meta_pkexists_stmt;          // check if a primary key already exist in the augmented table
    dbvm_t *meta_sentinel_update_stmt;   // update a local sentinel row
    dbvm_t *meta_sentinel_insert_stmt;   // insert a local sentinel row
    dbvm_t *meta_row_insert_update_stmt; // insert/update a local row
    dbvm_t *meta_row_drop_stmt;          // delete rows from meta
    dbvm_t *meta_update_move_stmt;       // update rows in meta when pk changes
    dbvm_t *meta_local_cl_stmt;          // compute local cl value
    dbvm_t *meta_winner_clock_stmt;      // get the rowid of the last inserted/updated row in the meta table
    dbvm_t *meta_merge_delete_drop;
    dbvm_t *meta_zero_clock_stmt;
    dbvm_t *meta_col_version_stmt;
    dbvm_t *meta_site_id_stmt;

    dbvm_t *real_col_values_stmt;        // retrieve all column values based on pk (base table)
    dbvm_t *real_merge_delete_stmt;
    dbvm_t *real_merge_sentinel_stmt;

    bool is_altering; // flag to track if a table alteration is in progress

    // back pointer to the owning connection context
    cloudsync_context *context;
};
232 :
// Growable buffer state used while building a payload blob.
struct cloudsync_payload_context {
    char *buffer;
    size_t bsize;
    size_t balloc;  // allocated capacity
    size_t bused;   // bytes written so far
    uint64_t nrows;
    uint16_t ncols;
};

#ifdef _MSC_VER
#pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
#define PACKED
#else
#define PACKED __attribute__((__packed__))
#endif

// On-the-wire payload header, packed so the layout is exactly 32 bytes
// (4+1+3+4+2+4+8+6).
typedef struct PACKED {
    uint32_t signature;     // 'CLSY' (CLOUDSYNC_PAYLOAD_SIGNATURE)
    uint8_t version;        // protocol version
    uint8_t libversion[3];  // major.minor.patch
    uint32_t expanded_size; // uncompressed payload size
    uint16_t ncols;
    uint32_t nrows;
    uint64_t schema_hash;
    uint8_t checksum[6];    // 48 bits checksum (to ensure struct is 32 bytes)
} cloudsync_payload_header;

#ifdef _MSC_VER
#pragma pack(pop)
#endif
259 :
260 : #ifdef _MSC_VER
261 : #pragma pack(pop)
262 : #endif
263 :
264 : #if CLOUDSYNC_UNITTEST
265 : bool force_uncompressed_blob = false;
266 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER() if (force_uncompressed_blob) use_uncompressed_buffer = true
267 : #else
268 : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
269 : #endif
270 :
271 : // Internal prototypes
272 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
273 :
274 : // MARK: - CRDT algos -
275 :
276 552 : table_algo cloudsync_algo_from_name (const char *algo_name) {
277 552 : if (algo_name == NULL) return table_algo_none;
278 :
279 552 : if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
280 243 : if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
281 236 : if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
282 235 : if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
283 :
284 : // if nothing is found
285 234 : return table_algo_none;
286 552 : }
287 :
288 110 : const char *cloudsync_algo_name (table_algo algo) {
289 110 : switch (algo) {
290 105 : case table_algo_crdt_cls: return "cls";
291 1 : case table_algo_crdt_gos: return "gos";
292 1 : case table_algo_crdt_dws: return "dws";
293 1 : case table_algo_crdt_aws: return "aws";
294 1 : case table_algo_none: return NULL;
295 : }
296 1 : return NULL;
297 110 : }
298 :
299 : // MARK: - DBVM Utils -
300 :
301 31645 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
302 31645 : if (!stmt) return DBVM_VALUE_ERROR;
303 :
304 31645 : int rc = databasevm_step(stmt);
305 31645 : if (rc != DBRES_ROW && rc != DBRES_DONE) {
306 2 : if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
307 2 : databasevm_reset(stmt);
308 2 : return DBVM_VALUE_ERROR;
309 : }
310 :
311 31643 : DBVM_VALUE result = DBVM_VALUE_CHANGED;
312 31643 : if (stmt == data->data_version_stmt) {
313 29413 : int version = (int)database_column_int(stmt, 0);
314 29413 : if (version != data->data_version) {
315 203 : data->data_version = version;
316 203 : } else {
317 29210 : result = DBVM_VALUE_UNCHANGED;
318 : }
319 31643 : } else if (stmt == data->schema_version_stmt) {
320 1115 : int version = (int)database_column_int(stmt, 0);
321 1115 : if (version > data->schema_version) {
322 332 : data->schema_version = version;
323 332 : } else {
324 783 : result = DBVM_VALUE_UNCHANGED;
325 : }
326 :
327 2230 : } else if (stmt == data->db_version_stmt) {
328 1115 : data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
329 1115 : }
330 :
331 31643 : databasevm_reset(stmt);
332 31643 : return result;
333 31645 : }
334 :
335 3531 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
336 3531 : int result = -1;
337 3531 : int rc = DBRES_OK;
338 :
339 3531 : if (value) {
340 3530 : rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
341 3530 : if (rc != DBRES_OK) goto cleanup;
342 3530 : }
343 :
344 3531 : rc = databasevm_step(stmt);
345 7062 : if (rc == DBRES_DONE) {
346 1 : result = 0;
347 1 : rc = DBRES_OK;
348 3531 : } else if (rc == DBRES_ROW) {
349 3530 : result = (int)database_column_int(stmt, 0);
350 3530 : rc = DBRES_OK;
351 3530 : }
352 :
353 : cleanup:
354 3531 : databasevm_reset(stmt);
355 3531 : return result;
356 : }
357 :
// NULL-safe reset helper: clear all bindings then reset the statement so it
// can be re-executed with fresh parameters.
void dbvm_reset (dbvm_t *stmt) {
    if (!stmt) return;
    databasevm_clear_bindings(stmt);
    databasevm_reset(stmt);
}
363 :
364 : // MARK: - Database Version -
365 :
366 595 : char *cloudsync_dbversion_build_query (cloudsync_context *data) {
367 : // this function must be manually called each time tables changes
368 : // because the query plan changes too and it must be re-prepared
369 : // unfortunately there is no other way
370 :
371 : // we need to execute a query like:
372 : /*
373 : SELECT max(version) as version FROM (
374 : SELECT max(db_version) as version FROM "table1_cloudsync"
375 : UNION ALL
376 : SELECT max(db_version) as version FROM "table2_cloudsync"
377 : UNION ALL
378 : SELECT max(db_version) as version FROM "table3_cloudsync"
379 : UNION
380 : SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
381 : )
382 : */
383 :
384 : // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
385 :
386 595 : char *value = NULL;
387 595 : int rc = database_select_text(data, SQL_DBVERSION_BUILD_QUERY, &value);
388 595 : return (rc == DBRES_OK) ? value : NULL;
389 : }
390 :
391 798 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
392 798 : if (data->db_version_stmt) {
393 392 : databasevm_finalize(data->db_version_stmt);
394 392 : data->db_version_stmt = NULL;
395 392 : }
396 :
397 798 : int64_t count = dbutils_table_settings_count_tables(data);
398 798 : if (count == 0) return DBRES_OK;
399 595 : else if (count == -1) return cloudsync_set_dberror(data);
400 :
401 595 : char *sql = cloudsync_dbversion_build_query(data);
402 595 : if (!sql) return DBRES_NOMEM;
403 : DEBUG_SQL("db_version_stmt: %s", sql);
404 :
405 595 : int rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
406 : DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
407 595 : cloudsync_memory_free(sql);
408 595 : return rc;
409 798 : }
410 :
411 1115 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
412 1115 : DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
413 1115 : if (schema_changed == DBVM_VALUE_ERROR) return -1;
414 :
415 1115 : if (schema_changed == DBVM_VALUE_CHANGED) {
416 332 : int rc = cloudsync_dbversion_rebuild(data);
417 332 : if (rc != DBRES_OK) return -1;
418 332 : }
419 :
420 1115 : if (!data->db_version_stmt) {
421 0 : data->db_version = CLOUDSYNC_MIN_DB_VERSION;
422 0 : return 0;
423 : }
424 :
425 1115 : DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
426 1115 : if (rc == DBVM_VALUE_ERROR) return -1;
427 1115 : return 0;
428 1115 : }
429 :
430 29413 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
431 : // perform a PRAGMA data_version to check if some other process write any data
432 29413 : DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
433 29413 : if (rc == DBVM_VALUE_ERROR) return -1;
434 :
435 : // db_version is already set and there is no need to update it
436 29413 : if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
437 :
438 1115 : return cloudsync_dbversion_rerun(data);
439 29413 : }
440 :
441 29389 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
442 29389 : int rc = cloudsync_dbversion_check_uptodate(data);
443 29389 : if (rc != DBRES_OK) return -1;
444 :
445 29389 : int64_t result = data->db_version + 1;
446 29389 : if (result < data->pending_db_version) result = data->pending_db_version;
447 29389 : if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
448 29389 : data->pending_db_version = result;
449 :
450 29389 : return result;
451 29389 : }
452 :
453 : // MARK: - PK Context -
454 :
// Accessors for the fields of a decoded change record. Returned pointers
// alias memory owned by the context — callers must not free them.

// Table name; length stored through tbl_len.
char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
    *tbl_len = ctx->tbl_len;
    return ctx->tbl;
}

// Encoded primary key blob; length stored through pk_len.
void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
    *pk_len = ctx->pk_len;
    return (void *)ctx->pk;
}

// Column name; length stored through colname_len.
char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
    *colname_len = ctx->col_name_len;
    return ctx->col_name;
}

// Causal length value of the record.
int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
    return ctx->cl;
}

// Database version of the record.
int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
    return ctx->db_version;
}
477 :
478 : // MARK: - CloudSync Context -
479 :
// Returns nonzero while a sync is in progress (flag driven by SYNCBIT_SET/RESET).
int cloudsync_insync (cloudsync_context *data) {
    return data->insync;
}

// Returns a pointer to the UUID_LEN-byte site identifier (context-owned).
void *cloudsync_siteid (cloudsync_context *data) {
    return (void *)data->site_id;
}

// Zero the cached site_id so the next cloudsync_load_siteid() reloads it
// from the database (the cache test checks site_id[0] != 0).
void cloudsync_reset_siteid (cloudsync_context *data) {
    memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
}
491 :
492 205 : int cloudsync_load_siteid (cloudsync_context *data) {
493 : // check if site_id was already loaded
494 205 : if (data->site_id[0] != 0) return DBRES_OK;
495 :
496 : // load site_id
497 203 : char *buffer = NULL;
498 203 : int64_t size = 0;
499 203 : int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
500 203 : if (rc != DBRES_OK) return rc;
501 203 : if (!buffer || size != UUID_LEN) {
502 0 : if (buffer) cloudsync_memory_free(buffer);
503 0 : return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
504 : }
505 :
506 203 : memcpy(data->site_id, buffer, UUID_LEN);
507 203 : cloudsync_memory_free(buffer);
508 :
509 203 : return DBRES_OK;
510 205 : }
511 :
// Current cached db_version (refreshed by cloudsync_dbversion_rerun).
int64_t cloudsync_dbversion (cloudsync_context *data) {
    return data->db_version;
}

// Return the current per-transaction sequence number, then advance it.
int cloudsync_bumpseq (cloudsync_context *data) {
    int value = data->seq;
    data->seq += 1;
    return value;
}

// Recompute and store the hash of the current database schema.
void cloudsync_update_schema_hash (cloudsync_context *data) {
    database_update_schema_hash(data, &data->schema_hash);
}

// Opaque database handle associated with this context.
void *cloudsync_db (cloudsync_context *data) {
    return data->db;
}
529 :
// Prepare the context-level persistent statements (data_version,
// schema_version, get/set site_id) when not yet prepared, then (re)build the
// table-dependent db_version statement. Idempotent; returns a DBRES_* code.
int cloudsync_add_dbvms (cloudsync_context *data) {
    DEBUG_DBFUNCTION("cloudsync_add_stmts");

    if (data->data_version_stmt == NULL) {
        int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
    }

    if (data->schema_version_stmt == NULL) {
        int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
    }

    if (data->getset_siteid_stmt == NULL) {
        // get and set index of the site_id
        // in SQLite, we can't directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
        // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
        int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
        DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
        if (rc != DBRES_OK) return rc;
        DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
    }

    return cloudsync_dbversion_rebuild(data);
}
559 :
// Record an error in the context and return its code.
//
// err_code: DBRES_OK means "derive from the database"; if the database also
// reports OK, DBRES_ERROR is used so the stored code is never OK.
// err_user:  optional user message; when the database also has a pending
// error the two are combined as "user (db)".
int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
    // force err_code to be something different than OK
    if (err_code == DBRES_OK) err_code = database_errcode(data);
    if (err_code == DBRES_OK) err_code = DBRES_ERROR;

    // compute a meaningful error message
    if (err_user == NULL) {
        snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
    } else {
        const char *db_error = database_errmsg(data);
        char db_error_copy[sizeof(data->errmsg)];
        int rc = database_errcode(data);
        if (rc == DBRES_OK) {
            snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
        } else {
            // database_errmsg may return data->errmsg itself; copy it first so
            // the combining snprintf below never reads its own destination
            // (overlapping src/dst in snprintf is undefined behavior)
            if (db_error == data->errmsg) {
                snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
                db_error = db_error_copy;
            }
            snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
        }
    }

    data->errcode = err_code;
    return err_code;
}
586 :
// Record the database's own pending error message/code in the context.
int cloudsync_set_dberror (cloudsync_context *data) {
    return cloudsync_set_error(data, NULL, DBRES_OK);
}

// Last error message stored in the context (context-owned buffer).
const char *cloudsync_errmsg (cloudsync_context *data) {
    return data->errmsg;
}

// Last error code stored in the context.
int cloudsync_errcode (cloudsync_context *data) {
    return data->errcode;
}

// Clear any stored error message/code.
void cloudsync_reset_error (cloudsync_context *data) {
    data->errmsg[0] = 0;
    data->errcode = DBRES_OK;
}

// Caller-owned auxiliary pointer attached to the context.
void *cloudsync_auxdata (cloudsync_context *data) {
    return data->aux_data;
}

// Attach a caller-owned auxiliary pointer (not freed by cloudsync).
void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
    data->aux_data = xdata;
}
611 :
612 6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
613 6 : if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
614 5 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
615 5 : data->current_schema = NULL;
616 5 : if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
617 6 : }
618 :
// Schema name set via cloudsync_set_schema, or NULL when unset.
const char *cloudsync_schema (cloudsync_context *data) {
    return data->current_schema;
}

// Schema of a registered (synced) table, or NULL when the table is unknown.
const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return NULL;

    return table->schema;
}
629 :
630 : // MARK: - Table Utils -
631 :
632 69 : void table_pknames_free (char **names, int nrows) {
633 69 : if (!names) return;
634 132 : for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
635 46 : cloudsync_memory_free(names);
636 69 : }
637 :
638 258 : char *table_build_mergedelete_sql (cloudsync_table_context *table) {
639 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
640 : if (table->rowid_only) {
641 : char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
642 : return sql;
643 : }
644 : #endif
645 :
646 258 : return sql_build_delete_by_pk(table->context, table->name, table->schema);
647 : }
648 :
649 1298 : char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
650 1298 : char *sql = NULL;
651 :
652 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
653 : if (table->rowid_only) {
654 : if (colname == NULL) {
655 : // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
656 : sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
657 : } else {
658 : // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
659 : sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
660 : }
661 : return sql;
662 : }
663 : #endif
664 :
665 1298 : if (colname == NULL) {
666 : // is sentinel insert
667 258 : sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
668 258 : } else {
669 1040 : sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
670 : }
671 1298 : return sql;
672 : }
673 :
674 1040 : char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
675 : #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
676 : if (table->rowid_only) {
677 : char *colnamequote = "\"";
678 : char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
679 : return sql;
680 : }
681 : #endif
682 :
683 : // SELECT age FROM customers WHERE first_name=? AND last_name=?;
684 1040 : return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
685 : }
686 :
687 258 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
688 : DEBUG_DBFUNCTION("table_create %s", name);
689 :
690 258 : cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
691 258 : if (!table) return NULL;
692 :
693 258 : table->context = data;
694 258 : table->algo = algo;
695 258 : table->name = cloudsync_string_dup_lowercase(name);
696 :
697 : // Detect schema from metadata table location. If metadata table doesn't
698 : // exist yet (during initialization), fall back to cloudsync_schema() which
699 : // returns the explicitly set schema or current_schema().
700 258 : table->schema = database_table_schema(name);
701 258 : if (!table->schema) {
702 258 : const char *fallback_schema = cloudsync_schema(data);
703 258 : if (fallback_schema) {
704 0 : table->schema = cloudsync_string_dup(fallback_schema);
705 0 : }
706 258 : }
707 :
708 258 : if (!table->name) {
709 0 : cloudsync_memory_free(table);
710 0 : return NULL;
711 : }
712 258 : table->meta_ref = database_build_meta_ref(table->schema, table->name);
713 258 : table->base_ref = database_build_base_ref(table->schema, table->name);
714 258 : table->enabled = true;
715 :
716 258 : return table;
717 258 : }
718 :
719 258 : void table_free (cloudsync_table_context *table) {
720 : DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
721 258 : if (!table) return;
722 :
723 258 : if (table->ncols > 0) {
724 219 : if (table->col_name) {
725 1259 : for (int i=0; i<table->ncols; ++i) {
726 1040 : cloudsync_memory_free(table->col_name[i]);
727 1040 : }
728 219 : cloudsync_memory_free(table->col_name);
729 219 : }
730 219 : if (table->col_merge_stmt) {
731 1259 : for (int i=0; i<table->ncols; ++i) {
732 1040 : databasevm_finalize(table->col_merge_stmt[i]);
733 1040 : }
734 219 : cloudsync_memory_free(table->col_merge_stmt);
735 219 : }
736 219 : if (table->col_value_stmt) {
737 1259 : for (int i=0; i<table->ncols; ++i) {
738 1040 : databasevm_finalize(table->col_value_stmt[i]);
739 1040 : }
740 219 : cloudsync_memory_free(table->col_value_stmt);
741 219 : }
742 219 : if (table->col_id) {
743 219 : cloudsync_memory_free(table->col_id);
744 219 : }
745 219 : if (table->col_algo) {
746 219 : cloudsync_memory_free(table->col_algo);
747 219 : }
748 219 : if (table->col_delimiter) {
749 1259 : for (int i=0; i<table->ncols; ++i) {
750 1040 : if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
751 1040 : }
752 219 : cloudsync_memory_free(table->col_delimiter);
753 219 : }
754 219 : }
755 :
756 258 : if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
757 258 : if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
758 258 : if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
759 258 : if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
760 258 : if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);
761 :
762 258 : if (table->name) cloudsync_memory_free(table->name);
763 258 : if (table->schema) cloudsync_memory_free(table->schema);
764 258 : if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
765 258 : if (table->base_ref) cloudsync_memory_free(table->base_ref);
766 258 : if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
767 258 : if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
768 258 : if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
769 258 : if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
770 258 : if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
771 258 : if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
772 258 : if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
773 258 : if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
774 258 : if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
775 258 : if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
776 258 : if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
777 258 : if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
778 258 : if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);
779 :
780 258 : if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
781 258 : if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
782 258 : if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);
783 :
784 258 : cloudsync_memory_free(table);
785 258 : }
786 :
787 258 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
788 258 : int rc = DBRES_OK;
789 258 : char *sql = NULL;
790 258 : cloudsync_context *data = table->context;
791 :
792 : // META TABLE statements
793 :
794 : // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
795 :
796 : // precompile the pk exists statement
797 : // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
798 : // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
799 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
800 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
801 : DEBUG_SQL("meta_pkexists_stmt: %s", sql);
802 :
803 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
804 258 : cloudsync_memory_free(sql);
805 258 : if (rc != DBRES_OK) goto cleanup;
806 :
807 : // precompile the update local sentinel statement
808 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
809 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
810 : DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
811 :
812 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
813 258 : cloudsync_memory_free(sql);
814 258 : if (rc != DBRES_OK) goto cleanup;
815 :
816 : // precompile the insert local sentinel statement
817 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
818 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
819 : DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
820 :
821 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
822 258 : cloudsync_memory_free(sql);
823 258 : if (rc != DBRES_OK) goto cleanup;
824 :
825 : // precompile the insert/update local row statement
826 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
827 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
828 : DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
829 :
830 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
831 258 : cloudsync_memory_free(sql);
832 258 : if (rc != DBRES_OK) goto cleanup;
833 :
834 : // precompile the delete rows from meta
835 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
836 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
837 : DEBUG_SQL("meta_row_drop_stmt: %s", sql);
838 :
839 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
840 258 : cloudsync_memory_free(sql);
841 258 : if (rc != DBRES_OK) goto cleanup;
842 :
843 : // precompile the update rows from meta when pk changes
844 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
845 258 : sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
846 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
847 : DEBUG_SQL("meta_update_move_stmt: %s", sql);
848 :
849 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
850 258 : cloudsync_memory_free(sql);
851 258 : if (rc != DBRES_OK) goto cleanup;
852 :
853 : // local cl
854 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
855 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
856 : DEBUG_SQL("meta_local_cl_stmt: %s", sql);
857 :
858 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
859 258 : cloudsync_memory_free(sql);
860 258 : if (rc != DBRES_OK) goto cleanup;
861 :
862 : // rowid of the last inserted/updated row in the meta table
863 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
864 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
865 : DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
866 :
867 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
868 258 : cloudsync_memory_free(sql);
869 258 : if (rc != DBRES_OK) goto cleanup;
870 :
871 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
872 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
873 : DEBUG_SQL("meta_merge_delete_drop: %s", sql);
874 :
875 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
876 258 : cloudsync_memory_free(sql);
877 258 : if (rc != DBRES_OK) goto cleanup;
878 :
879 : // zero clock
880 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
881 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
882 : DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
883 :
884 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
885 258 : cloudsync_memory_free(sql);
886 258 : if (rc != DBRES_OK) goto cleanup;
887 :
888 : // col_version
889 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
890 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
891 : DEBUG_SQL("meta_col_version_stmt: %s", sql);
892 :
893 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
894 258 : cloudsync_memory_free(sql);
895 258 : if (rc != DBRES_OK) goto cleanup;
896 :
897 : // site_id
898 258 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
899 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
900 : DEBUG_SQL("meta_site_id_stmt: %s", sql);
901 :
902 258 : rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
903 258 : cloudsync_memory_free(sql);
904 258 : if (rc != DBRES_OK) goto cleanup;
905 :
906 : // REAL TABLE statements
907 :
908 : // precompile the get column value statement
909 258 : if (ncols > 0) {
910 219 : sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
911 219 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
912 : DEBUG_SQL("real_col_values_stmt: %s", sql);
913 :
914 219 : rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
915 219 : cloudsync_memory_free(sql);
916 219 : if (rc != DBRES_OK) goto cleanup;
917 219 : }
918 :
919 258 : sql = table_build_mergedelete_sql(table);
920 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
921 : DEBUG_SQL("real_merge_delete: %s", sql);
922 :
923 258 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
924 258 : cloudsync_memory_free(sql);
925 258 : if (rc != DBRES_OK) goto cleanup;
926 :
927 258 : sql = table_build_mergeinsert_sql(table, NULL);
928 258 : if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
929 : DEBUG_SQL("real_merge_sentinel: %s", sql);
930 :
931 258 : rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
932 258 : cloudsync_memory_free(sql);
933 258 : if (rc != DBRES_OK) goto cleanup;
934 :
935 : cleanup:
936 258 : if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
937 258 : return rc;
938 : }
939 :
940 183643 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
941 : DEBUG_DBFUNCTION("table_lookup %s", table_name);
942 :
943 183643 : if (table_name) {
944 185247 : for (int i=0; i<data->tables_count; ++i) {
945 184717 : if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
946 1604 : }
947 530 : }
948 :
949 530 : return NULL;
950 183643 : }
951 :
952 169057 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
953 : DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
954 :
955 315065 : for (int i=0; i<table->ncols; ++i) {
956 315065 : if (strcasecmp(table->col_name[i], col_name) == 0) {
957 169057 : if (index) *index = i;
958 169057 : return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
959 : }
960 146008 : }
961 :
962 0 : if (index) *index = -1;
963 0 : return NULL;
964 169057 : }
965 :
966 258 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
967 258 : const char *table_name = table->name;
968 : DEBUG_DBFUNCTION("table_remove %s", table_name);
969 :
970 306 : for (int i = 0; i < data->tables_count; ++i) {
971 306 : cloudsync_table_context *t = data->tables[i];
972 :
973 : // pointer compare is fastest but fallback to strcasecmp if not same pointer
974 306 : if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
975 258 : int last = data->tables_count - 1;
976 258 : data->tables[i] = data->tables[last]; // move last into the hole (keeps array dense)
977 258 : data->tables[last] = NULL; // NULLify tail (as an extra security measure)
978 258 : data->tables_count--;
979 258 : return data->tables_count;
980 : }
981 48 : }
982 :
983 0 : return -1;
984 258 : }
985 :
986 1040 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
987 1040 : cloudsync_table_context *table = (cloudsync_table_context *)xdata;
988 1040 : cloudsync_context *data = table->context;
989 :
990 1040 : int index = table->ncols;
991 2080 : for (int i=0; i<ncols; i+=2) {
992 1040 : const char *name = values[i];
993 1040 : int cid = (int)strtol(values[i+1], NULL, 0);
994 :
995 1040 : table->col_id[index] = cid;
996 1040 : table->col_name[index] = cloudsync_string_dup_lowercase(name);
997 1040 : if (!table->col_name[index]) goto error;
998 :
999 1040 : char *sql = table_build_mergeinsert_sql(table, name);
1000 1040 : if (!sql) goto error;
1001 : DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
1002 :
1003 1040 : int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
1004 1040 : cloudsync_memory_free(sql);
1005 1040 : if (rc != DBRES_OK) goto error;
1006 1040 : if (!table->col_merge_stmt[index]) goto error;
1007 :
1008 1040 : sql = table_build_value_sql(table, name);
1009 1040 : if (!sql) goto error;
1010 : DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
1011 :
1012 1040 : rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
1013 1040 : cloudsync_memory_free(sql);
1014 1040 : if (rc != DBRES_OK) goto error;
1015 1040 : if (!table->col_value_stmt[index]) goto error;
1016 1040 : }
1017 1040 : table->ncols += 1;
1018 :
1019 1040 : return 0;
1020 :
1021 : error:
1022 : // clean up partially-initialized entry at index
1023 0 : if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
1024 0 : if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
1025 0 : if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
1026 0 : return 1;
1027 1040 : }
1028 :
1029 258 : bool table_ensure_capacity (cloudsync_context *data) {
1030 258 : if (data->tables_count < data->tables_cap) return true;
1031 :
1032 0 : int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
1033 0 : size_t bytes = (size_t)new_cap * sizeof(*data->tables);
1034 0 : void *p = cloudsync_memory_realloc(data->tables, bytes);
1035 0 : if (!p) return false;
1036 :
1037 0 : data->tables = (cloudsync_table_context **)p;
1038 0 : data->tables_cap = new_cap;
1039 0 : return true;
1040 258 : }
1041 :
// Register a table for sync in this connection's context: create the table
// context, count pk/non-pk columns, precompile all statements and allocate the
// per-column arrays. Idempotent per connection (returns true if already added).
// Returns false on any failure; the partially-built table is freed.
bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
    DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);

    // Check if table already initialized in this connection's context.
    // Note: This prevents same-connection duplicate initialization.
    // SQLite clients cannot distinguish schemas, so having 'public.users'
    // and 'auth.users' would cause sync ambiguity. Users should avoid
    // initializing tables with the same name in different schemas.
    // If two concurrent connections initialize tables with the same name
    // in different schemas, the behavior is undefined.
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (table) return true;

    // check for space availability (grows the tables array if needed)
    if (!table_ensure_capacity(data)) return false;

    // setup a new table
    table = table_create(data, table_name, algo);
    if (!table) return false;

    // fill remaining metadata in the table
    int count = database_count_pk(data, table_name, false, table->schema);
    if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    table->npks = count;
    if (table->npks == 0) {
        #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
        goto abort_add_table;
        #else
        // no explicit primary key: fall back to tracking the implicit rowid
        table->rowid_only = true;
        table->npks = 1; // rowid
        #endif
    }

    int ncols = database_count_nonpk(data, table_name, table->schema);
    if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    int rc = table_add_stmts(table, ncols);
    if (rc != DBRES_OK) goto abort_add_table;

    // a table with only pk(s) is totally legal
    if (ncols > 0) {
        // allocate per-column bookkeeping arrays (one entry per non-pk column)
        table->col_name = (char **)cloudsync_memory_alloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_name) goto abort_add_table;

        table->col_id = (int *)cloudsync_memory_alloc((uint64_t)(sizeof(int) * ncols));
        if (!table->col_id) goto abort_add_table;

        table->col_merge_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_merge_stmt) goto abort_add_table;

        table->col_value_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
        if (!table->col_value_stmt) goto abort_add_table;

        // zero-initialized so unset entries read as "no algorithm / no delimiter"
        table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
        if (!table->col_algo) goto abort_add_table;

        table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
        if (!table->col_delimiter) goto abort_add_table;

        // Pass empty string when schema is NULL; SQL will fall back to current_schema()
        const char *schema = table->schema ? table->schema : "";
        char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
            table_name, schema, table_name, schema);
        if (!sql) goto abort_add_table;
        // populate the per-column arrays via the callback (one column per row)
        rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
        cloudsync_memory_free(sql);
        if (rc == DBRES_ABORT) goto abort_add_table;
    }

    // append newly created table
    data->tables[data->tables_count++] = table;
    return true;

abort_add_table:
    table_free(table);
    return false;
}
1118 :
1119 0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
1120 0 : dbvm_t *vm = NULL;
1121 0 : *persistent = false;
1122 :
1123 0 : cloudsync_table_context *table = table_lookup(data, tbl_name);
1124 0 : if (table) {
1125 0 : char *col_name = NULL;
1126 0 : if (table->ncols > 0) {
1127 0 : col_name = table->col_name[0];
1128 : // retrieve col_value precompiled statement
1129 0 : vm = table_column_lookup(table, col_name, false, NULL);
1130 0 : *persistent = true;
1131 0 : } else {
1132 0 : char *sql = table_build_value_sql(table, "*");
1133 0 : databasevm_prepare(data, sql, (void **)&vm, 0);
1134 0 : cloudsync_memory_free(sql);
1135 0 : *persistent = false;
1136 : }
1137 0 : }
1138 :
1139 0 : return vm;
1140 : }
1141 :
// Returns true when sync change-tracking is currently enabled for this table.
bool table_enabled (cloudsync_table_context *table) {
    return table->enabled;
}
1145 :
// Enables or disables sync change-tracking for this table.
void table_set_enabled (cloudsync_table_context *table, bool value) {
    table->enabled = value;
}
1149 :
// Returns the number of tracked non-primary-key columns (0 for pk-only tables).
int table_count_cols (cloudsync_table_context *table) {
    return table->ncols;
}
1153 :
// Returns the number of primary-key columns (1 for rowid-only tables).
int table_count_pks (cloudsync_table_context *table) {
    return table->npks;
}
1157 :
// Returns the (lowercased) name of the non-pk column at the given index.
// The pointer is owned by the table context; callers must not free it.
const char *table_colname (cloudsync_table_context *table, int index) {
    return table->col_name[index];
}
1161 :
// Returns true if the meta table already contains at least one clock row for
// the given encoded primary-key blob.
bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
    // check if a row with the same primary key already exists
    // if so, this means the row might have been previously deleted (sentinel)
    return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
}
1167 :
// Returns the array of primary-key column names (may be NULL if never set).
char **table_pknames (cloudsync_table_context *table) {
    return table->pk_name;
}
1171 :
// Replaces the primary-key name array, freeing the previous one.
// Ownership of `pknames` transfers to the table context.
void table_set_pknames (cloudsync_table_context *table, char **pknames) {
    table_pknames_free(table->pk_name, table->npks);
    table->pk_name = pknames;
}
1176 :
// Returns true when the table uses the GOS CRDT algorithm (table_algo_crdt_gos).
bool table_algo_isgos (cloudsync_table_context *table) {
    return (table->algo == table_algo_crdt_gos);
}
1180 :
// Returns the table's schema name, or NULL when no explicit schema was set.
const char *table_schema (cloudsync_table_context *table) {
    return table->schema;
}
1184 :
1185 : // MARK: - Merge Insert -
1186 :
1187 46042 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
1188 46042 : dbvm_t *vm = table->meta_local_cl_stmt;
1189 46042 : int64_t result = -1;
1190 :
1191 46042 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1192 46042 : if (rc != DBRES_OK) goto cleanup;
1193 :
1194 46042 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1195 46042 : if (rc != DBRES_OK) goto cleanup;
1196 :
1197 46042 : rc = databasevm_step(vm);
1198 46042 : if (rc == DBRES_ROW) result = database_column_int(vm, 0);
1199 0 : else if (rc == DBRES_DONE) result = 0;
1200 :
1201 : cleanup:
1202 46042 : if (result == -1) cloudsync_set_dberror(table->context);
1203 46042 : dbvm_reset(vm);
1204 46042 : return result;
1205 : }
1206 :
1207 45005 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
1208 45005 : dbvm_t *vm = table->meta_col_version_stmt;
1209 :
1210 45005 : int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
1211 45005 : if (rc != DBRES_OK) goto cleanup;
1212 :
1213 45005 : rc = databasevm_bind_text(vm, 2, col_name, -1);
1214 45005 : if (rc != DBRES_OK) goto cleanup;
1215 :
1216 45005 : rc = databasevm_step(vm);
1217 69907 : if (rc == DBRES_ROW) {
1218 24902 : *version = database_column_int(vm, 0);
1219 24902 : rc = DBRES_OK;
1220 24902 : }
1221 :
1222 : cleanup:
1223 45005 : if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
1224 45005 : dbvm_reset(vm);
1225 45005 : return rc;
1226 : }
1227 :
1228 25121 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1229 :
1230 : // get/set site_id
1231 25121 : dbvm_t *vm = data->getset_siteid_stmt;
1232 25121 : int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
1233 25121 : if (rc != DBRES_OK) goto cleanup_merge;
1234 :
1235 25121 : rc = databasevm_step(vm);
1236 25121 : if (rc != DBRES_ROW) goto cleanup_merge;
1237 :
1238 25121 : int64_t ord = database_column_int(vm, 0);
1239 25121 : dbvm_reset(vm);
1240 :
1241 25121 : vm = table->meta_winner_clock_stmt;
1242 25121 : rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
1243 25121 : if (rc != DBRES_OK) goto cleanup_merge;
1244 :
1245 25121 : rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
1246 25121 : if (rc != DBRES_OK) goto cleanup_merge;
1247 :
1248 25121 : rc = databasevm_bind_int(vm, 3, col_version);
1249 25121 : if (rc != DBRES_OK) goto cleanup_merge;
1250 :
1251 25121 : rc = databasevm_bind_int(vm, 4, db_version);
1252 25121 : if (rc != DBRES_OK) goto cleanup_merge;
1253 :
1254 25121 : rc = databasevm_bind_int(vm, 5, seq);
1255 25121 : if (rc != DBRES_OK) goto cleanup_merge;
1256 :
1257 25121 : rc = databasevm_bind_int(vm, 6, ord);
1258 25121 : if (rc != DBRES_OK) goto cleanup_merge;
1259 :
1260 25121 : rc = databasevm_step(vm);
1261 50242 : if (rc == DBRES_ROW) {
1262 25121 : *rowid = database_column_int(vm, 0);
1263 25121 : rc = DBRES_OK;
1264 25121 : }
1265 :
1266 : cleanup_merge:
1267 25121 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1268 25121 : dbvm_reset(vm);
1269 25121 : return rc;
1270 : }
1271 :
1272 : // MARK: - Deferred column-batch merge functions -
1273 :
// Buffer one winning column change for the current (table, pk) row so that all
// of a row's columns can later be applied with a single multi-column statement
// by merge_flush_pending(). The column value is duplicated and site_id is
// copied inline (truncated to sizeof(e->site_id) if longer), so no caller-owned
// memory is retained. Returns DBRES_OK or a DBRES_* error (message set on data).
static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
    merge_pending_batch *batch = data->pending_batch;

    // Store table and PK on first entry (batch->table doubles as "batch in use")
    if (batch->table == NULL) {
        batch->table = table;
        batch->pk = (char *)cloudsync_memory_alloc(pklen);
        if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
        memcpy(batch->pk, pk, pklen);
        batch->pk_len = pklen;
    }

    // Ensure capacity (geometric growth, starting at 8 entries)
    if (batch->count >= batch->capacity) {
        int new_cap = batch->capacity ? batch->capacity * 2 : 8;
        merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
        if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
        batch->entries = new_entries;
        batch->capacity = new_cap;
    }

    // Resolve col_name to a stable pointer from the table context
    // (the incoming col_name may point to VM-owned memory that gets freed on reset)
    int col_idx = -1;
    table_column_lookup(table, col_name, true, &col_idx);
    const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
    if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);

    // fill the next entry; col_value is deep-copied so it outlives the caller's vm
    merge_pending_entry *e = &batch->entries[batch->count];
    e->col_name = stable_col_name;
    e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
    e->col_version = col_version;
    e->db_version = db_version;
    // clamp site_id to the inline buffer size (silent truncation by design)
    e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
    memcpy(e->site_id, site_id, e->site_id_len);
    e->seq = seq;

    batch->count++;
    return DBRES_OK;
}
1314 :
1315 18878 : static void merge_pending_free_entries (merge_pending_batch *batch) {
1316 18878 : if (batch->entries) {
1317 35561 : for (int i = 0; i < batch->count; i++) {
1318 21318 : if (batch->entries[i].col_value) {
1319 21318 : database_value_free(batch->entries[i].col_value);
1320 21318 : batch->entries[i].col_value = NULL;
1321 21318 : }
1322 21318 : }
1323 14243 : }
1324 18878 : if (batch->pk) {
1325 7717 : cloudsync_memory_free(batch->pk);
1326 7717 : batch->pk = NULL;
1327 7717 : }
1328 18878 : batch->table = NULL;
1329 18878 : batch->pk_len = 0;
1330 18878 : batch->cl = 0;
1331 18878 : batch->sentinel_pending = false;
1332 18878 : batch->row_exists = false;
1333 18878 : batch->count = 0;
1334 18878 : }
1335 :
// Apply all buffered column changes for the current row in a single
// multi-column statement, then record the winner clock for every buffered
// entry. The prepared statement is cached on the batch and reused whenever the
// (row_exists, column set) shape repeats. The whole flush runs inside a
// savepoint; the batch is always cleared before returning.
// Returns DBRES_OK (also when there was nothing to flush) or a DBRES_* error.
static int merge_flush_pending (cloudsync_context *data) {
    merge_pending_batch *batch = data->pending_batch;
    if (!batch) return DBRES_OK;

    int rc = DBRES_OK;
    bool flush_savepoint = false;

    // Nothing to write — handle sentinel-only case or skip
    if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
        goto cleanup;
    }

    // Wrap database operations in a savepoint so that on failure (e.g. RLS
    // denial) the rollback properly releases all executor resources (open
    // relations, snapshots, plan cache) acquired during the failed statement.
    flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);

    if (batch->count == 0) {
        // Sentinel with no winning columns (PK-only row)
        dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
        rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
        if (rc < 0) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
        // SYNCBIT suppresses the local change-tracking triggers while we apply
        // a remote change
        SYNCBIT_SET(data);
        rc = databasevm_step(vm);
        dbvm_reset(vm);
        SYNCBIT_RESET(data);
        if (rc == DBRES_DONE) rc = DBRES_OK;
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            goto cleanup;
        }
        goto cleanup;
    }

    // Check if cached prepared statement can be reused: same row_exists flag,
    // same column count, and pointer-identical column names (names are stable
    // pointers into the table context, so pointer compare is sufficient)
    cloudsync_table_context *table = batch->table;
    dbvm_t *vm = NULL;
    bool cache_hit = false;

    if (batch->cached_vm &&
        batch->cached_row_exists == batch->row_exists &&
        batch->cached_col_count == batch->count) {
        cache_hit = true;
        for (int i = 0; i < batch->count; i++) {
            if (batch->cached_col_names[i] != batch->entries[i].col_name) {
                cache_hit = false;
                break;
            }
        }
    }

    if (cache_hit) {
        vm = batch->cached_vm;
        dbvm_reset(vm);
    } else {
        // Invalidate old cache
        if (batch->cached_vm) {
            databasevm_finalize(batch->cached_vm);
            batch->cached_vm = NULL;
        }

        // Build multi-column SQL (UPDATE when the row exists, UPSERT otherwise)
        const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
        if (!colnames) {
            rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
            goto cleanup;
        }
        for (int i = 0; i < batch->count; i++) {
            colnames[i] = batch->entries[i].col_name;
        }

        char *sql = batch->row_exists
            ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
            : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
        cloudsync_memory_free(colnames);

        if (!sql) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
            goto cleanup;
        }

        rc = databasevm_prepare(data, sql, &vm, 0);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
            goto cleanup;
        }

        // Update cache
        batch->cached_vm = vm;
        batch->cached_row_exists = batch->row_exists;
        batch->cached_col_count = batch->count;
        // Reallocate cached_col_names if needed
        // (a failed realloc is tolerated: the stale names simply force a cache
        // miss next time)
        if (batch->cached_col_count > 0) {
            const char **new_names = (const char **)cloudsync_memory_realloc(
                batch->cached_col_names, batch->count * sizeof(const char *));
            if (new_names) {
                for (int i = 0; i < batch->count; i++) {
                    new_names[i] = batch->entries[i].col_name;
                }
                batch->cached_col_names = new_names;
            }
        }
    }

    // Bind PKs (positions 1..npks)
    int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    if (npks < 0) {
        cloudsync_set_dberror(data);
        dbvm_reset(vm);
        rc = DBRES_ERROR;
        goto cleanup;
    }

    // Bind column values (positions npks+1..npks+count)
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int bind_idx = npks + 1 + i;
        if (e->col_value) {
            rc = databasevm_bind_value(vm, bind_idx, e->col_value);
        } else {
            rc = databasevm_bind_null(vm, bind_idx);
        }
        if (rc != DBRES_OK) {
            cloudsync_set_dberror(data);
            dbvm_reset(vm);
            goto cleanup;
        }
    }

    // Execute with SYNCBIT and GOS handling (GOS tables are temporarily
    // disabled so the conflict-UPDATE doesn't trip their guard trigger)
    if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (table->algo == table_algo_crdt_gos) table->enabled = 1;

    if (rc != DBRES_DONE) {
        cloudsync_set_dberror(data);
        goto cleanup;
    }
    rc = DBRES_OK;

    // Call merge_set_winner_clock for each buffered entry
    int64_t rowid = 0;
    for (int i = 0; i < batch->count; i++) {
        merge_pending_entry *e = &batch->entries[i];
        int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
            e->col_name, e->col_version, e->db_version,
            (const char *)e->site_id, e->site_id_len,
            e->seq, &rowid);
        if (clock_rc != DBRES_OK) {
            rc = clock_rc;
            goto cleanup;
        }
    }

cleanup:
    merge_pending_free_entries(batch);
    if (flush_savepoint) {
        if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
        else database_rollback_savepoint(data, "merge_flush");
    }
    return rc;
}
1506 :
1507 3167 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1508 : int index;
1509 3167 : dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
1510 3167 : if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
1511 :
1512 : // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1513 :
1514 : // bind primary key(s)
1515 3167 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1516 3167 : if (rc < 0) {
1517 0 : cloudsync_set_dberror(data);
1518 0 : dbvm_reset(vm);
1519 0 : return rc;
1520 : }
1521 :
1522 : // bind value (always bind all expected parameters for correct prepared statement handling)
1523 3167 : if (col_value) {
1524 3167 : rc = databasevm_bind_value(vm, table->npks+1, col_value);
1525 3167 : if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
1526 3167 : } else {
1527 0 : rc = databasevm_bind_null(vm, table->npks+1);
1528 0 : if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
1529 : }
1530 3167 : if (rc != DBRES_OK) {
1531 0 : cloudsync_set_dberror(data);
1532 0 : dbvm_reset(vm);
1533 0 : return rc;
1534 : }
1535 :
1536 : // perform real operation and disable triggers
1537 :
1538 : // in case of GOS we reused the table->col_merge_stmt statement
1539 : // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1540 : // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
1541 : // the trick is to disable that trigger before executing the statement
1542 3167 : if (table->algo == table_algo_crdt_gos) table->enabled = 0;
1543 3167 : SYNCBIT_SET(data);
1544 3167 : rc = databasevm_step(vm);
1545 : DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
1546 3167 : dbvm_reset(vm);
1547 3167 : SYNCBIT_RESET(data);
1548 3167 : if (table->algo == table_algo_crdt_gos) table->enabled = 1;
1549 :
1550 3167 : if (rc != DBRES_DONE) {
1551 0 : cloudsync_set_dberror(data);
1552 0 : return rc;
1553 : }
1554 :
1555 3167 : return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
1556 3167 : }
1557 :
// Apply a remote row deletion: delete the real row (triggers suppressed),
// record the winning tombstone clock, then drop the per-column clock rows for
// that pk. Returns DBRES_OK on success; db error is set on `data` on failure.
int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    int rc = DBRES_OK;

    // reset return value
    *rowid = 0;

    // bind pk (decoded values at positions 1..npks)
    dbvm_t *vm = table->real_merge_delete_stmt;
    rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    if (rc < 0) {
        rc = cloudsync_set_dberror(data);
        dbvm_reset(vm);
        return rc;
    }

    // perform real operation and disable triggers
    SYNCBIT_SET(data);
    rc = databasevm_step(vm);
    DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    dbvm_reset(vm);
    SYNCBIT_RESET(data);
    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) {
        cloudsync_set_dberror(data);
        return rc;
    }

    // record the tombstone as the winner for this row
    rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
    if (rc != DBRES_OK) return rc;

    // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
    // this must never come before `set_winner_clock`
    vm = table->meta_merge_delete_drop;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    if (rc == DBRES_OK) rc = databasevm_step(vm);
    dbvm_reset(vm);

    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    return rc;
}
1599 :
1600 62 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
1601 62 : dbvm_t *vm = table->meta_zero_clock_stmt;
1602 :
1603 62 : int rc = databasevm_bind_int(vm, 1, db_version);
1604 62 : if (rc != DBRES_OK) goto cleanup;
1605 :
1606 62 : rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
1607 62 : if (rc != DBRES_OK) goto cleanup;
1608 :
1609 62 : rc = databasevm_step(vm);
1610 62 : if (rc == DBRES_DONE) rc = DBRES_OK;
1611 :
1612 : cleanup:
1613 62 : if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
1614 62 : dbvm_reset(vm);
1615 62 : return rc;
1616 : }
1617 :
// Decide whether an incoming column change wins against the local state.
// Precondition: executed only if insert_cl == local_cl (causal lengths tie).
// Resolution order: (1) no local metadata -> incoming wins; (2) higher
// col_version wins; (3) equal versions -> compare values; (4) equal values
// (when merge_equal_values is enabled) -> compare site_ids as a tiebreaker.
// On success *didwin_flag is set and DBRES_OK is returned; on failure the
// context error is set and the error code is returned.
int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {

    // a NULL column name denotes the row sentinel (tombstone) entry
    if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;

    int64_t local_version;
    int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    if (rc == DBRES_DONE) {
        // no rows returned, the incoming change wins if there's nothing there locally
        *didwin_flag = true;
        return DBRES_OK;
    }
    if (rc != DBRES_OK) return rc;

    // rc == DBRES_OK, means that a row with a version exists
    if (local_version != col_version) {
        if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
        if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    }

    // rc == DBRES_ROW and col_version == local_version, need to compare values

    // retrieve col_value precompiled statement
    bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    dbvm_t *vm;
    if (is_block_col) {
        // Block column: read value from blocks table (pk + col_name bindings)
        vm = table_block_value_read_stmt(table);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
        rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
        rc = databasevm_bind_text(vm, 2, col_name, -1);
        if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    } else {
        vm = table_column_lookup(table, col_name, false, NULL);
        if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);

        // bind primary key values
        rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
        if (rc < 0) {
            rc = cloudsync_set_dberror(data);
            dbvm_reset(vm);
            return rc;
        }
    }

    // execute vm
    dbvalue_t *local_value;
    rc = databasevm_step(vm);
    if (rc == DBRES_DONE) {
        // meta entry exists but the actual value is missing
        // we should allow the value_compare function to make a decision
        // value_compare has been modified to handle the case where lvalue is NULL
        local_value = NULL;
        rc = DBRES_OK;
    } else if (rc == DBRES_ROW) {
        local_value = database_column_value(vm, 0);
        rc = DBRES_OK;
    } else {
        goto cleanup;
    }

    // compare values
    int ret = dbutils_value_compare(insert_value, local_value);
    // reset after compare, otherwise local value would be deallocated
    dbvm_reset(vm);
    // NOTE(review): vm is NULL from here until it is re-assigned below; the
    // cleanup label calls dbvm_reset(vm) — this assumes dbvm_reset(NULL) is a
    // no-op, confirm in the dbvm implementation
    vm = NULL;

    bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    if (!compare_site_id) {
        *didwin_flag = (ret > 0);
        goto cleanup;
    }

    // values are the same and merge_equal_values is true
    // fall back to comparing site_ids so all peers resolve the tie identically
    vm = table->meta_site_id_stmt;
    rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_bind_text(vm, 2, col_name, -1);
    if (rc != DBRES_OK) goto cleanup;

    rc = databasevm_step(vm);
    if (rc == DBRES_ROW) {
        const void *local_site_id = database_column_blob(vm, 0, NULL);
        if (!local_site_id) {
            dbvm_reset(vm);
            return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
        }
        // NOTE(review): compares site_len bytes only — assumes both site_ids
        // have the same length; verify against the payload format
        ret = memcmp(site_id, local_site_id, site_len);
        *didwin_flag = (ret > 0);
        dbvm_reset(vm);
        return DBRES_OK;
    }

    // handle error condition here
    dbvm_reset(vm);
    return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);

cleanup:
    if (rc != DBRES_OK) cloudsync_set_dberror(data);
    dbvm_reset(vm);
    return rc;
}
1722 :
1723 62 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
1724 :
1725 : // reset return value
1726 62 : *rowid = 0;
1727 :
1728 62 : if (data->pending_batch == NULL) {
1729 : // Immediate mode: execute base table INSERT
1730 0 : dbvm_t *vm = table->real_merge_sentinel_stmt;
1731 0 : int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1732 0 : if (rc < 0) {
1733 0 : rc = cloudsync_set_dberror(data);
1734 0 : dbvm_reset(vm);
1735 0 : return rc;
1736 : }
1737 :
1738 0 : SYNCBIT_SET(data);
1739 0 : rc = databasevm_step(vm);
1740 0 : dbvm_reset(vm);
1741 0 : SYNCBIT_RESET(data);
1742 0 : if (rc == DBRES_DONE) rc = DBRES_OK;
1743 0 : if (rc != DBRES_OK) {
1744 0 : cloudsync_set_dberror(data);
1745 0 : return rc;
1746 : }
1747 0 : } else {
1748 : // Batch mode: skip base table INSERT, the batch flush will create the row
1749 62 : merge_pending_batch *batch = data->pending_batch;
1750 62 : batch->sentinel_pending = true;
1751 62 : if (batch->table == NULL) {
1752 32 : batch->table = table;
1753 32 : batch->pk = (char *)cloudsync_memory_alloc(pklen);
1754 32 : if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
1755 32 : memcpy(batch->pk, pk, pklen);
1756 32 : batch->pk_len = pklen;
1757 32 : }
1758 : }
1759 :
1760 : // Metadata operations always execute regardless of batch mode
1761 62 : int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
1762 62 : if (rc != DBRES_OK) return rc;
1763 :
1764 62 : return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
1765 62 : }
1766 :
1767 : // MARK: - Block-level merge helpers -
1768 :
1769 : // Store a block value in the blocks table
1770 379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
1771 379 : dbvm_t *vm = table->block_value_write_stmt;
1772 379 : if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
1773 :
1774 379 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1775 379 : if (rc != DBRES_OK) goto cleanup;
1776 379 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1777 379 : if (rc != DBRES_OK) goto cleanup;
1778 379 : if (col_value) {
1779 379 : rc = databasevm_bind_value(vm, 3, col_value);
1780 379 : } else {
1781 0 : rc = databasevm_bind_null(vm, 3);
1782 : }
1783 379 : if (rc != DBRES_OK) goto cleanup;
1784 :
1785 379 : rc = databasevm_step(vm);
1786 379 : if (rc == DBRES_DONE) rc = DBRES_OK;
1787 :
1788 : cleanup:
1789 379 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1790 379 : databasevm_reset(vm);
1791 379 : return rc;
1792 379 : }
1793 :
1794 : // Delete a block value from the blocks table
1795 72 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
1796 72 : dbvm_t *vm = table->block_value_delete_stmt;
1797 72 : if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
1798 :
1799 72 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
1800 72 : if (rc != DBRES_OK) goto cleanup;
1801 72 : rc = databasevm_bind_text(vm, 2, block_colname, -1);
1802 72 : if (rc != DBRES_OK) goto cleanup;
1803 :
1804 72 : rc = databasevm_step(vm);
1805 72 : if (rc == DBRES_DONE) rc = DBRES_OK;
1806 :
1807 : cleanup:
1808 72 : if (rc != DBRES_OK) cloudsync_set_dberror(data);
1809 72 : databasevm_reset(vm);
1810 72 : return rc;
1811 72 : }
1812 :
// Materialize all alive blocks for a base column into the base table
// Reads every alive block (metadata col_version is odd) for (pk, base_col),
// joins them with the column's delimiter, and writes the result into the base
// table column via the column's merge statement with triggers disabled.
// When no alive blocks remain the column is set to NULL.
int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);

    // Find column index and delimiter
    int col_idx = -1;
    for (int i = 0; i < table->ncols; i++) {
        if (strcasecmp(table->col_name[i], base_col_name) == 0) {
            col_idx = i;
            break;
        }
    }
    if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;

    // Build the LIKE pattern for block col_names: "base_col\x1F%"
    char *like_pattern = block_build_colname(base_col_name, "%");
    if (!like_pattern) return DBRES_NOMEM;

    // Query alive blocks from blocks table joined with metadata
    // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    //                  ON b.pk = m.pk AND b.col_name = m.col_name
    //                  WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    //                  ORDER BY b.col_name
    dbvm_t *vm = table->block_list_stmt;
    int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    // Bind pk again for the join condition (parameter 3)
    rc = databasevm_bind_blob(vm, 3, pk, pklen);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }

    // Collect block values (array doubles on demand, starting at 16 slots)
    const char **block_values = NULL;
    int block_count = 0;
    int block_cap = 0;

    while ((rc = databasevm_step(vm)) == DBRES_ROW) {
        const char *value = database_column_text(vm, 0);
        if (block_count >= block_cap) {
            int new_cap = block_cap ? block_cap * 2 : 16;
            const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
            if (!new_arr) { rc = DBRES_NOMEM; break; }
            block_values = new_arr;
            block_cap = new_cap;
        }
        // NOTE(review): if cloudsync_string_dup fails this stores a NULL entry
        // that is later passed to block_materialize_text — confirm it tolerates NULLs
        block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
        block_count++;
    }
    databasevm_reset(vm);
    cloudsync_memory_free(like_pattern);

    if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
        // Free collected values
        // NOTE(review): a DBRES_NOMEM from the realloc break above also lands
        // here and is reported via cloudsync_set_dberror — verify that is intended
        for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
        if (block_values) cloudsync_memory_free((void *)block_values);
        return cloudsync_set_dberror(data);
    }

    // Materialize text (NULL when no alive blocks)
    char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    if (block_values) cloudsync_memory_free((void *)block_values);
    if (block_count > 0 && !text) return DBRES_NOMEM;

    // Update the base table column via the col_merge_stmt (with triggers disabled)
    dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }

    // Bind PKs
    rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }

    // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    int npks = table->npks;
    if (text) {
        rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
        if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    } else {
        rc = databasevm_bind_null(merge_vm, npks + 1);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
        rc = databasevm_bind_null(merge_vm, npks + 2);
        if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    }

    // Execute with triggers disabled
    table->enabled = 0;
    SYNCBIT_SET(data);
    rc = databasevm_step(merge_vm);
    databasevm_reset(merge_vm);
    SYNCBIT_RESET(data);
    table->enabled = 1;

    cloudsync_memory_free(text);

    if (rc == DBRES_DONE) rc = DBRES_OK;
    if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    return DBRES_OK;
}
1917 :
1918 : // Accessor for has_block_cols flag
1919 1024 : bool table_has_block_cols (cloudsync_table_context *table) {
1920 1024 : return table && table->has_block_cols;
1921 : }
1922 :
1923 : // Get block column algo for a given column index
1924 11420 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
1925 11420 : if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
1926 11420 : return table->col_algo[index];
1927 11420 : }
1928 :
1929 : // Get block delimiter for a given column index
1930 133 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
1931 133 : if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
1932 133 : return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
1933 133 : }
1934 :
1935 : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
1936 1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
1937 496 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
1938 91 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
1939 91 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
1940 :
1941 2 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
1942 2 : if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
1943 2 : if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
1944 2 : table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
1945 2 : }
1946 :
1947 : // Find column index by name
1948 121 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
1949 121 : if (!table || !col_name) return -1;
1950 125 : for (int i = 0; i < table->ncols; i++) {
1951 125 : if (strcasecmp(table->col_name[i], col_name) == 0) return i;
1952 4 : }
1953 0 : return -1;
1954 121 : }
1955 :
// Entry point for applying one incoming change (one column of one row) using
// the Causal-Length Set (CLS) algorithm. Dispatches to merge_delete,
// merge_sentinel_only_insert, block-level LWW handling, or a plain column
// merge (batched or immediate) depending on the change's causal length,
// column name, and the local state. Returns DBRES_OK when the change is
// applied or correctly ignored; any failure sets the context error.
int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    // Handle DWS and AWS algorithms here
    // Delete-Wins Set (DWS): table_algo_crdt_dws
    // Add-Wins Set (AWS): table_algo_crdt_aws

    // Causal-Length Set (CLS) Algorithm (default)

    // compute the local causal length for the row based on the primary key
    // the causal length is used to determine the order of operations and resolve conflicts.
    int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);

    // if the incoming causal length is older than the local causal length, we can safely ignore it
    // because the local changes are more recent
    if (insert_cl < local_cl) return DBRES_OK;

    // check if the operation is a delete by examining the causal length
    // even causal lengths typically signify delete operations
    bool is_delete = (insert_cl % 2 == 0);
    if (is_delete) {
        // if it's a delete, check if the local state is at the same causal length
        // if it is, no further action is needed
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a delete merge if the causal length is newer than the local one
        int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
                              insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
        return rc;
    }

    // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    if (is_sentinel_only) {
        if (local_cl == insert_cl) return DBRES_OK;

        // perform a sentinel-only insert to track the existence of the row
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
        return rc;
    }

    // from this point I can be sure that insert_name is not sentinel

    // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    // odd causal lengths can "resurrect" rows
    bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    bool row_exists_locally = local_cl != 0;

    // if a resurrection is needed, insert a sentinel to mark the row as alive
    // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
        int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
                                            insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    }

    // at this point, we determine whether the incoming change wins based on causal length
    // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    bool flag = false;
    int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);

    // check if the incoming change wins and should be applied
    bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    if (!does_cid_win) return DBRES_OK;

    // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    // bypass the normal base-table write and instead store the value in the blocks table.
    // The base table column will be materialized from all alive blocks.
    if (block_is_block_colname(insert_name) && table->has_block_cols) {
        // Store or delete block value in blocks table depending on tombstone status
        // (even col_version == tombstoned block, odd == alive block)
        if (insert_col_version % 2 == 0) {
            // Tombstone: remove from blocks table
            rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
        } else {
            rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
        }
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);

        // Set winner clock in metadata
        rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
                                    insert_col_version, insert_db_version,
                                    insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);

        // Materialize the full column from blocks into the base table
        char *base_col = block_extract_base_colname(insert_name);
        if (base_col) {
            rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
            cloudsync_memory_free(base_col);
            if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
        }

        return DBRES_OK;
    }

    // perform the final column insert or update if the incoming change wins
    if (data->pending_batch) {
        // Propagate row_exists_locally to the batch on the first winning column.
        // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
        // which matters when RLS policies reference columns not in the payload.
        if (data->pending_batch->table == NULL) {
            data->pending_batch->row_exists = row_exists_locally;
        }
        rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    } else {
        rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
        if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    }

    return rc;
}
2071 :
2072 : // MARK: - Block column setup -
2073 :
// Configure a table column to use the block-level (per-block LWW) algorithm.
// Marks the column, stores an optional custom delimiter, lazily creates the
// per-table blocks table with its four prepared statements (upsert / select /
// delete / list-alive), and persists the settings so they survive restarts.
// NOTE(review): statements prepared before a later failure remain attached to
// the table context — presumably released at table teardown; confirm.
int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter) {
    cloudsync_table_context *table = table_lookup(data, table_name);
    if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);

    // Find column index
    int col_idx = table_col_index(table, col_name);
    if (col_idx < 0) {
        char buf[1024];
        snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
        return cloudsync_set_error(data, buf, DBRES_ERROR);
    }

    // Set column algo
    table->col_algo[col_idx] = col_algo_block;
    table->has_block_cols = true;

    // Set delimiter (can be NULL for default)
    if (table->col_delimiter[col_idx]) {
        cloudsync_memory_free(table->col_delimiter[col_idx]);
        table->col_delimiter[col_idx] = NULL;
    }
    if (delimiter) {
        table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
    }

    // Create blocks table if not already done
    if (!table->blocks_ref) {
        table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
        if (!table->blocks_ref) return DBRES_NOMEM;

        // CREATE TABLE IF NOT EXISTS
        char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;

        int rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);

        // Prepare block statements
        // Write: upsert into blocks (pk, col_name, col_value)
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;

        // List alive blocks for materialization
        sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
        if (!sql) return DBRES_NOMEM;
        rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) return rc;
    }

    // Persist settings
    int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
    if (rc != DBRES_OK) return rc;

    if (delimiter) {
        rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
        if (rc != DBRES_OK) return rc;
    }

    return DBRES_OK;
}
2153 :
2154 : // MARK: - Private -
2155 :
2156 210 : bool cloudsync_config_exists (cloudsync_context *data) {
2157 210 : return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
2158 : }
2159 :
2160 224 : cloudsync_context *cloudsync_context_create (void *db) {
2161 224 : cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
2162 224 : if (!data) return NULL;
2163 : DEBUG_SETTINGS("cloudsync_context_create %p", data);
2164 :
2165 224 : data->libversion = CLOUDSYNC_VERSION;
2166 224 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2167 : #if CLOUDSYNC_DEBUG
2168 : data->debug = 1;
2169 : #endif
2170 :
2171 : // allocate space for 64 tables (it can grow if needed)
2172 224 : uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
2173 224 : data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
2174 224 : if (!data->tables) {cloudsync_memory_free(data); return NULL;}
2175 :
2176 224 : data->tables_cap = CLOUDSYNC_INIT_NTABLES;
2177 224 : data->tables_count = 0;
2178 224 : data->db = db;
2179 :
2180 : // SQLite exposes col_value as ANY, but other databases require a concrete type.
2181 : // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
2182 : // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
2183 : // It is decoded to the target column type just before applying changes to the base table.
2184 224 : data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
2185 :
2186 224 : return data;
2187 224 : }
2188 :
2189 224 : void cloudsync_context_free (void *ctx) {
2190 224 : cloudsync_context *data = (cloudsync_context *)ctx;
2191 : DEBUG_SETTINGS("cloudsync_context_free %p", data);
2192 224 : if (!data) return;
2193 :
2194 : // free all table contexts and prepared statements
2195 224 : cloudsync_terminate(data);
2196 :
2197 224 : cloudsync_memory_free(data->tables);
2198 224 : cloudsync_memory_free(data);
2199 224 : }
2200 :
2201 316 : const char *cloudsync_context_init (cloudsync_context *data) {
2202 316 : if (!data) return NULL;
2203 :
2204 : // perform init just the first time, if the site_id field is not set.
2205 : // The data->site_id value could exists while settings tables don't exists if the
2206 : // cloudsync_context_init was previously called in init transaction that was rolled back
2207 : // because of an error during the init process.
2208 316 : if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
2209 205 : if (dbutils_settings_init(data) != DBRES_OK) return NULL;
2210 205 : if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
2211 205 : if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
2212 205 : data->schema_hash = database_schema_hash(data);
2213 205 : }
2214 :
2215 316 : return (const char *)data->site_id;
2216 316 : }
2217 :
2218 1303 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
2219 : DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
2220 :
2221 : // sync data
2222 1303 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
2223 205 : data->schema_version = (int)strtol(value, NULL, 0);
2224 205 : return;
2225 : }
2226 :
2227 1098 : if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
2228 0 : data->debug = 0;
2229 0 : if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
2230 0 : return;
2231 : }
2232 :
2233 1098 : if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
2234 0 : cloudsync_set_schema(data, value);
2235 0 : return;
2236 : }
2237 1303 : }
2238 :
2239 : #if 0
2240 : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
2241 : DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
2242 : // Unused in this version
2243 : return;
2244 : }
2245 : #endif
2246 :
2247 7847 : int cloudsync_commit_hook (void *ctx) {
2248 7847 : cloudsync_context *data = (cloudsync_context *)ctx;
2249 :
2250 7847 : data->db_version = data->pending_db_version;
2251 7847 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2252 7847 : data->seq = 0;
2253 :
2254 7847 : return DBRES_OK;
2255 : }
2256 :
2257 3 : void cloudsync_rollback_hook (void *ctx) {
2258 3 : cloudsync_context *data = (cloudsync_context *)ctx;
2259 :
2260 3 : data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
2261 3 : data->seq = 0;
2262 3 : }
2263 :
2264 24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
2265 : // init cloudsync_settings
2266 24 : if (cloudsync_context_init(data) == NULL) {
2267 0 : return DBRES_MISUSE;
2268 : }
2269 :
2270 : // lookup table
2271 24 : cloudsync_table_context *table = table_lookup(data, table_name);
2272 24 : if (!table) {
2273 : char buffer[1024];
2274 1 : snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
2275 1 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
2276 : }
2277 :
2278 : // idempotent: if already altering, return OK
2279 23 : if (table->is_altering) return DBRES_OK;
2280 :
2281 : // retrieve primary key(s)
2282 23 : char **names = NULL;
2283 23 : int nrows = 0;
2284 23 : int rc = database_pk_names(data, table_name, &names, &nrows);
2285 23 : if (rc != DBRES_OK) {
2286 : char buffer[1024];
2287 0 : snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
2288 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2289 0 : goto rollback_begin_alter;
2290 : }
2291 :
2292 : // sanity check the number of primary keys
2293 23 : if (nrows != table_count_pks(table)) {
2294 : char buffer[1024];
2295 0 : snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
2296 0 : cloudsync_set_error(data, buffer, DBRES_MISUSE);
2297 0 : goto rollback_begin_alter;
2298 : }
2299 :
2300 : // drop original triggers
2301 23 : rc = database_delete_triggers(data, table_name);
2302 23 : if (rc != DBRES_OK) {
2303 : char buffer[1024];
2304 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
2305 0 : cloudsync_set_error(data, buffer, DBRES_ERROR);
2306 0 : goto rollback_begin_alter;
2307 : }
2308 :
2309 23 : table_set_pknames(table, names);
2310 23 : table->is_altering = true;
2311 23 : return DBRES_OK;
2312 :
2313 : rollback_begin_alter:
2314 0 : if (names) table_pknames_free(names, nrows);
2315 0 : return rc;
2316 24 : }
2317 :
// Reconcile the meta-table with the base table after an ALTER completes.
// If the primary-key columns changed, every row identity changed, so the
// meta (clock) table is dropped and will be recreated/backfilled later;
// otherwise the meta-table is compacted in place. Finally, the current
// db_version is saved under "pre_alter_dbversion" for a later rebuild.
// Returns DBRES_OK on success or a database/NOMEM error code.
int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    // check if dbversion needed to be updated
    cloudsync_dbversion_check_uptodate(data);

    // if primary-key columns change, all row identities change.
    // In that case, the clock table must be dropped, recreated,
    // and backfilled. We detect this by comparing the unique index
    // in the lookaside table with the source table's PKs.

    // retrieve primary keys (to check if they changed)
    char **result = NULL;
    int nrows = 0;
    int rc = database_pk_names (data, table->name, &result, &nrows);
    if (rc != DBRES_OK || nrows == 0) {
        // a table with zero primary keys at this point is a misuse
        if (nrows == 0) rc = DBRES_MISUSE;
        goto finalize;
    }

    // check if there are differences (count first, then name-by-name)
    bool pk_diff = (nrows != table->npks);
    if (!pk_diff) {
        for (int i = 0; i < nrows; ++i) {
            if (strcmp(table->pk_name[i], result[i]) != 0) {
                pk_diff = true;
                break;
            }
        }
    }

    if (pk_diff) {
        // drop meta-table, it will be recreated
        char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }
    } else {
        // compact meta-table
        // step 1: delete entries for columns that no longer exist in the schema
        const char *schema = table->schema ? table->schema : "";
        char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
        rc = database_exec(data, sql);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

        // step 2: build the PK column list used to correlate meta and base rows
        sql = sql_build_pk_qualified_collist_query(schema, table->name);
        if (!sql) {rc = DBRES_NOMEM; goto finalize;}

        char *pkclause = NULL;
        rc = database_select_text(data, sql, &pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) goto finalize;
        // fall back to rowid when the table has no explicit PK column list
        char *pkvalues = (pkclause) ? pkclause : "rowid";

        // delete entries related to rows that no longer exist in the original table, but preserve tombstone
        sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
        rc = database_exec(data, sql);
        if (pkclause) cloudsync_memory_free(pkclause);
        cloudsync_memory_free(sql);
        if (rc != DBRES_OK) {
            DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
            goto finalize;
        }

    }

    // update key to be later used in cloudsync_dbversion_rebuild
    char buf[256];
    snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);

finalize:
    // result may be NULL here (early failure); table_pknames_free must accept that
    table_pknames_free(result, nrows);
    return rc;
}
2398 :
// Complete an ALTER previously started with cloudsync_begin_alter:
// reconcile the meta-table, drop the stale in-memory table context, and
// re-initialize cloudsync for the table. Idempotent: if the table is not
// in the altering state, returns DBRES_OK immediately. On failure the
// altering state is rolled back (pk snapshot cleared, is_altering reset).
int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    int rc = DBRES_MISUSE;
    cloudsync_table_context *table = NULL;

    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    // lookup table
    table = table_lookup(data, table_name);
    if (!table) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
        cloudsync_set_error(data, buffer, DBRES_MISUSE);
        goto rollback_finalize_alter;
    }

    // idempotent: if not altering, return OK
    if (!table->is_altering) return DBRES_OK;

    rc = cloudsync_finalize_alter(data, table);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    // is_altering is reset implicitly because table_free + cloudsync_init_table
    // will reallocate the table context with zero-initialized memory
    table_remove(data, table);
    table_free(table);
    table = NULL;

    // init again cloudsync for the table; per-table algo wins over the "*" default
    table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), true);
    if (rc != DBRES_OK) goto rollback_finalize_alter;

    return DBRES_OK;

rollback_finalize_alter:
    // table is non-NULL only when the failure happened before table_free
    if (table) {
        table_set_pknames(table, NULL);
        table->is_altering = false;
    }
    return rc;
}
2446 :
2447 : // MARK: - Filter Rewrite -
2448 :
2449 : // Replace bare column names in a filter expression with prefix-qualified names.
2450 : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
2451 : // Columns must be sorted by length descending by the caller to avoid partial matches.
2452 : // Skips content inside single-quoted string literals.
2453 : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
2454 : // Helper: check if an identifier token matches a column name.
// Return true when the (not NUL-terminated) token matches one of the given
// column names exactly: same length and same bytes.
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    for (int idx = 0; idx < ncols; ++idx) {
        const char *candidate = columns[idx];
        if (strlen(candidate) != token_len) continue;
        if (memcmp(token, candidate, token_len) == 0) return true;
    }
    return false;
}
2462 :
2463 : // Helper: check if character is part of a SQL identifier.
// True for characters that may appear in an unquoted SQL identifier:
// ASCII letters, ASCII digits, and underscore.
static bool filter_is_ident_char (char c) {
    if (c == '_') return true;
    if (c >= '0' && c <= '9') return true;
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
}
2468 :
// Rewrite a filter expression so each bare column reference becomes
// PREFIX."column". Single-quoted literals are copied verbatim; all other
// text passes through unchanged. Returns a newly allocated string the
// caller frees with cloudsync_memory_free, or NULL on bad args / OOM.
char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    if (!filter || !prefix || !columns || ncols <= 0) return NULL;

    size_t filter_len = strlen(filter);
    size_t prefix_len = strlen(prefix);

    // Each identifier match grows by at most (prefix_len + 3) bytes.
    // Worst case: the entire filter is one repeated column reference separated by
    // single characters, so up to (filter_len / 2) matches. Use a safe upper bound.
    // (+3 = '.' plus the two double quotes wrapped around the column name)
    size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    size_t cap = filter_len + max_growth + 64;
    char *result = (char *)cloudsync_memory_alloc(cap);
    if (!result) return NULL;
    size_t out = 0;

    // Single pass: tokenize into identifiers, quoted strings, and everything else.
    size_t i = 0;
    while (i < filter_len) {
        // Skip single-quoted string literals verbatim (handle '' escape)
        if (filter[i] == '\'') {
            result[out++] = filter[i++];
            while (i < filter_len) {
                if (filter[i] == '\'') {
                    result[out++] = filter[i++];
                    // '' is an escaped quote — keep going
                    if (i < filter_len && filter[i] == '\'') {
                        result[out++] = filter[i++];
                        continue;
                    }
                    break; // single ' ends the literal
                }
                result[out++] = filter[i++];
            }
            continue;
        }

        // Extract identifier token (maximal run of identifier characters)
        if (filter_is_ident_char(filter[i])) {
            size_t start = i;
            while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
            size_t token_len = i - start;

            if (filter_is_column(&filter[start], token_len, columns, ncols)) {
                // Emit PREFIX."column_name"
                memcpy(&result[out], prefix, prefix_len); out += prefix_len;
                result[out++] = '.';
                result[out++] = '"';
                memcpy(&result[out], &filter[start], token_len); out += token_len;
                result[out++] = '"';
            } else {
                // Not a column — copy as-is
                memcpy(&result[out], &filter[start], token_len); out += token_len;
            }
            continue;
        }

        // Any other character — copy as-is
        result[out++] = filter[i++];
    }

    // cap includes +64 slack, so the terminator always fits
    result[out] = '\0';
    return result;
}
2532 :
2533 261 : int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
2534 261 : cloudsync_table_context *table = table_lookup(data, table_name);
2535 261 : if (!table) return DBRES_ERROR;
2536 :
2537 261 : dbvm_t *vm = NULL;
2538 261 : int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
2539 :
2540 : // Read row-level filter from settings (if any)
2541 : char filter_buf[2048];
2542 261 : int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
2543 261 : const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;
2544 :
2545 261 : const char *schema = table->schema ? table->schema : "";
2546 261 : char *sql = sql_build_pk_collist_query(schema, table_name);
2547 261 : char *pkclause_identifiers = NULL;
2548 261 : int rc = database_select_text(data, sql, &pkclause_identifiers);
2549 261 : cloudsync_memory_free(sql);
2550 261 : if (rc != DBRES_OK) goto finalize;
2551 261 : char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
2552 :
2553 : // Use database-specific query builder to handle type differences in composite PKs
2554 261 : sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
2555 261 : if (!sql) {rc = DBRES_NOMEM; goto finalize;}
2556 261 : rc = database_exec(data, sql);
2557 261 : cloudsync_memory_free(sql);
2558 261 : if (rc != DBRES_OK) goto finalize;
2559 :
2560 : // fill missing colums
2561 : // for each non-pk column:
2562 : // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
2563 : // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
2564 :
2565 261 : if (filter) {
2566 0 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
2567 0 : } else {
2568 261 : sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
2569 : }
2570 261 : rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
2571 261 : cloudsync_memory_free(sql);
2572 261 : if (rc != DBRES_OK) goto finalize;
2573 :
2574 1310 : for (int i=0; i<table->ncols; ++i) {
2575 1049 : char *col_name = table->col_name[i];
2576 :
2577 1049 : rc = databasevm_bind_text(vm, 1, col_name, -1);
2578 1049 : if (rc != DBRES_OK) goto finalize;
2579 :
2580 1087 : while (1) {
2581 1087 : rc = databasevm_step(vm);
2582 1087 : if (rc == DBRES_ROW) {
2583 38 : size_t pklen = 0;
2584 38 : const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
2585 38 : if (!pk) { rc = DBRES_ERROR; break; }
2586 38 : rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
2587 1087 : } else if (rc == DBRES_DONE) {
2588 1049 : rc = DBRES_OK;
2589 1049 : break;
2590 : } else {
2591 0 : break;
2592 : }
2593 : }
2594 1049 : if (rc != DBRES_OK) goto finalize;
2595 :
2596 1049 : databasevm_reset(vm);
2597 1310 : }
2598 :
2599 : finalize:
2600 261 : if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
2601 261 : if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
2602 261 : if (vm) databasevm_finalize(vm);
2603 261 : return rc;
2604 261 : }
2605 :
2606 : // MARK: - Local -
2607 :
2608 4 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2609 4 : dbvm_t *vm = table->meta_sentinel_update_stmt;
2610 4 : if (!vm) return -1;
2611 :
2612 4 : int rc = databasevm_bind_int(vm, 1, db_version);
2613 4 : if (rc != DBRES_OK) goto cleanup;
2614 :
2615 4 : rc = databasevm_bind_int(vm, 2, seq);
2616 4 : if (rc != DBRES_OK) goto cleanup;
2617 :
2618 4 : rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
2619 4 : if (rc != DBRES_OK) goto cleanup;
2620 :
2621 4 : rc = databasevm_step(vm);
2622 4 : if (rc == DBRES_DONE) rc = DBRES_OK;
2623 :
2624 : cleanup:
2625 4 : DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
2626 4 : databasevm_reset(vm);
2627 4 : return rc;
2628 4 : }
2629 :
2630 128 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
2631 128 : dbvm_t *vm = table->meta_sentinel_insert_stmt;
2632 128 : if (!vm) return -1;
2633 :
2634 128 : int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
2635 128 : if (rc != DBRES_OK) goto cleanup;
2636 :
2637 128 : rc = databasevm_bind_int(vm, 2, db_version);
2638 128 : if (rc != DBRES_OK) goto cleanup;
2639 :
2640 128 : rc = databasevm_bind_int(vm, 3, seq);
2641 128 : if (rc != DBRES_OK) goto cleanup;
2642 :
2643 128 : rc = databasevm_bind_int(vm, 4, db_version);
2644 128 : if (rc != DBRES_OK) goto cleanup;
2645 :
2646 128 : rc = databasevm_bind_int(vm, 5, seq);
2647 128 : if (rc != DBRES_OK) goto cleanup;
2648 :
2649 128 : rc = databasevm_step(vm);
2650 128 : if (rc == DBRES_DONE) rc = DBRES_OK;
2651 :
2652 : cleanup:
2653 128 : DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
2654 128 : databasevm_reset(vm);
2655 128 : return rc;
2656 128 : }
2657 :
2658 11781 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
2659 :
2660 11781 : dbvm_t *vm = table->meta_row_insert_update_stmt;
2661 11781 : if (!vm) return -1;
2662 :
2663 11781 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2664 11781 : if (rc != DBRES_OK) goto cleanup;
2665 :
2666 11781 : rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
2667 11781 : if (rc != DBRES_OK) goto cleanup;
2668 :
2669 11781 : rc = databasevm_bind_int(vm, 3, col_version);
2670 11781 : if (rc != DBRES_OK) goto cleanup;
2671 :
2672 11781 : rc = databasevm_bind_int(vm, 4, db_version);
2673 11781 : if (rc != DBRES_OK) goto cleanup;
2674 :
2675 11781 : rc = databasevm_bind_int(vm, 5, seq);
2676 11781 : if (rc != DBRES_OK) goto cleanup;
2677 :
2678 11781 : rc = databasevm_bind_int(vm, 6, db_version);
2679 11781 : if (rc != DBRES_OK) goto cleanup;
2680 :
2681 11781 : rc = databasevm_bind_int(vm, 7, seq);
2682 11781 : if (rc != DBRES_OK) goto cleanup;
2683 :
2684 11781 : rc = databasevm_step(vm);
2685 11781 : if (rc == DBRES_DONE) rc = DBRES_OK;
2686 :
2687 : cleanup:
2688 11781 : DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
2689 11781 : databasevm_reset(vm);
2690 11781 : return rc;
2691 11781 : }
2692 :
// Mark a column (or the row tombstone when col_name is NULL) as inserted or
// updated: col_version = 1. Even col_version values denote deletions — see
// local_mark_delete_block_meta / local_mark_delete_meta.
int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
}
2696 :
// Mark a single block column of a row as deleted.
int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
    // Mark a block as deleted by setting col_version = 2 (even = deleted)
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
}
2701 :
// Thin wrapper around block_delete_value that narrows pklen from size_t to
// int, exposing a size_t-based signature to external callers.
int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
    return block_delete_value(data, table, pk, (int)pklen, block_colname);
}
2705 :
// Mark an entire row as deleted: tombstone entry (NULL col_name) with
// col_version = 2 (even = deleted).
int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
}
2709 :
2710 35 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
2711 35 : dbvm_t *vm = table->meta_row_drop_stmt;
2712 35 : if (!vm) return -1;
2713 :
2714 35 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2715 35 : if (rc != DBRES_OK) goto cleanup;
2716 :
2717 35 : rc = databasevm_step(vm);
2718 35 : if (rc == DBRES_DONE) rc = DBRES_OK;
2719 :
2720 : cleanup:
2721 35 : DEBUG_DBERROR(rc, "local_drop_meta", table->context);
2722 35 : databasevm_reset(vm);
2723 35 : return rc;
2724 35 : }
2725 :
2726 31 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
2727 : /*
2728 : * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
2729 : * to a new primary key (NEW.pk) when a primary key change occurs.
2730 : *
2731 : * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
2732 : * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
2733 : *
2734 : * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
2735 : * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
2736 : * may be applied incorrectly, leading to data inconsistency.
2737 : *
2738 : * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
2739 : * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
2740 : * that generates a unique sequence for each row. The update query should ensure that each row moved
2741 : * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
2742 : */
2743 :
2744 : // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
2745 : // pk2 is the old pk
2746 :
2747 31 : dbvm_t *vm = table->meta_update_move_stmt;
2748 31 : if (!vm) return -1;
2749 :
2750 : // new primary key
2751 31 : int rc = databasevm_bind_blob(vm, 1, pk, pklen);
2752 31 : if (rc != DBRES_OK) goto cleanup;
2753 :
2754 : // new db_version
2755 31 : rc = databasevm_bind_int(vm, 2, db_version);
2756 31 : if (rc != DBRES_OK) goto cleanup;
2757 :
2758 : // old primary key
2759 31 : rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
2760 31 : if (rc != DBRES_OK) goto cleanup;
2761 :
2762 31 : rc = databasevm_step(vm);
2763 31 : if (rc == DBRES_DONE) rc = DBRES_OK;
2764 :
2765 : cleanup:
2766 31 : DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
2767 31 : databasevm_reset(vm);
2768 31 : return rc;
2769 31 : }
2770 :
2771 : // MARK: - Payload Encode / Decode -
2772 :
2773 677 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
2774 677 : uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
2775 677 : header->checksum[0] = (uint8_t)(h >> 40);
2776 677 : header->checksum[1] = (uint8_t)(h >> 32);
2777 677 : header->checksum[2] = (uint8_t)(h >> 24);
2778 677 : header->checksum[3] = (uint8_t)(h >> 16);
2779 677 : header->checksum[4] = (uint8_t)(h >> 8);
2780 677 : header->checksum[5] = (uint8_t)(h >> 0);
2781 677 : }
2782 :
2783 672 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
2784 2016 : return ((uint64_t)header->checksum[0] << 40) |
2785 1344 : ((uint64_t)header->checksum[1] << 32) |
2786 1344 : ((uint64_t)header->checksum[2] << 24) |
2787 1344 : ((uint64_t)header->checksum[3] << 16) |
2788 1344 : ((uint64_t)header->checksum[4] << 8) |
2789 672 : ((uint64_t)header->checksum[5] << 0);
2790 : }
2791 :
2792 672 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
2793 672 : uint64_t checksum1 = cloudsync_payload_checksum_load(header);
2794 672 : uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
2795 672 : return (checksum1 == checksum2);
2796 : }
2797 :
2798 49136 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
2799 49136 : if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
2800 :
2801 : // alloc/resize buffer
2802 49136 : if (payload->bused + needed > payload->balloc) {
2803 686 : if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
2804 686 : size_t balloc = payload->balloc + needed;
2805 :
2806 686 : char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
2807 686 : if (!buffer) {
2808 0 : if (payload->buffer) cloudsync_memory_free(payload->buffer);
2809 0 : memset(payload, 0, sizeof(cloudsync_payload_context));
2810 0 : return false;
2811 : }
2812 :
2813 686 : payload->buffer = buffer;
2814 686 : payload->balloc = balloc;
2815 686 : if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
2816 686 : }
2817 :
2818 49136 : return true;
2819 49136 : }
2820 :
2821 50498 : size_t cloudsync_payload_context_size (size_t *header_size) {
2822 50498 : if (header_size) *header_size = sizeof(cloudsync_payload_header);
2823 50498 : return sizeof(cloudsync_payload_context);
2824 : }
2825 :
2826 677 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
2827 677 : memset(header, 0, sizeof(cloudsync_payload_header));
2828 : assert(sizeof(cloudsync_payload_header)==32);
2829 :
2830 : int major, minor, patch;
2831 677 : sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
2832 :
2833 677 : header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
2834 677 : header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
2835 677 : header->libversion[0] = (uint8_t)major;
2836 677 : header->libversion[1] = (uint8_t)minor;
2837 677 : header->libversion[2] = (uint8_t)patch;
2838 677 : header->expanded_size = htonl(expanded_size);
2839 677 : header->ncols = htons(ncols);
2840 677 : header->nrows = htonl(nrows);
2841 677 : header->schema_hash = htonll(hash);
2842 677 : }
2843 :
// Append one row of values (argv, argc columns) to the payload buffer using
// the pk encoding. The first call fixes the column count for the payload.
// Returns DBRES_OK, DBRES_NOMEM when the buffer cannot grow, or DBRES_ERROR
// when encoding fails.
int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
    DEBUG_FUNCTION("cloudsync_payload_encode_step");
    // debug_values(argc, argv);

    // check if the step function is called for the first time
    if (payload->nrows == 0) payload->ncols = (uint16_t)argc;

    // compute the exact encoded size first, then grow the buffer to fit it
    size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
    if (cloudsync_payload_encode_check(payload, breq) == false) {
        return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
    }

    // encode in place at the current write offset
    char *buffer = payload->buffer + payload->bused;
    size_t bsize = payload->balloc - payload->bused;
    char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
    if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);

    // update buffer (advance by the precomputed size, not by bsize)
    payload->bused += breq;

    // increment row counter
    ++payload->nrows;

    return DBRES_OK;
}
2869 :
// Finalize the payload: LZ4-compress the accumulated rows when it helps,
// prepend the payload header (with 48-bit checksum of the body), and leave
// the resulting blob in payload->buffer / payload->bsize.
// With zero rows, the buffer is released and DBRES_OK is returned.
int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    DEBUG_FUNCTION("cloudsync_payload_encode_final");

    if (payload->nrows == 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        return DBRES_OK;
    }

    // the header stores nrows as uint32, so enforce the limit here
    if (payload->nrows > UINT32_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
        return DBRES_ERROR;
    }

    // sanity check about buffer size
    int header_size = (int)sizeof(cloudsync_payload_header);
    int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    if (buffer_size < 0) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // LZ4_compress_default takes int sizes, so the body must fit in INT_MAX
    if (buffer_size > INT_MAX) {
        if (payload->buffer) cloudsync_memory_free(payload->buffer);
        payload->buffer = NULL;
        payload->bsize = 0;
        cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
        return DBRES_ERROR;
    }
    // try to allocate buffer used for compressed data
    int real_buffer_size = (int)buffer_size;
    int zbound = LZ4_compressBound(real_buffer_size);
    char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression

    // skip the reserved header from the buffer to compress
    char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    // zused == 0 means compression failed or was skipped
    bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    CHECK_FORCE_UNCOMPRESSED_BUFFER();

    // setup payload header (expanded_size == 0 signals an uncompressed body)
    cloudsync_payload_header header = {0};
    uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);

    // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    if (use_uncompressed_buffer) {
        if (zbuffer) cloudsync_memory_free(zbuffer);
        zbuffer = payload->buffer;
        zused = real_buffer_size;
    }

    // compute checksum of the (possibly compressed) body, excluding the header
    uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    cloudsync_payload_checksum_store(&header, checksum);

    // copy header and data to SQLite BLOB
    memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    int blob_size = zused + sizeof(cloudsync_payload_header);
    payload->bsize = blob_size;

    // cleanup memory: when the compressed buffer won, swap it in and
    // release the original accumulation buffer
    if (zbuffer != payload->buffer) {
        cloudsync_memory_free (payload->buffer);
        payload->buffer = zbuffer;
    }

    return DBRES_OK;
}
2945 :
2946 685 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
2947 : DEBUG_FUNCTION("cloudsync_payload_blob");
2948 :
2949 685 : if (blob_size) *blob_size = (int64_t)payload->bsize;
2950 685 : if (nrows) *nrows = (int64_t)payload->nrows;
2951 685 : return payload->buffer;
2952 : }
2953 :
// pk_decode callback used while applying a payload: forward each decoded
// field to the bind callback, and mirror it into the decode context so the
// caller can inspect the row (table, pk, column, versions, site id, cl, seq)
// after all fields are bound. For TEXT/BLOB fields, ival carries the length.
static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
    cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
    int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);

    if (rc == DBRES_OK) {
        // stash each field into the context keyed by its well-known index;
        // pointers (pval) reference the decode buffer — no copies are made here
        switch (index) {
            case CLOUDSYNC_PK_INDEX_TBL:
                if (type == DBTYPE_TEXT) {
                    decode_context->tbl = pval;
                    decode_context->tbl_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_PK:
                if (type == DBTYPE_BLOB) {
                    decode_context->pk = pval;
                    decode_context->pk_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_COLNAME:
                if (type == DBTYPE_TEXT) {
                    decode_context->col_name = pval;
                    decode_context->col_name_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_COLVERSION:
                if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
                break;
            case CLOUDSYNC_PK_INDEX_DBVERSION:
                if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
                break;
            case CLOUDSYNC_PK_INDEX_SITEID:
                if (type == DBTYPE_BLOB) {
                    decode_context->site_id = pval;
                    decode_context->site_id_len = ival;
                }
                break;
            case CLOUDSYNC_PK_INDEX_CL:
                if (type == DBTYPE_INTEGER) decode_context->cl = ival;
                break;
            case CLOUDSYNC_PK_INDEX_SEQ:
                if (type == DBTYPE_INTEGER) decode_context->seq = ival;
                break;
        }
    }

    return rc;
}
3004 :
3005 : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
3006 :
3007 674 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
3008 : // sanity check
3009 674 : if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
3010 :
3011 : // decode header
3012 : cloudsync_payload_header header;
3013 674 : memcpy(&header, payload, sizeof(cloudsync_payload_header));
3014 :
3015 674 : header.signature = ntohl(header.signature);
3016 674 : header.expanded_size = ntohl(header.expanded_size);
3017 674 : header.ncols = ntohs(header.ncols);
3018 674 : header.nrows = ntohl(header.nrows);
3019 674 : header.schema_hash = ntohll(header.schema_hash);
3020 :
3021 : // compare schema_hash only if not disabled and if the received payload was created with the current header version
3022 : // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
3023 674 : if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
3024 674 : if (header.schema_hash != data->schema_hash) {
3025 4 : if (!database_check_schema_hash(data, header.schema_hash)) {
3026 : char buffer[1024];
3027 2 : snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
3028 2 : return cloudsync_set_error(data, buffer, DBRES_MISUSE);
3029 : }
3030 2 : }
3031 672 : }
3032 :
3033 : // sanity check header
3034 672 : if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
3035 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
3036 : }
3037 :
3038 672 : const char *buffer = payload + sizeof(cloudsync_payload_header);
3039 672 : size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
3040 :
3041 : // sanity check checksum (only if version is >= 2)
3042 672 : if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
3043 672 : uint64_t checksum = pk_checksum(buffer, buf_len);
3044 672 : if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
3045 1 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
3046 : }
3047 671 : }
3048 :
3049 : // check if payload is compressed
3050 671 : char *clone = NULL;
3051 671 : if (header.expanded_size != 0) {
3052 656 : clone = (char *)cloudsync_memory_alloc(header.expanded_size);
3053 656 : if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
3054 :
3055 656 : int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
3056 656 : if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
3057 0 : if (clone) cloudsync_memory_free(clone);
3058 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
3059 : }
3060 :
3061 656 : buffer = (const char *)clone;
3062 656 : buf_len = (size_t)header.expanded_size;
3063 656 : }
3064 :
3065 : // precompile the insert statement
3066 671 : dbvm_t *vm = NULL;
3067 671 : int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
3068 671 : if (rc != DBRES_OK) {
3069 0 : if (clone) cloudsync_memory_free(clone);
3070 0 : return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
3071 : }
3072 :
3073 : // process buffer, one row at a time
3074 671 : uint16_t ncols = header.ncols;
3075 671 : uint32_t nrows = header.nrows;
3076 671 : int64_t last_payload_db_version = -1;
3077 671 : int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
3078 671 : int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
3079 671 : cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
3080 :
3081 : // Initialize deferred column-batch merge
3082 671 : merge_pending_batch batch = {0};
3083 671 : data->pending_batch = &batch;
3084 671 : bool in_savepoint = false;
3085 671 : const void *last_pk = NULL;
3086 671 : int64_t last_pk_len = 0;
3087 671 : const char *last_tbl = NULL;
3088 671 : int64_t last_tbl_len = 0;
3089 :
3090 49727 : for (uint32_t i=0; i<nrows; ++i) {
3091 49056 : size_t seek = 0;
3092 49056 : int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
3093 49056 : if (res == -1) {
3094 0 : merge_flush_pending(data);
3095 0 : data->pending_batch = NULL;
3096 0 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3097 0 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3098 0 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3099 0 : if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
3100 0 : rc = DBRES_ERROR;
3101 0 : goto cleanup;
3102 : }
3103 :
3104 : // Detect PK/table/db_version boundary to flush pending batch
3105 97441 : bool pk_changed = (last_pk != NULL &&
3106 48385 : (last_pk_len != decoded_context.pk_len ||
3107 46469 : memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
3108 97441 : bool tbl_changed = (last_tbl != NULL &&
3109 48385 : (last_tbl_len != decoded_context.tbl_len ||
3110 48315 : memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
3111 49056 : bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
3112 :
3113 : // Flush pending batch before any boundary change
3114 49056 : if (pk_changed || tbl_changed || db_version_changed) {
3115 18207 : int flush_rc = merge_flush_pending(data);
3116 18207 : if (flush_rc != DBRES_OK) {
3117 1 : rc = flush_rc;
3118 : // continue processing remaining rows
3119 1 : }
3120 18207 : }
3121 :
3122 : // Per-db_version savepoints group rows with the same source db_version
3123 : // into one transaction. In SQLite autocommit mode, the RELEASE triggers
3124 : // the commit hook which bumps data->db_version and resets seq, ensuring
3125 : // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
3126 : // database_in_transaction() is always true so this block is inactive —
3127 : // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
3128 49056 : if (in_savepoint && db_version_changed) {
3129 4280 : rc = database_commit_savepoint(data, "cloudsync_payload_apply");
3130 4280 : if (rc != DBRES_OK) {
3131 0 : merge_pending_free_entries(&batch);
3132 0 : data->pending_batch = NULL;
3133 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
3134 0 : goto cleanup;
3135 : }
3136 4280 : in_savepoint = false;
3137 4280 : }
3138 :
3139 49056 : if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
3140 4951 : rc = database_begin_savepoint(data, "cloudsync_payload_apply");
3141 4951 : if (rc != DBRES_OK) {
3142 0 : merge_pending_free_entries(&batch);
3143 0 : data->pending_batch = NULL;
3144 0 : cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
3145 0 : goto cleanup;
3146 : }
3147 4951 : in_savepoint = true;
3148 4951 : }
3149 :
3150 : // Track db_version for batch-flush boundary detection
3151 49056 : if (db_version_changed) {
3152 4951 : last_payload_db_version = decoded_context.db_version;
3153 4951 : }
3154 :
3155 : // Update PK/table tracking
3156 49056 : last_pk = decoded_context.pk;
3157 49056 : last_pk_len = decoded_context.pk_len;
3158 49056 : last_tbl = decoded_context.tbl;
3159 49056 : last_tbl_len = decoded_context.tbl_len;
3160 :
3161 49056 : rc = databasevm_step(vm);
3162 49056 : if (rc != DBRES_DONE) {
3163 : // don't "break;", the error can be due to a RLS policy.
3164 : // in case of error we try to apply the following changes
3165 2 : }
3166 :
3167 49056 : buffer += seek;
3168 49056 : buf_len -= seek;
3169 49056 : dbvm_reset(vm);
3170 49056 : }
3171 :
3172 : // Final flush after loop
3173 : {
3174 671 : int flush_rc = merge_flush_pending(data);
3175 671 : if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
3176 : }
3177 671 : data->pending_batch = NULL;
3178 :
3179 671 : if (in_savepoint) {
3180 671 : int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
3181 671 : if (rc1 != DBRES_OK) rc = rc1;
3182 671 : }
3183 :
3184 : // save last error (unused if function returns OK)
3185 671 : if (rc != DBRES_OK && rc != DBRES_DONE) {
3186 1 : cloudsync_set_dberror(data);
3187 1 : }
3188 :
3189 671 : if (rc == DBRES_DONE) rc = DBRES_OK;
3190 1341 : if (rc == DBRES_OK) {
3191 : char buf[256];
3192 670 : if (decoded_context.db_version >= dbversion) {
3193 536 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
3194 536 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
3195 :
3196 536 : if (decoded_context.seq != seq) {
3197 332 : snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
3198 332 : dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
3199 332 : }
3200 536 : }
3201 670 : }
3202 :
3203 : cleanup:
3204 : // cleanup merge_pending_batch
3205 671 : if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
3206 671 : if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
3207 671 : if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
3208 :
3209 : // cleanup vm
3210 671 : if (vm) databasevm_finalize(vm);
3211 :
3212 : // cleanup memory
3213 671 : if (clone) cloudsync_memory_free(clone);
3214 :
3215 : // error already saved in (save last error)
3216 671 : if (rc != DBRES_OK) return rc;
3217 :
3218 : // return the number of processed rows
3219 670 : if (pnrows) *pnrows = nrows;
3220 670 : return DBRES_OK;
3221 674 : }
3222 :
3223 : // MARK: - Payload load/store -
3224 :
3225 0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
3226 : // retrieve current db_version and seq
3227 0 : *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
3228 0 : if (*db_version < 0) return DBRES_ERROR;
3229 :
3230 : // retrieve BLOB
3231 : char sql[1024];
3232 0 : snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
3233 : "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
3234 :
3235 0 : int64_t len = 0;
3236 0 : int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
3237 0 : *blob_size = (int)len;
3238 0 : if (rc != DBRES_OK) return rc;
3239 :
3240 : // exit if there is no data to send
3241 0 : if (*blob == NULL || *blob_size == 0) return DBRES_OK;
3242 0 : return rc;
3243 0 : }
3244 :
3245 : #ifdef CLOUDSYNC_DESKTOP_OS
3246 0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
3247 : DEBUG_FUNCTION("cloudsync_payload_save");
3248 :
3249 : // silently delete any other payload with the same name
3250 0 : cloudsync_file_delete(payload_path);
3251 :
3252 : // retrieve payload
3253 0 : char *blob = NULL;
3254 0 : int blob_size = 0, db_version = 0;
3255 0 : int64_t new_db_version = 0;
3256 0 : int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
3257 0 : if (rc != DBRES_OK) {
3258 0 : if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
3259 0 : return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
3260 : }
3261 :
3262 : // exit if there is no data to save
3263 0 : if (blob == NULL || blob_size == 0) {
3264 0 : if (size) *size = 0;
3265 0 : return DBRES_OK;
3266 : }
3267 :
3268 : // write payload to file
3269 0 : bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
3270 0 : cloudsync_memory_free(blob);
3271 0 : if (res == false) {
3272 0 : return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
3273 : }
3274 :
3275 : // returns blob size
3276 0 : if (size) *size = blob_size;
3277 0 : return DBRES_OK;
3278 0 : }
3279 : #endif
3280 :
3281 : // MARK: - Core -
3282 :
// Verify that a user table is eligible for CRDT augmentation before cloudsync_init.
// Checks performed (each failure sets a descriptive error and returns DBRES_ERROR):
//   - non-NULL table name not exceeding CLOUDSYNC_MAX_TABLENAME_LEN;
//   - the table exists in the current schema (skipped if already initialized);
//   - composite primary keys are limited to 128 columns (SQLite hard limit);
//   - rowid-only tables are rejected (when CLOUDSYNC_DISABLE_ROWIDONLY_TABLES);
//   - a single-column INTEGER primary key is rejected unless skip_int_pk_check,
//     because INTEGER keys are not globally unique across replicating nodes;
//   - every non-PK NOT NULL column must carry a DEFAULT value, otherwise the
//     column-merge statement could fail when other columns arrive first.
// Returns DBRES_OK when the table passes all checks (or is already initialized).
int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, bool skip_int_pk_check) {
    DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    char buffer[2048];
    
    // sanity check table name
    if (name == NULL) {
        return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    }
    
    // avoid allocating heap memory for SQL statements by capping table names at
    // CLOUDSYNC_MAX_TABLENAME_LEN characters. This limit is reasonable and helps
    // prevent memory management issues.
    const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    if (strlen(name) > maxlen) {
        snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // check if already initialized: a table in the in-memory context already passed
    if (table_lookup(data, name)) is handled below
    // check if already initialized
    cloudsync_table_context *table = table_lookup(data, name);
    if (table) return DBRES_OK;
    
    // check if table exists
    if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
        snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
    int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    if (npri_keys < 0) return cloudsync_set_dberror(data);
    if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
    
    #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    if (npri_keys == 0) {
        snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    #endif
    
    if (!skip_int_pk_check) {
        if (npri_keys == 1) {
            // the affinity of a column is determined by the declared type of the column,
            // according to the following rules in the order shown:
            // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
            int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
            if (npri_keys_int < 0) return cloudsync_set_dberror(data);
            if (npri_keys == npri_keys_int) {
                snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
                return cloudsync_set_error(data, buffer, DBRES_ERROR);
            }
            
        }
    }
    
    // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
    #if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    if (npri_keys > 0) {
        int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
        if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
        if (npri_keys != npri_keys_notnull) {
            snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
            return cloudsync_set_error(data, buffer, DBRES_ERROR);
        }
    }
    #endif
    
    // check for columns declared as NOT NULL without a DEFAULT value.
    // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
    if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
    if (n_notnull_nodefault > 0) {
        snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    return DBRES_OK;
}
3361 :
3362 3 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
3363 3 : if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
3364 :
3365 : // drop meta-table
3366 3 : const char *table_name = table->name;
3367 3 : char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
3368 3 : int rc = database_exec(data, sql);
3369 3 : cloudsync_memory_free(sql);
3370 3 : if (rc != DBRES_OK) {
3371 : char buffer[1024];
3372 0 : snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
3373 0 : return cloudsync_set_error(data, buffer, rc);
3374 : }
3375 :
3376 : // drop original triggers
3377 3 : rc = database_delete_triggers(data, table_name);
3378 3 : if (rc != DBRES_OK) {
3379 : char buffer[1024];
3380 0 : snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
3381 0 : return cloudsync_set_error(data, buffer, rc);
3382 : }
3383 :
3384 : // remove all table related settings
3385 3 : dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
3386 3 : return DBRES_OK;
3387 3 : }
3388 :
3389 3 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
3390 3 : cloudsync_table_context *table = table_lookup(data, table_name);
3391 3 : if (!table) return DBRES_OK;
3392 :
3393 : // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
3394 :
3395 3 : int rc = cloudsync_cleanup_internal(data, table);
3396 3 : if (rc != DBRES_OK) return rc;
3397 :
3398 3 : int counter = table_remove(data, table);
3399 3 : table_free(table);
3400 :
3401 3 : if (counter == 0) {
3402 : // cleanup database on last table
3403 1 : cloudsync_reset_siteid(data);
3404 1 : dbutils_settings_cleanup(data);
3405 1 : } else {
3406 2 : if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
3407 2 : cloudsync_update_schema_hash(data);
3408 2 : }
3409 : }
3410 :
3411 3 : return DBRES_OK;
3412 3 : }
3413 :
3414 0 : int cloudsync_cleanup_all (cloudsync_context *data) {
3415 0 : return database_cleanup(data);
3416 : }
3417 :
3418 436 : int cloudsync_terminate (cloudsync_context *data) {
3419 : // can't use for/loop here because data->tables_count is changed by table_remove
3420 668 : while (data->tables_count > 0) {
3421 232 : cloudsync_table_context *t = data->tables[data->tables_count - 1];
3422 232 : table_remove(data, t);
3423 232 : table_free(t);
3424 : }
3425 :
3426 436 : if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
3427 436 : if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
3428 436 : if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
3429 436 : if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
3430 436 : if (data->current_schema) cloudsync_memory_free(data->current_schema);
3431 :
3432 436 : data->schema_version_stmt = NULL;
3433 436 : data->data_version_stmt = NULL;
3434 436 : data->db_version_stmt = NULL;
3435 436 : data->getset_siteid_stmt = NULL;
3436 436 : data->current_schema = NULL;
3437 :
3438 : // reset the site_id so the cloudsync_context_init will be executed again
3439 : // if any other cloudsync function is called after terminate
3440 436 : data->site_id[0] = 0;
3441 :
3442 436 : return 1;
3443 : }
3444 :
// Augment a user table for CRDT replication.
// Validates the table (see cloudsync_table_sanity_check), resolves and persists
// the CRDT algorithm, then (re)creates triggers, the meta-table and the prepared
// statements, registers the table in the in-memory context and backfills the
// meta-table. Safe to call more than once on the same table: every creation step
// is idempotent. algo_name may be NULL (defaults to CLOUDSYNC_DEFAULT_ALGO).
// Returns DBRES_OK on success, otherwise an error code with a descriptive message.
int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, bool skip_int_pk_check) {
    // sanity check table and its primary key(s)
    int rc = cloudsync_table_sanity_check(data, table_name, skip_int_pk_check);
    if (rc != DBRES_OK) return rc;
    
    // init cloudsync_settings
    if (cloudsync_context_init(data) == NULL) {
        return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    }
    
    // sanity check algo name (if exists)
    table_algo algo_new = table_algo_none;
    if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
    
    algo_new = cloudsync_algo_from_name(algo_name);
    if (algo_new == table_algo_none) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // DWS and AWS algorithms are not yet implemented in the merge logic
    if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
        return cloudsync_set_error(data, buffer, DBRES_ERROR);
    }
    
    // check if table name was already augmented (algo persisted in settings)
    table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    
    // reconcile the requested algorithm with the persisted one
    if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
        // requested and persisted algorithms match: nothing to do
    } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
        // nothing is written into settings because the default table_algo_crdt_cls will be used
        algo_new = algo_current = table_algo_crdt_cls;
    } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
        // algo is already written into settings so just use it
        algo_new = algo_current;
    } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
        // write table algo name in settings
        dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
    } else {
        // conflicting algorithms: switching requires an explicit cleanup first
        return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
    }
    
    // Run the following function even if table was already augmented.
    // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
    // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
    
    // sync algo with table (unused in this version)
    // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
    
    // read row-level filter from settings (if any)
    char init_filter_buf[2048];
    int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
    const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
    
    // (re)create the change-tracking triggers, honoring the optional row filter
    rc = database_create_triggers(data, table_name, algo_new, init_filter);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
    
    // (re)create the companion meta-table
    rc = database_create_metatable(data, table_name);
    if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
    
    // add prepared statements
    if (cloudsync_add_dbvms(data) != DBRES_OK) {
        return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
    }
    
    // add table to in-memory data context
    if (table_add_to_context(data, algo_new, table_name) == false) {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
        return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    }
    
    // backfill the meta-table with the table's existing rows
    if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
        return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
    }
    
    return DBRES_OK;
}
|