LCOV - code coverage report
Current view: top level - src - cloudsync.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 86.9 % 2120 1842
Test Date: 2026-04-24 14:45:44 Functions: 91.3 % 127 116

            Line data    Source code
       1              : //
       2              : //  cloudsync.c
       3              : //  cloudsync
       4              : //
       5              : //  Created by Marco Bambini on 16/05/24.
       6              : //
       7              : 
       8              : #include <inttypes.h>
       9              : #include <stdbool.h>
      10              : #include <limits.h>
      11              : #include <stdint.h>
      12              : #include <stdlib.h>
      13              : #include <string.h>
      14              : #include <assert.h>
      15              : #include <stdio.h>
      16              : #include <errno.h>
      17              : #include <math.h>
      18              : 
      19              : #include "cloudsync.h"
      20              : #include "lz4.h"
      21              : #include "pk.h"
      22              : #include "sql.h"
      23              : #include "utils.h"
      24              : #include "dbutils.h"
      25              : #include "block.h"
      26              : 
      27              : #ifdef _WIN32
      28              : #include <winsock2.h>
      29              : #include <ws2tcpip.h>
      30              : #else
      31              : #include <arpa/inet.h>                          // for htonl, htons, ntohl, ntohs
      32              : #include <netinet/in.h>                         // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
      33              : #endif
      34              : 
      35              : #ifndef htonll
      36              : #if __BIG_ENDIAN__
      37              : #define htonll(x)                               (x)
      38              : #define ntohll(x)                               (x)
      39              : #else
      40              : #ifndef htobe64
      41              : #define htonll(x)                               ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
      42              : #define ntohll(x)                               ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
      43              : #else
      44              : #define htonll(x)                               htobe64(x)
      45              : #define ntohll(x)                               be64toh(x)
      46              : #endif
      47              : #endif
      48              : #endif
      49              : 
      50              : #define CLOUDSYNC_INIT_NTABLES                  64
      51              : #define CLOUDSYNC_MIN_DB_VERSION                0
      52              : 
      53              : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE                   (512*1024)
      54              : #define CLOUDSYNC_PAYLOAD_SIGNATURE                     0x434C5359  /* 'C','L','S','Y' */
      55              : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL               1
      56              : #define CLOUDSYNC_PAYLOAD_VERSION_1                     CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
      57              : #define CLOUDSYNC_PAYLOAD_VERSION_2                     2
      58              : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST                CLOUDSYNC_PAYLOAD_VERSION_2
      59              : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM     CLOUDSYNC_PAYLOAD_VERSION_2
      60              : 
      61              : #ifndef MAX
      62              : #define MAX(a, b)                               (((a)>(b))?(a):(b))
      63              : #endif
      64              : 
      65              : #define DEBUG_DBERROR(_rc, _fn, _data)   do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
      66              : 
      67              : typedef enum {
      68              :     CLOUDSYNC_PK_INDEX_TBL          = 0,
      69              :     CLOUDSYNC_PK_INDEX_PK           = 1,
      70              :     CLOUDSYNC_PK_INDEX_COLNAME      = 2,
      71              :     CLOUDSYNC_PK_INDEX_COLVALUE     = 3,
      72              :     CLOUDSYNC_PK_INDEX_COLVERSION   = 4,
      73              :     CLOUDSYNC_PK_INDEX_DBVERSION    = 5,
      74              :     CLOUDSYNC_PK_INDEX_SITEID       = 6,
      75              :     CLOUDSYNC_PK_INDEX_CL           = 7,
      76              :     CLOUDSYNC_PK_INDEX_SEQ          = 8
      77              : } CLOUDSYNC_PK_INDEX;
      78              : 
      79              : typedef enum {
      80              :     DBVM_VALUE_ERROR      = -1,
      81              :     DBVM_VALUE_UNCHANGED  = 0,
      82              :     DBVM_VALUE_CHANGED    = 1,
      83              : } DBVM_VALUE;
      84              : 
      85              : #define SYNCBIT_SET(_data)                  _data->insync = 1
      86              : #define SYNCBIT_RESET(_data)                _data->insync = 0
      87              : 
      88              : // MARK: - Deferred column-batch merge -
      89              : 
      90              : typedef struct {
      91              :     const char  *col_name;       // pointer into table_context->col_name[idx] (stable)
      92              :     dbvalue_t   *col_value;      // duplicated via database_value_dup (owned)
      93              :     int64_t     col_version;
      94              :     int64_t     db_version;
      95              :     uint8_t     site_id[UUID_LEN];
      96              :     int         site_id_len;
      97              :     int64_t     seq;
      98              : } merge_pending_entry;
      99              : 
     100              : typedef struct {
     101              :     cloudsync_table_context *table;
     102              :     char        *pk;             // malloc'd copy, freed on flush
     103              :     int         pk_len;
     104              :     int64_t     cl;
     105              :     bool        sentinel_pending;
     106              :     bool        row_exists;       // true when the PK already exists locally
     107              :     int         count;
     108              :     int         capacity;
     109              :     merge_pending_entry *entries;
     110              : 
     111              :     // Statement cache — reuse the prepared statement when the column
     112              :     // combination and row_exists flag match between consecutive PK flushes.
     113              :     dbvm_t      *cached_vm;
     114              :     bool        cached_row_exists;
     115              :     int         cached_col_count;
     116              :     const char  **cached_col_names; // array of pointers into table_context (not owned)
     117              : } merge_pending_batch;
     118              : 
     119              : // MARK: -
     120              : 
     121              : struct cloudsync_pk_decode_bind_context {
     122              :     dbvm_t      *vm;
     123              :     char        *tbl;
     124              :     int64_t     tbl_len;
     125              :     const void  *pk;
     126              :     int64_t     pk_len;
     127              :     char        *col_name;
     128              :     int64_t     col_name_len;
     129              :     int64_t     col_version;
     130              :     int64_t     db_version;
     131              :     const void  *site_id;
     132              :     int64_t     site_id_len;
     133              :     int64_t     cl;
     134              :     int64_t     seq;
     135              : };
     136              : 
     137              : struct cloudsync_context {
     138              :     void        *db;
     139              :     char        errmsg[1024];
     140              :     int         errcode;
     141              :     
     142              :     char        *libversion;
     143              :     uint8_t     site_id[UUID_LEN];
     144              :     int         insync;
     145              :     int         debug;
     146              :     bool        merge_equal_values;
     147              :     void        *aux_data;
     148              :     
     149              :     // stmts and context values
     150              :     dbvm_t      *schema_version_stmt;
     151              :     dbvm_t      *data_version_stmt;
     152              :     dbvm_t      *db_version_stmt;
     153              :     dbvm_t      *getset_siteid_stmt;
     154              :     int         data_version;
     155              :     int         schema_version;
     156              :     uint64_t    schema_hash;
     157              :     
     158              :     // set at transaction start and reset on commit/rollback
     159              :     int64_t    db_version;
     160              :     // version the DB would have if the transaction committed now
     161              :     int64_t    pending_db_version;
     162              :     // used to set an order inside each transaction
     163              :     int        seq;
     164              :     
     165              :     // optional schema_name to be set in the cloudsync_table_context
     166              :     char       *current_schema;
     167              :     
     168              :     // augmented tables are stored in-memory so we do not need to retrieve information about
     169              :     // col_names and cid from the disk each time a write statement is performed
     170              :     // we do also not need to use an hash map here because for few tables the direct
     171              :     // in-memory comparison with table name is faster
     172              :     cloudsync_table_context **tables;   // dense vector: [0..tables_count-1] are valid
     173              :     int tables_count;                   // size
     174              :     int tables_cap;                     // capacity
     175              : 
     176              :     int skip_decode_idx;                // -1 in sqlite, col_value index in postgresql
     177              : 
     178              :     // deferred column-batch merge (active during payload_apply)
     179              :     merge_pending_batch *pending_batch;
     180              : };
     181              : 
     182              : struct cloudsync_table_context {
     183              :     table_algo  algo;                           // CRDT algoritm associated to the table
     184              :     char        *name;                          // table name
     185              :     char        *schema;                        // table schema
     186              :     char        *meta_ref;                      // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
     187              :     char        *base_ref;                      // schema-qualified base table name (e.g. "schema"."name")
     188              :     char        **col_name;                     // array of column names
     189              :     dbvm_t      **col_merge_stmt;               // array of merge insert stmt (indexed by col_name)
     190              :     dbvm_t      **col_value_stmt;               // array of column value stmt (indexed by col_name)
     191              :     int         *col_id;                        // array of column id
     192              :     col_algo_t  *col_algo;                      // per-column algorithm (normal or block)
     193              :     char        **col_delimiter;                // per-column delimiter for block splitting (NULL = default "\n")
     194              :     bool        has_block_cols;                 // quick check: does this table have any block columns?
     195              :     dbvm_t      *block_value_read_stmt;         // SELECT col_value FROM blocks table
     196              :     dbvm_t      *block_value_write_stmt;        // INSERT OR REPLACE into blocks table
     197              :     dbvm_t      *block_value_delete_stmt;       // DELETE from blocks table
     198              :     dbvm_t      *block_list_stmt;               // SELECT block entries for materialization
     199              :     char        *blocks_ref;                    // schema-qualified blocks table name
     200              :     int         ncols;                          // number of non primary key cols
     201              :     int         npks;                           // number of primary key cols
     202              :     bool        enabled;                        // flag to check if a table is enabled or disabled
     203              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     204              :     bool        rowid_only;                     // a table with no primary keys other than the implicit rowid
     205              :     #endif
     206              :     
     207              :     char        **pk_name;                      // array of primary key names
     208              : 
     209              :     // precompiled statements
     210              :     dbvm_t      *meta_pkexists_stmt;            // check if a primary key already exist in the augmented table
     211              :     dbvm_t      *meta_sentinel_update_stmt;     // update a local sentinel row
     212              :     dbvm_t      *meta_sentinel_insert_stmt;     // insert a local sentinel row
     213              :     dbvm_t      *meta_row_insert_update_stmt;   // insert/update a local row
     214              :     dbvm_t      *meta_row_drop_stmt;            // delete rows from meta
     215              :     dbvm_t      *meta_update_move_stmt;         // update rows in meta when pk changes
     216              :     dbvm_t      *meta_local_cl_stmt;            // compute local cl value
     217              :     dbvm_t      *meta_winner_clock_stmt;        // get the rowid of the last inserted/updated row in the meta table
     218              :     dbvm_t      *meta_merge_delete_drop;
     219              :     dbvm_t      *meta_zero_clock_stmt;
     220              :     dbvm_t      *meta_col_version_stmt;
     221              :     dbvm_t      *meta_site_id_stmt;
     222              :     
     223              :     dbvm_t      *real_col_values_stmt;          // retrieve all column values based on pk
     224              :     dbvm_t      *real_merge_delete_stmt;
     225              :     dbvm_t      *real_merge_sentinel_stmt;
     226              :     
     227              :     bool        is_altering;                    // flag to track if a table alteration is in progress
     228              : 
     229              :     // context
     230              :     cloudsync_context *context;
     231              : };
     232              : 
     233              : struct cloudsync_payload_context {
     234              :     char        *buffer;
     235              :     size_t      bsize;
     236              :     size_t      balloc;
     237              :     size_t      bused;
     238              :     uint64_t    nrows;
     239              :     uint16_t    ncols;
     240              : };
     241              : 
     242              : #ifdef _MSC_VER
     243              :     #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
     244              :     #define PACKED
     245              : #else
     246              :     #define PACKED __attribute__((__packed__))
     247              : #endif
     248              : 
     249              : typedef struct PACKED {
     250              :     uint32_t    signature;          // 'CLSY'
     251              :     uint8_t     version;            // protocol version
     252              :     uint8_t     libversion[3];      // major.minor.patch
     253              :     uint32_t    expanded_size;
     254              :     uint16_t    ncols;
     255              :     uint32_t    nrows;
     256              :     uint64_t    schema_hash;
     257              :     uint8_t     checksum[6];        // 48 bits checksum (to ensure struct is 32 bytes)
     258              : } cloudsync_payload_header;
     259              : 
     260              : #ifdef _MSC_VER
     261              :     #pragma pack(pop)
     262              : #endif
     263              : 
     264              : #if CLOUDSYNC_UNITTEST
     265              : bool force_uncompressed_blob = false;
     266              : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()   if (force_uncompressed_blob) use_uncompressed_buffer = true
     267              : #else
     268              : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
     269              : #endif
     270              : 
     271              : // Internal prototypes
     272              : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
     273              : 
     274              : // MARK: - CRDT algos -
     275              : 
     276          620 : table_algo cloudsync_algo_from_name (const char *algo_name) {
     277          620 :     if (algo_name == NULL) return table_algo_none;
     278              :     
     279          620 :     if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
     280          265 :     if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
     281          258 :     if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
     282          257 :     if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
     283              :     
     284              :     // if nothing is found
     285          256 :     return table_algo_none;
     286          620 : }
     287              : 
     288          110 : const char *cloudsync_algo_name (table_algo algo) {
     289          110 :     switch (algo) {
     290          105 :         case table_algo_crdt_cls: return "cls";
     291            1 :         case table_algo_crdt_gos: return "gos";
     292            1 :         case table_algo_crdt_dws: return "dws";
     293            1 :         case table_algo_crdt_aws: return "aws";
     294            1 :         case table_algo_none: return NULL;
     295              :     }
     296            1 :     return NULL;
     297          110 : }
     298              : 
     299              : // MARK: - DBVM Utils -
     300              : 
     301        32013 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
     302        32013 :     if (!stmt) return DBVM_VALUE_ERROR;
     303              : 
     304        32013 :     int rc = databasevm_step(stmt);
     305        32013 :     if (rc != DBRES_ROW && rc != DBRES_DONE) {
     306            2 :         if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
     307            2 :         databasevm_reset(stmt);
     308            2 :         return DBVM_VALUE_ERROR;
     309              :     }
     310              : 
     311        32011 :     DBVM_VALUE result = DBVM_VALUE_CHANGED;
     312        32011 :     if (stmt == data->data_version_stmt) {
     313        29609 :         int version = (int)database_column_int(stmt, 0);
     314        29609 :         if (version != data->data_version) {
     315          224 :             data->data_version = version;
     316          224 :         } else {
     317        29385 :             result = DBVM_VALUE_UNCHANGED;
     318              :         }
     319        32011 :     } else if (stmt == data->schema_version_stmt) {
     320         1201 :         int version = (int)database_column_int(stmt, 0);
     321         1201 :         if (version > data->schema_version) {
     322          379 :             data->schema_version = version;
     323          379 :         } else {
     324          822 :             result = DBVM_VALUE_UNCHANGED;
     325              :         }
     326              :         
     327         2402 :     } else if (stmt == data->db_version_stmt) {
     328         1201 :         data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
     329         1201 :     }
     330              :     
     331        32011 :     databasevm_reset(stmt);
     332        32011 :     return result;
     333        32013 : }
     334              : 
     335         3587 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
     336         3587 :     int result = -1;
     337         3587 :     int rc = DBRES_OK;
     338              :     
     339         3587 :     if (value) {
     340         3586 :         rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
     341         3586 :         if (rc != DBRES_OK) goto cleanup;
     342         3586 :     }
     343              : 
     344         3587 :     rc = databasevm_step(stmt);
     345         7174 :     if (rc == DBRES_DONE) {
     346            1 :         result = 0;
     347            1 :         rc = DBRES_OK;
     348         3587 :     } else if (rc == DBRES_ROW) {
     349         3586 :         result = (int)database_column_int(stmt, 0);
     350         3586 :         rc = DBRES_OK;
     351         3586 :     }
     352              :     
     353              : cleanup:
     354         3587 :     databasevm_reset(stmt);
     355         3587 :     return result;
     356              : }
     357              : 
     358       255334 : void dbvm_reset (dbvm_t *stmt) {
     359       255334 :     if (!stmt) return;
     360       232386 :     databasevm_clear_bindings(stmt);
     361       232386 :     databasevm_reset(stmt);
     362       255334 : }
     363              : 
     364              : // MARK: - Database Version -
     365              : 
     366          669 : int cloudsync_dbversion_build_query (cloudsync_context *data, char **sql_out) {
     367              :     // this function must be manually called each time tables changes
     368              :     // because the query plan changes too and it must be re-prepared
     369              :     // unfortunately there is no other way
     370              : 
     371              :     // we need to execute a query like:
     372              :     /*
     373              :      SELECT max(version) as version FROM (
     374              :          SELECT max(db_version) as version FROM "table1_cloudsync"
     375              :          UNION ALL
     376              :          SELECT max(db_version) as version FROM "table2_cloudsync"
     377              :          UNION ALL
     378              :          SELECT max(db_version) as version FROM "table3_cloudsync"
     379              :          UNION
     380              :          SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
     381              :      )
     382              :      */
     383              : 
     384              :     // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
     385              : 
     386          669 :     *sql_out = NULL;
     387          669 :     return database_select_text(data, SQL_DBVERSION_BUILD_QUERY, sql_out);
     388              : }
     389              : 
     390          891 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
     391          891 :     if (data->db_version_stmt) {
     392          442 :         databasevm_finalize(data->db_version_stmt);
     393          442 :         data->db_version_stmt = NULL;
     394          442 :     }
     395              : 
     396          891 :     int64_t count = dbutils_table_settings_count_tables(data);
     397          891 :     if (count == 0) return DBRES_OK;
     398          669 :     else if (count == -1) return cloudsync_set_dberror(data);
     399              : 
     400          669 :     char *sql = NULL;
     401          669 :     int rc = cloudsync_dbversion_build_query(data, &sql);
     402          669 :     if (rc != DBRES_OK) return cloudsync_set_dberror(data);
     403              : 
     404              :     // A NULL SQL with rc == OK means the generator produced a NULL row:
     405              :     // sqlite_master has no *_cloudsync meta-tables (for example, the user
     406              :     // dropped the base table and its meta-table without calling
     407              :     // cloudsync_cleanup, leaving stale cloudsync_table_settings rows).
     408              :     // Treat this the same as count == 0: no prepared statement, db_version
     409              :     // stays at the minimum and will be rebuilt on the next cloudsync_init.
     410              :     // Genuine errors from database_select_text are handled above.
     411          668 :     if (!sql) return DBRES_OK;
     412              :     DEBUG_SQL("db_version_stmt: %s", sql);
     413              : 
     414          667 :     rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
     415              :     DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
     416          667 :     cloudsync_memory_free(sql);
     417          667 :     return rc;
     418          891 : }
     419              : 
     420         1201 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
     421         1201 :     DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
     422         1201 :     if (schema_changed == DBVM_VALUE_ERROR) return -1;
     423              : 
     424         1201 :     if (schema_changed == DBVM_VALUE_CHANGED) {
     425          379 :         int rc = cloudsync_dbversion_rebuild(data);
     426          379 :         if (rc != DBRES_OK) return -1;
     427          379 :     }
     428              : 
     429         1201 :     if (!data->db_version_stmt) {
     430            0 :         data->db_version = CLOUDSYNC_MIN_DB_VERSION;
     431            0 :         return 0;
     432              :     }
     433              : 
     434         1201 :     DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
     435         1201 :     if (rc == DBVM_VALUE_ERROR) return -1;
     436         1201 :     return 0;
     437         1201 : }
     438              : 
     439        29609 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
     440              :     // perform a PRAGMA data_version to check if some other process write any data
     441        29609 :     DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
     442        29609 :     if (rc == DBVM_VALUE_ERROR) return -1;
     443              :     
     444              :     // db_version is already set and there is no need to update it
     445        29609 :     if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
     446              :     
     447         1201 :     return cloudsync_dbversion_rerun(data);
     448        29609 : }
     449              : 
     450        29585 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
     451        29585 :     int rc = cloudsync_dbversion_check_uptodate(data);
     452        29585 :     if (rc != DBRES_OK) return -1;
     453              :     
     454        29585 :     int64_t result = data->db_version + 1;
     455        29585 :     if (result < data->pending_db_version) result = data->pending_db_version;
     456        29585 :     if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
     457        29585 :     data->pending_db_version = result;
     458              :     
     459        29585 :     return result;
     460        29585 : }
     461              : 
     462              : // MARK: - PK Context -
     463              : 
     464            0 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
     465            0 :     *tbl_len = ctx->tbl_len;
     466            0 :     return ctx->tbl;
     467              : }
     468              : 
     469            0 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
     470            0 :     *pk_len = ctx->pk_len;
     471            0 :     return (void *)ctx->pk;
     472              : }
     473              : 
     474            0 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
     475            0 :     *colname_len = ctx->col_name_len;
     476            0 :     return ctx->col_name;
     477              : }
     478              : 
     479            0 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
     480            0 :     return ctx->cl;
     481              : }
     482              : 
     483            0 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
     484            0 :     return ctx->db_version;
     485              : }
     486              : 
     487              : // MARK: - CloudSync Context -
     488              : 
     489        12665 : int cloudsync_insync (cloudsync_context *data) {
     490        12665 :     return data->insync;
     491              : }
     492              : 
     493          796 : void *cloudsync_siteid (cloudsync_context *data) {
     494          796 :     return (void *)data->site_id;
     495              : }
     496              : 
     497            2 : void cloudsync_reset_siteid (cloudsync_context *data) {
     498            2 :     memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
     499            2 : }
     500              : 
     501          228 : int cloudsync_load_siteid (cloudsync_context *data) {
     502              :     // check if site_id was already loaded
     503          228 :     if (data->site_id[0] != 0) return DBRES_OK;
     504              :     
     505              :     // load site_id
     506          226 :     char *buffer = NULL;
     507          226 :     int64_t size = 0;
     508          226 :     int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
     509          226 :     if (rc != DBRES_OK) return rc;
     510          226 :     if (!buffer || size != UUID_LEN) {
     511            0 :         if (buffer) cloudsync_memory_free(buffer);
     512            0 :         return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
     513              :     }
     514              :     
     515          226 :     memcpy(data->site_id, buffer, UUID_LEN);
     516          226 :     cloudsync_memory_free(buffer);
     517              :     
     518          226 :     return DBRES_OK;
     519          228 : }
     520              : 
     521            1 : int64_t cloudsync_dbversion (cloudsync_context *data) {
     522            1 :     return data->db_version;
     523              : }
     524              : 
     525        12151 : int cloudsync_bumpseq (cloudsync_context *data) {
     526        12151 :     int value = data->seq;
     527        12151 :     data->seq += 1;
     528        12151 :     return value;
     529              : }
     530              : 
     531          285 : void cloudsync_update_schema_hash (cloudsync_context *data) {
     532          285 :     database_update_schema_hash(data, &data->schema_hash);
     533          285 : }
     534              : 
     535        62234 : void *cloudsync_db (cloudsync_context *data) {
     536        62234 :     return data->db;
     537              : }
     538              : 
     539          511 : int cloudsync_add_dbvms (cloudsync_context *data) {
     540              :     DEBUG_DBFUNCTION("cloudsync_add_stmts");
     541              :     
     542          511 :     if (data->data_version_stmt == NULL) {
     543          226 :         int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
     544              :         DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
     545          226 :         if (rc != DBRES_OK) return rc;
     546              :         DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
     547          226 :     }
     548              :     
     549          511 :     if (data->schema_version_stmt == NULL) {
     550          226 :         int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
     551              :         DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
     552          226 :         if (rc != DBRES_OK) return rc;
     553              :         DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
     554          226 :     }
     555              :     
     556          511 :     if (data->getset_siteid_stmt == NULL) {
     557              :         // get and set index of the site_id
     558              :         // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
     559              :         // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
     560          226 :         int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
     561              :         DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
     562          226 :         if (rc != DBRES_OK) return rc;
     563              :         DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
     564          226 :     }
     565              :     
     566          511 :     return cloudsync_dbversion_rebuild(data);
     567          511 : }
     568              : 
     569           22 : int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
     570              :     // force err_code to be something different than OK
     571           22 :     if (err_code == DBRES_OK) err_code = database_errcode(data);
     572           22 :     if (err_code == DBRES_OK) err_code = DBRES_ERROR;
     573              :     
     574              :     // compute a meaningful error message
     575           22 :     if (err_user == NULL) {
     576            6 :         snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
     577            6 :     } else {
     578           16 :         const char *db_error = database_errmsg(data);
     579              :         char db_error_copy[sizeof(data->errmsg)];
     580           16 :         int rc = database_errcode(data);
     581           16 :         if (rc == DBRES_OK) {
     582           16 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
     583           16 :         } else {
     584            0 :             if (db_error == data->errmsg) {
     585            0 :                 snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
     586            0 :                 db_error = db_error_copy;
     587            0 :             }
     588            0 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
     589              :         }
     590              :     }
     591              :     
     592           22 :     data->errcode = err_code;
     593           22 :     return err_code;
     594              : }
     595              : 
     596            6 : int cloudsync_set_dberror (cloudsync_context *data) {
     597            6 :     return cloudsync_set_error(data, NULL, DBRES_OK);
     598              : }
     599              : 
     600           13 : const char *cloudsync_errmsg (cloudsync_context *data) {
     601           13 :     return data->errmsg;
     602              : }
     603              : 
     604            4 : int cloudsync_errcode (cloudsync_context *data) {
     605            4 :     return data->errcode;
     606              : }
     607              : 
     608            2 : void cloudsync_reset_error (cloudsync_context *data) {
     609            2 :     data->errmsg[0] = 0;
     610            2 :     data->errcode = DBRES_OK;
     611            2 : }
     612              : 
     613            3 : void *cloudsync_auxdata (cloudsync_context *data) {
     614            3 :     return data->aux_data;
     615              : }
     616              : 
     617            2 : void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
     618            2 :     data->aux_data = xdata;
     619            2 : }
     620              : 
     621            6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
     622            6 :     if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
     623            5 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
     624            5 :     data->current_schema = NULL;
     625            5 :     if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
     626            6 : }
     627              : 
     628         1284 : const char *cloudsync_schema (cloudsync_context *data) {
     629         1284 :     return data->current_schema;
     630              : }
     631              : 
     632            4 : const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
     633            4 :     cloudsync_table_context *table = table_lookup(data, table_name);
     634            4 :     if (!table) return NULL;
     635              : 
     636            1 :     return table->schema;
     637            4 : }
     638              : 
     639              : // MARK: - Table Utils -
     640              : 
     641           69 : void table_pknames_free (char **names, int nrows) {
     642           69 :     if (!names) return;
     643          132 :     for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
     644           46 :     cloudsync_memory_free(names);
     645           69 : }
     646              : 
     647          283 : char *table_build_mergedelete_sql (cloudsync_table_context *table) {
     648              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     649              :     if (table->rowid_only) {
     650              :         char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
     651              :         return sql;
     652              :     }
     653              :     #endif
     654              : 
     655          283 :     return sql_build_delete_by_pk(table->context, table->name, table->schema);
     656              : }
     657              : 
     658         1365 : char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
     659         1365 :     char *sql = NULL;
     660              :     
     661              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     662              :     if (table->rowid_only) {
     663              :         if (colname == NULL) {
     664              :             // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
     665              :             sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
     666              :         } else {
     667              :             // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
     668              :             sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
     669              :         }
     670              :         return sql;
     671              :     }
     672              :     #endif
     673              :     
     674         1365 :     if (colname == NULL) {
     675              :         // is sentinel insert
     676          283 :         sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
     677          283 :     } else {
     678         1082 :         sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
     679              :     }
     680         1365 :     return sql;
     681              : }
     682              : 
     683         1081 : char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
     684              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     685              :     if (table->rowid_only) {
     686              :         char *colnamequote = "\"";
     687              :         char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
     688              :         return sql;
     689              :     }
     690              :     #endif
     691              :         
     692              :     // SELECT age FROM customers WHERE first_name=? AND last_name=?;
     693         1081 :     return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
     694              : }
     695              :     
     696          283 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
     697              :     DEBUG_DBFUNCTION("table_create %s", name);
     698              :     
     699          283 :     cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
     700          283 :     if (!table) return NULL;
     701              :     
     702          283 :     table->context = data;
     703          283 :     table->algo = algo;
     704          283 :     table->name = cloudsync_string_dup_lowercase(name);
     705              : 
     706              :     // Detect schema from metadata table location. If metadata table doesn't
     707              :     // exist yet (during initialization), fall back to cloudsync_schema() which
     708              :     // returns the explicitly set schema or current_schema().
     709          283 :     table->schema = database_table_schema(name);
     710          283 :     if (!table->schema) {
     711          283 :         const char *fallback_schema = cloudsync_schema(data);
     712          283 :         if (fallback_schema) {
     713            0 :             table->schema = cloudsync_string_dup(fallback_schema);
     714            0 :         }
     715          283 :     }
     716              : 
     717          283 :     if (!table->name) {
     718            0 :         cloudsync_memory_free(table);
     719            0 :         return NULL;
     720              :     }
     721          283 :     table->meta_ref = database_build_meta_ref(table->schema, table->name);
     722          283 :     table->base_ref = database_build_base_ref(table->schema, table->name);
     723          283 :     table->enabled = true;
     724              :         
     725          283 :     return table;
     726          283 : }
     727              : 
     728          283 : void table_free (cloudsync_table_context *table) {
     729              :     DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
     730          283 :     if (!table) return;
     731              :     
     732          283 :     if (table->col_name) {
     733         1325 :         for (int i=0; i<table->ncols; ++i) {
     734         1081 :             cloudsync_memory_free(table->col_name[i]);
     735         1081 :         }
     736          244 :         cloudsync_memory_free(table->col_name);
     737          244 :     }
     738          283 :     if (table->col_merge_stmt) {
     739         1325 :         for (int i=0; i<table->ncols; ++i) {
     740         1081 :             databasevm_finalize(table->col_merge_stmt[i]);
     741         1081 :         }
     742          244 :         cloudsync_memory_free(table->col_merge_stmt);
     743          244 :     }
     744          283 :     if (table->col_value_stmt) {
     745         1325 :         for (int i=0; i<table->ncols; ++i) {
     746         1081 :             databasevm_finalize(table->col_value_stmt[i]);
     747         1081 :         }
     748          244 :         cloudsync_memory_free(table->col_value_stmt);
     749          244 :     }
     750          283 :     if (table->col_id) {
     751          244 :         cloudsync_memory_free(table->col_id);
     752          244 :     }
     753          283 :     if (table->col_algo) {
     754          244 :         cloudsync_memory_free(table->col_algo);
     755          244 :     }
     756          283 :     if (table->col_delimiter) {
     757         1325 :         for (int i=0; i<table->ncols; ++i) {
     758         1081 :             if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
     759         1081 :         }
     760          244 :         cloudsync_memory_free(table->col_delimiter);
     761          244 :     }
     762              : 
     763          283 :     if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
     764          283 :     if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
     765          283 :     if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
     766          283 :     if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
     767          283 :     if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);
     768              : 
     769          283 :     if (table->name) cloudsync_memory_free(table->name);
     770          283 :     if (table->schema) cloudsync_memory_free(table->schema);
     771          283 :     if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
     772          283 :     if (table->base_ref) cloudsync_memory_free(table->base_ref);
     773          283 :     if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
     774          283 :     if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
     775          283 :     if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
     776          283 :     if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
     777          283 :     if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
     778          283 :     if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
     779          283 :     if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
     780          283 :     if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
     781          283 :     if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
     782          283 :     if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
     783          283 :     if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
     784          283 :     if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
     785          283 :     if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);
     786              :     
     787          283 :     if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
     788          283 :     if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
     789          283 :     if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);
     790              :     
     791          283 :     cloudsync_memory_free(table);
     792          283 : }
     793              : 
     794          283 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
     795          283 :     int rc = DBRES_OK;
     796          283 :     char *sql = NULL;
     797          283 :     cloudsync_context *data = table->context;
     798              :     
     799              :     // META TABLE statements
     800              :     
     801              :     // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
     802              :     
     803              :     // precompile the pk exists statement
     804              :     // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
     805              :     // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
     806          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
     807          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     808              :     DEBUG_SQL("meta_pkexists_stmt: %s", sql);
     809              : 
     810          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
     811          283 :     cloudsync_memory_free(sql);
     812          283 :     if (rc != DBRES_OK) goto cleanup;
     813              : 
     814              :     // precompile the update local sentinel statement
     815          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     816          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     817              :     DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
     818              :     
     819          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
     820          283 :     cloudsync_memory_free(sql);
     821          283 :     if (rc != DBRES_OK) goto cleanup;
     822              :     
     823              :     // precompile the insert local sentinel statement
     824          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
     825          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     826              :     DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
     827              :     
     828          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
     829          283 :     cloudsync_memory_free(sql);
     830          283 :     if (rc != DBRES_OK) goto cleanup;
     831              : 
     832              :     // precompile the insert/update local row statement
     833          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
     834          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     835              :     DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
     836              :     
     837          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
     838          283 :     cloudsync_memory_free(sql);
     839          283 :     if (rc != DBRES_OK) goto cleanup;
     840              :     
     841              :     // precompile the delete rows from meta
     842          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     843          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     844              :     DEBUG_SQL("meta_row_drop_stmt: %s", sql);
     845              : 
     846          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
     847          283 :     cloudsync_memory_free(sql);
     848          283 :     if (rc != DBRES_OK) goto cleanup;
     849              :     
     850              :     // precompile the update rows from meta when pk changes
     851              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
     852          283 :     sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
     853          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     854              :     DEBUG_SQL("meta_update_move_stmt: %s", sql);
     855              :     
     856          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
     857          283 :     cloudsync_memory_free(sql);
     858          283 :     if (rc != DBRES_OK) goto cleanup;
     859              :     
     860              :     // local cl
     861          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
     862          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     863              :     DEBUG_SQL("meta_local_cl_stmt: %s", sql);
     864              :     
     865          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
     866          283 :     cloudsync_memory_free(sql);
     867          283 :     if (rc != DBRES_OK) goto cleanup;
     868              :     
     869              :     // rowid of the last inserted/updated row in the meta table
     870          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
     871          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     872              :     DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
     873              :     
     874          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
     875          283 :     cloudsync_memory_free(sql);
     876          283 :     if (rc != DBRES_OK) goto cleanup;
     877              :     
     878          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     879          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     880              :     DEBUG_SQL("meta_merge_delete_drop: %s", sql);
     881              : 
     882          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
     883          283 :     cloudsync_memory_free(sql);
     884          283 :     if (rc != DBRES_OK) goto cleanup;
     885              :     
     886              :     // zero clock
     887          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     888          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     889              :     DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
     890              :     
     891          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
     892          283 :     cloudsync_memory_free(sql);
     893          283 :     if (rc != DBRES_OK) goto cleanup;
     894              :     
     895              :     // col_version
     896          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
     897          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     898              :     DEBUG_SQL("meta_col_version_stmt: %s", sql);
     899              :     
     900          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
     901          283 :     cloudsync_memory_free(sql);
     902          283 :     if (rc != DBRES_OK) goto cleanup;
     903              :     
     904              :     // site_id
     905          283 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
     906          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     907              :     DEBUG_SQL("meta_site_id_stmt: %s", sql);
     908              :     
     909          283 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
     910          283 :     cloudsync_memory_free(sql);
     911          283 :     if (rc != DBRES_OK) goto cleanup;
     912              :     
     913              :     // REAL TABLE statements
     914              : 
     915              :     // precompile the get column value statement
     916          283 :     if (ncols > 0) {
     917          244 :         sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
     918          244 :         if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     919              :         DEBUG_SQL("real_col_values_stmt: %s", sql);
     920              :         
     921          244 :         rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
     922          244 :         cloudsync_memory_free(sql);
     923          244 :         if (rc != DBRES_OK) goto cleanup;
     924          244 :     }
     925              :     
     926          283 :     sql = table_build_mergedelete_sql(table);
     927          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     928              :     DEBUG_SQL("real_merge_delete: %s", sql);
     929              :     
     930          283 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
     931          283 :     cloudsync_memory_free(sql);
     932          283 :     if (rc != DBRES_OK) goto cleanup;
     933              :     
     934          283 :     sql = table_build_mergeinsert_sql(table, NULL);
     935          283 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     936              :     DEBUG_SQL("real_merge_sentinel: %s", sql);
     937              :     
     938          283 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
     939          283 :     cloudsync_memory_free(sql);
     940          283 :     if (rc != DBRES_OK) goto cleanup;
     941              :     
     942              : cleanup:
     943          283 :     if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
     944          283 :     return rc;
     945              : }
     946              : 
     947       183996 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
     948              :     DEBUG_DBFUNCTION("table_lookup %s", table_name);
     949              :     
     950       183996 :     if (table_name) {
     951       185620 :         for (int i=0; i<data->tables_count; ++i) {
     952       185043 :             if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
     953         1624 :         }
     954          577 :     }
     955              :     
     956          577 :     return NULL;
     957       183996 : }
     958              : 
     959       169196 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
     960              :     DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
     961              :     
     962       315284 :     for (int i=0; i<table->ncols; ++i) {
     963       315284 :         if (strcasecmp(table->col_name[i], col_name) == 0) {
     964       169196 :             if (index) *index = i;
     965       169196 :             return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
     966              :         }
     967       146088 :     }
     968              :     
     969            0 :     if (index) *index = -1;
     970            0 :     return NULL;
     971       169196 : }
     972              : 
     973          282 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
     974          282 :     const char *table_name = table->name;
     975              :     DEBUG_DBFUNCTION("table_remove %s", table_name);
     976              :     
     977          332 :     for (int i = 0; i < data->tables_count; ++i) {
     978          332 :         cloudsync_table_context *t = data->tables[i];
     979              :         
     980              :         // pointer compare is fastest but fallback to strcasecmp if not same pointer
     981          332 :         if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
     982          282 :             int last = data->tables_count - 1;
     983          282 :             data->tables[i] = data->tables[last];   // move last into the hole (keeps array dense)
     984          282 :             data->tables[last] = NULL;              // NULLify tail (as an extra security measure)
     985          282 :             data->tables_count--;
     986          282 :             return data->tables_count;
     987              :         }
     988           50 :     }
     989              :     
     990            0 :     return -1;
     991          282 : }
     992              : 
     993         1082 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
     994         1082 :     cloudsync_table_context *table = (cloudsync_table_context *)xdata;
     995         1082 :     cloudsync_context *data = table->context;
     996              : 
     997         1082 :     int index = table->ncols;
     998         2163 :     for (int i=0; i<ncols; i+=2) {
     999         1082 :         const char *name = values[i];
    1000         1082 :         int cid = (int)strtol(values[i+1], NULL, 0);
    1001              : 
    1002         1082 :         table->col_id[index] = cid;
    1003         1082 :         table->col_name[index] = cloudsync_string_dup_lowercase(name);
    1004         1082 :         if (!table->col_name[index]) goto error;
    1005              : 
    1006         1082 :         char *sql = table_build_mergeinsert_sql(table, name);
    1007         1082 :         if (!sql) goto error;
    1008              :         DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
    1009              : 
    1010         1082 :         int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
    1011         1082 :         cloudsync_memory_free(sql);
    1012         1082 :         if (rc != DBRES_OK) goto error;
    1013         1081 :         if (!table->col_merge_stmt[index]) goto error;
    1014              : 
    1015         1081 :         sql = table_build_value_sql(table, name);
    1016         1081 :         if (!sql) goto error;
    1017              :         DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
    1018              : 
    1019         1081 :         rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
    1020         1081 :         cloudsync_memory_free(sql);
    1021         1081 :         if (rc != DBRES_OK) goto error;
    1022         1081 :         if (!table->col_value_stmt[index]) goto error;
    1023         1081 :     }
    1024         1081 :     table->ncols += 1;
    1025              : 
    1026         1081 :     return 0;
    1027              : 
    1028              : error:
    1029              :     // clean up partially-initialized entry at index
    1030            1 :     if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
    1031            1 :     if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
    1032            1 :     if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
    1033            1 :     return 1;
    1034         1082 : }
    1035              : 
    1036          283 : bool table_ensure_capacity (cloudsync_context *data) {
    1037          283 :     if (data->tables_count < data->tables_cap) return true;
    1038              :     
    1039            0 :     int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
    1040            0 :     size_t bytes = (size_t)new_cap * sizeof(*data->tables);
    1041            0 :     void *p = cloudsync_memory_realloc(data->tables, bytes);
    1042            0 :     if (!p) return false;
    1043              :     
    1044            0 :     data->tables = (cloudsync_table_context **)p;
    1045            0 :     data->tables_cap = new_cap;
    1046            0 :     return true;
    1047          283 : }
    1048              : 
    1049          288 : bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
    1050              :     DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
    1051              : 
    1052              :     // Check if table already initialized in this connection's context.
    1053              :     // Note: This prevents same-connection duplicate initialization.
    1054              :     // SQLite clients cannot distinguish schemas, so having 'public.users'
    1055              :     // and 'auth.users' would cause sync ambiguity. Users should avoid
    1056              :     // initializing tables with the same name in different schemas.
    1057              :     // If two concurrent connections initialize tables with the same name
    1058              :     // in different schemas, the behavior is undefined.
    1059          288 :     cloudsync_table_context *table = table_lookup(data, table_name);
    1060          288 :     if (table) return true;
    1061              :     
    1062              :     // check for space availability
    1063          283 :     if (!table_ensure_capacity(data)) return false;
    1064              :     
    1065              :     // setup a new table
    1066          283 :     table = table_create(data, table_name, algo);
    1067          283 :     if (!table) return false;
    1068              :     
    1069              :     // fill remaining metadata in the table
    1070          283 :     int count = database_count_pk(data, table_name, false, table->schema);
    1071          283 :     if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1072          283 :     table->npks = count;
    1073          283 :     if (table->npks == 0) {
    1074              :         #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    1075            0 :         goto abort_add_table;
    1076              :         #else
    1077              :         table->rowid_only = true;
    1078              :         table->npks = 1; // rowid
    1079              :         #endif
    1080              :     }
    1081              :     
    1082          283 :     int ncols = database_count_nonpk(data, table_name, table->schema);
    1083          283 :     if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1084          283 :     int rc = table_add_stmts(table, ncols);
    1085          283 :     if (rc != DBRES_OK) goto abort_add_table;
    1086              :     
    1087              :     // a table with only pk(s) is totally legal
    1088          283 :     if (ncols > 0) {
    1089          244 :         table->col_name = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
    1090          244 :         if (!table->col_name) goto abort_add_table;
    1091              : 
    1092          244 :         table->col_id = (int *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(int) * ncols));
    1093          244 :         if (!table->col_id) goto abort_add_table;
    1094              : 
    1095          244 :         table->col_merge_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
    1096          244 :         if (!table->col_merge_stmt) goto abort_add_table;
    1097              : 
    1098          244 :         table->col_value_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
    1099          244 :         if (!table->col_value_stmt) goto abort_add_table;
    1100              : 
    1101          244 :         table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
    1102          244 :         if (!table->col_algo) goto abort_add_table;
    1103              : 
    1104          244 :         table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
    1105          244 :         if (!table->col_delimiter) goto abort_add_table;
    1106              : 
    1107              :         // Pass empty string when schema is NULL; SQL will fall back to current_schema()
    1108          244 :         const char *schema = table->schema ? table->schema : "";
    1109          488 :         char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
    1110          244 :                                               table_name, schema, table_name, schema);
    1111          244 :         if (!sql) goto abort_add_table;
    1112          244 :         rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
    1113          244 :         cloudsync_memory_free(sql);
    1114          244 :         if (rc == DBRES_ABORT) goto abort_add_table;
    1115          243 :     }
    1116              :     
    1117              :     // append newly created table
    1118          282 :     data->tables[data->tables_count++] = table;
    1119          282 :     return true;
    1120              :     
    1121              : abort_add_table:
    1122            1 :     table_free(table);
    1123            1 :     return false;
    1124          288 : }
    1125              : 
    1126            0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
    1127            0 :     dbvm_t *vm = NULL;
    1128            0 :     *persistent = false;
    1129              : 
    1130            0 :     cloudsync_table_context *table = table_lookup(data, tbl_name);
    1131            0 :     if (table) {
    1132            0 :         char *col_name = NULL;
    1133            0 :         if (table->ncols > 0) {
    1134            0 :             col_name = table->col_name[0];
    1135              :             // retrieve col_value precompiled statement
    1136            0 :             vm = table_column_lookup(table, col_name, false, NULL);
    1137            0 :             *persistent = true;
    1138            0 :         } else {
    1139            0 :             char *sql = table_build_value_sql(table, "*");
    1140            0 :             databasevm_prepare(data, sql, (void **)&vm, 0);
    1141            0 :             cloudsync_memory_free(sql);
    1142            0 :             *persistent = false;
    1143              :         }
    1144            0 :     }
    1145              :     
    1146            0 :     return vm;
    1147              : }
    1148              : 
    1149         6876 : bool table_enabled (cloudsync_table_context *table) {
    1150         6876 :     return table->enabled;
    1151              : }
    1152              : 
    1153            6 : void table_set_enabled (cloudsync_table_context *table, bool value) {
    1154            6 :     table->enabled = value;
    1155            6 : }
    1156              : 
    1157        19850 : int table_count_cols (cloudsync_table_context *table) {
    1158        19850 :     return table->ncols;
    1159              : }
    1160              : 
    1161         6766 : int table_count_pks (cloudsync_table_context *table) {
    1162         6766 :     return table->npks;
    1163              : }
    1164              : 
    1165        32916 : const char *table_colname (cloudsync_table_context *table, int index) {
    1166        32916 :     return table->col_name[index];
    1167              : }
    1168              : 
    1169         3586 : bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
    1170              :     // check if a row with the same primary key already exists
    1171              :     // if so, this means the row might have been previously deleted (sentinel)
    1172         3586 :     return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
    1173              : }
    1174              : 
    1175            0 : char **table_pknames (cloudsync_table_context *table) {
    1176            0 :     return table->pk_name;
    1177              : }
    1178              : 
    1179           23 : void table_set_pknames (cloudsync_table_context *table, char **pknames) {
    1180           23 :     table_pknames_free(table->pk_name, table->npks);
    1181           23 :     table->pk_name = pknames;
    1182           23 : }
    1183              : 
    1184        49228 : bool table_algo_isgos (cloudsync_table_context *table) {
    1185        49228 :     return (table->algo == table_algo_crdt_gos);
    1186              : }
    1187              : 
    1188            0 : const char *table_schema (cloudsync_table_context *table) {
    1189            0 :     return table->schema;
    1190              : }
    1191              : 
    1192              : // MARK: - Merge Insert -
    1193              : 
    1194        46078 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
    1195        46078 :     dbvm_t *vm = table->meta_local_cl_stmt;
    1196        46078 :     int64_t result = -1;
    1197              :     
    1198        46078 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1199        46078 :     if (rc != DBRES_OK) goto cleanup;
    1200              :     
    1201        46078 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1202        46078 :     if (rc != DBRES_OK) goto cleanup;
    1203              :     
    1204        46078 :     rc = databasevm_step(vm);
    1205        46078 :     if (rc == DBRES_ROW) result = database_column_int(vm, 0);
    1206            0 :     else if (rc == DBRES_DONE) result = 0;
    1207              :     
    1208              : cleanup:
    1209        46078 :     if (result == -1) cloudsync_set_dberror(table->context);
    1210        46078 :     dbvm_reset(vm);
    1211        46078 :     return result;
    1212              : }
    1213              : 
    1214        45051 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
    1215        45051 :     dbvm_t *vm = table->meta_col_version_stmt;
    1216              :     
    1217        45051 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1218        45051 :     if (rc != DBRES_OK) goto cleanup;
    1219              :     
    1220        45051 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1221        45051 :     if (rc != DBRES_OK) goto cleanup;
    1222              :     
    1223        45051 :     rc = databasevm_step(vm);
    1224        69976 :     if (rc == DBRES_ROW) {
    1225        24925 :         *version = database_column_int(vm, 0);
    1226        24925 :         rc = DBRES_OK;
    1227        24925 :     }
    1228              :     
    1229              : cleanup:
    1230        45051 :     if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
    1231        45051 :     dbvm_reset(vm);
    1232        45051 :     return rc;
    1233              : }
    1234              : 
    1235        25136 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1236              :     
    1237              :     // get/set site_id
    1238        25136 :     dbvm_t *vm = data->getset_siteid_stmt;
    1239        25136 :     int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
    1240        25136 :     if (rc != DBRES_OK) goto cleanup_merge;
    1241              :     
    1242        25136 :     rc = databasevm_step(vm);
    1243        25136 :     if (rc != DBRES_ROW) goto cleanup_merge;
    1244              :     
    1245        25136 :     int64_t ord = database_column_int(vm, 0);
    1246        25136 :     dbvm_reset(vm);
    1247              :     
    1248        25136 :     vm = table->meta_winner_clock_stmt;
    1249        25136 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
    1250        25136 :     if (rc != DBRES_OK) goto cleanup_merge;
    1251              :     
    1252        25136 :     rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    1253        25136 :     if (rc != DBRES_OK) goto cleanup_merge;
    1254              :     
    1255        25136 :     rc = databasevm_bind_int(vm, 3, col_version);
    1256        25136 :     if (rc != DBRES_OK) goto cleanup_merge;
    1257              :     
    1258        25136 :     rc = databasevm_bind_int(vm, 4, db_version);
    1259        25136 :     if (rc != DBRES_OK) goto cleanup_merge;
    1260              :     
    1261        25136 :     rc = databasevm_bind_int(vm, 5, seq);
    1262        25136 :     if (rc != DBRES_OK) goto cleanup_merge;
    1263              :     
    1264        25136 :     rc = databasevm_bind_int(vm, 6, ord);
    1265        25136 :     if (rc != DBRES_OK) goto cleanup_merge;
    1266              :     
    1267        25136 :     rc = databasevm_step(vm);
    1268        50272 :     if (rc == DBRES_ROW) {
    1269        25136 :         *rowid = database_column_int(vm, 0);
    1270        25136 :         rc = DBRES_OK;
    1271        25136 :     }
    1272              :     
    1273              : cleanup_merge:
    1274        25136 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1275        25136 :     dbvm_reset(vm);
    1276        25136 :     return rc;
    1277              : }
    1278              : 
    1279              : // MARK: - Deferred column-batch merge functions -
    1280              : 
    1281        21337 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
    1282        21337 :     merge_pending_batch *batch = data->pending_batch;
    1283              : 
    1284              :     // Store table and PK on first entry
    1285        21337 :     if (batch->table == NULL) {
    1286         7699 :         batch->table = table;
    1287         7699 :         batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1288         7699 :         if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
    1289         7699 :         memcpy(batch->pk, pk, pklen);
    1290         7699 :         batch->pk_len = pklen;
    1291         7699 :     }
    1292              : 
    1293              :     // Ensure capacity
    1294        21337 :     if (batch->count >= batch->capacity) {
    1295          504 :         int new_cap = batch->capacity ? batch->capacity * 2 : 8;
    1296          504 :         merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
    1297          504 :         if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
    1298          504 :         batch->entries = new_entries;
    1299          504 :         batch->capacity = new_cap;
    1300          504 :     }
    1301              : 
    1302              :     // Resolve col_name to a stable pointer from the table context
    1303              :     // (the incoming col_name may point to VM-owned memory that gets freed on reset)
    1304        21337 :     int col_idx = -1;
    1305        21337 :     table_column_lookup(table, col_name, true, &col_idx);
    1306        21337 :     const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
    1307        21337 :     if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
    1308              : 
    1309        21337 :     merge_pending_entry *e = &batch->entries[batch->count];
    1310        21337 :     e->col_name = stable_col_name;
    1311        21337 :     e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
    1312        21337 :     e->col_version = col_version;
    1313        21337 :     e->db_version = db_version;
    1314        21337 :     e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
    1315        21337 :     memcpy(e->site_id, site_id, e->site_id_len);
    1316        21337 :     e->seq = seq;
    1317              : 
    1318        21337 :     batch->count++;
    1319        21337 :     return DBRES_OK;
    1320        21337 : }
    1321              : 
    1322        18904 : static void merge_pending_free_entries (merge_pending_batch *batch) {
    1323        18904 :     if (batch->entries) {
    1324        35578 :         for (int i = 0; i < batch->count; i++) {
    1325        21337 :             if (batch->entries[i].col_value) {
    1326        21337 :                 database_value_free(batch->entries[i].col_value);
    1327        21337 :                 batch->entries[i].col_value = NULL;
    1328        21337 :             }
    1329        21337 :         }
    1330        14241 :     }
    1331        18904 :     if (batch->pk) {
    1332         7731 :         cloudsync_memory_free(batch->pk);
    1333         7731 :         batch->pk = NULL;
    1334         7731 :     }
    1335        18904 :     batch->table = NULL;
    1336        18904 :     batch->pk_len = 0;
    1337        18904 :     batch->cl = 0;
    1338        18904 :     batch->sentinel_pending = false;
    1339        18904 :     batch->row_exists = false;
    1340        18904 :     batch->count = 0;
    1341        18904 : }
    1342              : 
    1343        18904 : static int merge_flush_pending (cloudsync_context *data) {
    1344        18904 :     merge_pending_batch *batch = data->pending_batch;
    1345        18904 :     if (!batch) return DBRES_OK;
    1346              : 
    1347        18904 :     int rc = DBRES_OK;
    1348        18904 :     bool flush_savepoint = false;
    1349              : 
    1350              :     // Nothing to write — handle sentinel-only case or skip
    1351        18904 :     if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
    1352        11173 :         goto cleanup;
    1353              :     }
    1354              : 
    1355              :     // Wrap database operations in a savepoint so that on failure (e.g. RLS
    1356              :     // denial) the rollback properly releases all executor resources (open
    1357              :     // relations, snapshots, plan cache) acquired during the failed statement.
    1358         7731 :     flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);
    1359              : 
    1360         7731 :     if (batch->count == 0) {
    1361              :         // Sentinel with no winning columns (PK-only row)
    1362           30 :         dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
    1363           30 :         rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1364           30 :         if (rc < 0) {
    1365            0 :             cloudsync_set_dberror(data);
    1366            0 :             dbvm_reset(vm);
    1367            0 :             goto cleanup;
    1368              :         }
    1369           30 :         SYNCBIT_SET(data);
    1370           30 :         rc = databasevm_step(vm);
    1371           30 :         dbvm_reset(vm);
    1372           30 :         SYNCBIT_RESET(data);
    1373           30 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1374           30 :         if (rc != DBRES_OK) {
    1375            0 :             cloudsync_set_dberror(data);
    1376            0 :             goto cleanup;
    1377              :         }
    1378           30 :         goto cleanup;
    1379              :     }
    1380              : 
    1381              :     // Check if cached prepared statement can be reused
    1382         7701 :     cloudsync_table_context *table = batch->table;
    1383         7701 :     dbvm_t *vm = NULL;
    1384         7701 :     bool cache_hit = false;
    1385              : 
    1386        14595 :     if (batch->cached_vm &&
    1387         7197 :         batch->cached_row_exists == batch->row_exists &&
    1388         6894 :         batch->cached_col_count == batch->count) {
    1389         6870 :         cache_hit = true;
    1390        26559 :         for (int i = 0; i < batch->count; i++) {
    1391        19715 :             if (batch->cached_col_names[i] != batch->entries[i].col_name) {
    1392           26 :                 cache_hit = false;
    1393           26 :                 break;
    1394              :             }
    1395        19689 :         }
    1396         6870 :     }
    1397              : 
    1398         7701 :     if (cache_hit) {
    1399         6844 :         vm = batch->cached_vm;
    1400         6844 :         dbvm_reset(vm);
    1401         6844 :     } else {
    1402              :         // Invalidate old cache
    1403          857 :         if (batch->cached_vm) {
    1404          353 :             databasevm_finalize(batch->cached_vm);
    1405          353 :             batch->cached_vm = NULL;
    1406          353 :         }
    1407              : 
    1408              :         // Build multi-column SQL
    1409          857 :         const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
    1410          857 :         if (!colnames) {
    1411            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
    1412            0 :             goto cleanup;
    1413              :         }
    1414         2505 :         for (int i = 0; i < batch->count; i++) {
    1415         1648 :             colnames[i] = batch->entries[i].col_name;
    1416         1648 :         }
    1417              : 
    1418          857 :         char *sql = batch->row_exists
    1419          417 :             ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
    1420          440 :             : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
    1421          857 :         cloudsync_memory_free(colnames);
    1422              : 
    1423          857 :         if (!sql) {
    1424            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
    1425            0 :             goto cleanup;
    1426              :         }
    1427              : 
    1428          857 :         rc = databasevm_prepare(data, sql, &vm, 0);
    1429          857 :         cloudsync_memory_free(sql);
    1430          857 :         if (rc != DBRES_OK) {
    1431            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
    1432            0 :             goto cleanup;
    1433              :         }
    1434              : 
    1435              :         // Update cache
    1436          857 :         batch->cached_vm = vm;
    1437          857 :         batch->cached_row_exists = batch->row_exists;
    1438          857 :         batch->cached_col_count = batch->count;
    1439              :         // Reallocate cached_col_names if needed
    1440          857 :         if (batch->cached_col_count > 0) {
    1441          857 :             const char **new_names = (const char **)cloudsync_memory_realloc(
    1442          857 :                 batch->cached_col_names, batch->count * sizeof(const char *));
    1443          857 :             if (new_names) {
    1444         2505 :                 for (int i = 0; i < batch->count; i++) {
    1445         1648 :                     new_names[i] = batch->entries[i].col_name;
    1446         1648 :                 }
    1447          857 :                 batch->cached_col_names = new_names;
    1448          857 :             }
    1449          857 :         }
    1450              :     }
    1451              : 
    1452              :     // Bind PKs (positions 1..npks)
    1453         7701 :     int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1454         7701 :     if (npks < 0) {
    1455            0 :         cloudsync_set_dberror(data);
    1456            0 :         dbvm_reset(vm);
    1457            0 :         rc = DBRES_ERROR;
    1458            0 :         goto cleanup;
    1459              :     }
    1460              : 
    1461              :     // Bind column values (positions npks+1..npks+count)
    1462        29038 :     for (int i = 0; i < batch->count; i++) {
    1463        21337 :         merge_pending_entry *e = &batch->entries[i];
    1464        21337 :         int bind_idx = npks + 1 + i;
    1465        21337 :         if (e->col_value) {
    1466        21337 :             rc = databasevm_bind_value(vm, bind_idx, e->col_value);
    1467        21337 :         } else {
    1468            0 :             rc = databasevm_bind_null(vm, bind_idx);
    1469              :         }
    1470        21337 :         if (rc != DBRES_OK) {
    1471            0 :             cloudsync_set_dberror(data);
    1472            0 :             dbvm_reset(vm);
    1473            0 :             goto cleanup;
    1474              :         }
    1475        21337 :     }
    1476              : 
    1477              :     // Execute with SYNCBIT and GOS handling
    1478         7701 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1479         7701 :     SYNCBIT_SET(data);
    1480         7701 :     rc = databasevm_step(vm);
    1481         7701 :     dbvm_reset(vm);
    1482         7701 :     SYNCBIT_RESET(data);
    1483         7701 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1484              : 
    1485         7701 :     if (rc != DBRES_DONE) {
    1486            3 :         cloudsync_set_dberror(data);
    1487            3 :         goto cleanup;
    1488              :     }
    1489         7698 :     rc = DBRES_OK;
    1490              : 
    1491              :     // Call merge_set_winner_clock for each buffered entry
    1492         7698 :     int64_t rowid = 0;
    1493        29027 :     for (int i = 0; i < batch->count; i++) {
    1494        21329 :         merge_pending_entry *e = &batch->entries[i];
    1495        42658 :         int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
    1496        21329 :                                                e->col_name, e->col_version, e->db_version,
    1497        21329 :                                                (const char *)e->site_id, e->site_id_len,
    1498        21329 :                                                e->seq, &rowid);
    1499        21329 :         if (clock_rc != DBRES_OK) {
    1500            0 :             rc = clock_rc;
    1501            0 :             goto cleanup;
    1502              :         }
    1503        29027 :     }
    1504              : 
    1505              : cleanup:
    1506        18904 :     merge_pending_free_entries(batch);
    1507        18904 :     if (flush_savepoint) {
    1508         7731 :         if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
    1509            3 :         else database_rollback_savepoint(data, "merge_flush");
    1510         7731 :     }
    1511        18904 :     return rc;
    1512        18904 : }
    1513              : 
    1514         3167 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1515              :     int index;
    1516         3167 :     dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
    1517         3167 :     if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
    1518              :     
    1519              :     // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    1520              :     
    1521              :     // bind primary key(s)
    1522         3167 :     int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1523         3167 :     if (rc < 0) {
    1524            0 :         cloudsync_set_dberror(data);
    1525            0 :         dbvm_reset(vm);
    1526            0 :         return rc;
    1527              :     }
    1528              :     
    1529              :     // bind value (always bind all expected parameters for correct prepared statement handling)
    1530         3167 :     if (col_value) {
    1531         3167 :         rc = databasevm_bind_value(vm, table->npks+1, col_value);
    1532         3167 :         if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
    1533         3167 :     } else {
    1534            0 :         rc = databasevm_bind_null(vm, table->npks+1);
    1535            0 :         if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
    1536              :     }
    1537         3167 :     if (rc != DBRES_OK) {
    1538            0 :         cloudsync_set_dberror(data);
    1539            0 :         dbvm_reset(vm);
    1540            0 :         return rc;
    1541              :     }
    1542              : 
    1543              :     // perform real operation and disable triggers
    1544              :     
    1545              :     // in case of GOS we reused the table->col_merge_stmt statement
    1546              :     // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    1547              :     // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
    1548              :     // the trick is to disable that trigger before executing the statement
    1549         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1550         3167 :     SYNCBIT_SET(data);
    1551         3167 :     rc = databasevm_step(vm);
    1552              :     DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1553         3167 :     dbvm_reset(vm);
    1554         3167 :     SYNCBIT_RESET(data);
    1555         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1556              :     
    1557         3167 :     if (rc != DBRES_DONE) {
    1558            0 :         cloudsync_set_dberror(data);
    1559            0 :         return rc;
    1560              :     }
    1561              :     
    1562         3167 :     return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
    1563         3167 : }
    1564              : 
    1565          169 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1566          169 :     int rc = DBRES_OK;
    1567              :     
    1568              :     // reset return value
    1569          169 :     *rowid = 0;
    1570              :     
    1571              :     // bind pk
    1572          169 :     dbvm_t *vm = table->real_merge_delete_stmt;
    1573          169 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1574          169 :     if (rc < 0) {
    1575            0 :         rc = cloudsync_set_dberror(data);
    1576            0 :         dbvm_reset(vm);
    1577            0 :         return rc;
    1578              :     }
    1579              :     
    1580              :     // perform real operation and disable triggers
    1581          169 :     SYNCBIT_SET(data);
    1582          169 :     rc = databasevm_step(vm);
    1583              :     DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1584          169 :     dbvm_reset(vm);
    1585          169 :     SYNCBIT_RESET(data);
    1586          169 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1587          169 :     if (rc != DBRES_OK) {
    1588            0 :         cloudsync_set_dberror(data);
    1589            0 :         return rc;
    1590              :     }
    1591              :     
    1592          169 :     rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
    1593          169 :     if (rc != DBRES_OK) return rc;
    1594              :     
    1595              :     // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
    1596              :     // this must never come before `set_winner_clock`
    1597          169 :     vm = table->meta_merge_delete_drop;
    1598          169 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1599          169 :     if (rc == DBRES_OK) rc = databasevm_step(vm);
    1600          169 :     dbvm_reset(vm);
    1601              :     
    1602          169 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1603          169 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1604          169 :     return rc;
    1605          169 : }
    1606              : 
    1607           59 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
    1608           59 :     dbvm_t *vm = table->meta_zero_clock_stmt;
    1609              :     
    1610           59 :     int rc = databasevm_bind_int(vm, 1, db_version);
    1611           59 :     if (rc != DBRES_OK) goto cleanup;
    1612              :     
    1613           59 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1614           59 :     if (rc != DBRES_OK) goto cleanup;
    1615              :     
    1616           59 :     rc = databasevm_step(vm);
    1617           59 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1618              :     
    1619              : cleanup:
    1620           59 :     if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
    1621           59 :     dbvm_reset(vm);
    1622           59 :     return rc;
    1623              : }
    1624              : 
    1625              : // executed only if insert_cl == local_cl
    1626        45051 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {
    1627              :     
    1628        45051 :     if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
    1629              :     
    1630              :     int64_t local_version;
    1631        45051 :     int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    1632        45051 :     if (rc == DBRES_DONE) {
    1633              :         // no rows returned, the incoming change wins if there's nothing there locally
    1634        20126 :         *didwin_flag = true;
    1635        20126 :         return DBRES_OK;
    1636              :     }
    1637        24925 :     if (rc != DBRES_OK) return rc;
    1638              :     
    1639              :     // rc == DBRES_OK, means that a row with a version exists
    1640        24925 :     if (local_version != col_version) {
    1641         1977 :         if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
    1642          744 :         if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    1643            0 :     }
    1644              :     
    1645              :     // rc == DBRES_ROW and col_version == local_version, need to compare values
    1646              : 
    1647              :     // retrieve col_value precompiled statement
    1648        22948 :     bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    1649              :     dbvm_t *vm;
    1650        22948 :     if (is_block_col) {
    1651              :         // Block column: read value from blocks table (pk + col_name bindings)
    1652           43 :         vm = table_block_value_read_stmt(table);
    1653           43 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
    1654           43 :         rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1655           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1656           43 :         rc = databasevm_bind_text(vm, 2, col_name, -1);
    1657           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1658           43 :     } else {
    1659        22905 :         vm = table_column_lookup(table, col_name, false, NULL);
    1660        22905 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);
    1661              : 
    1662              :         // bind primary key values
    1663        22905 :         rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
    1664        22905 :         if (rc < 0) {
    1665            0 :             rc = cloudsync_set_dberror(data);
    1666            0 :             dbvm_reset(vm);
    1667            0 :             return rc;
    1668              :         }
    1669              :     }
    1670              :         
    1671              :     // execute vm
    1672              :     dbvalue_t *local_value;
    1673        22948 :     rc = databasevm_step(vm);
    1674        22948 :     if (rc == DBRES_DONE) {
    1675              :         // meta entry exists but the actual value is missing
    1676              :         // we should allow the value_compare function to make a decision
    1677              :         // value_compare has been modified to handle the case where lvalue is NULL
    1678            2 :         local_value = NULL;
    1679            2 :         rc = DBRES_OK;
    1680        22948 :     } else if (rc == DBRES_ROW) {
    1681        22946 :         local_value = database_column_value(vm, 0);
    1682        22946 :         rc = DBRES_OK;
    1683        22946 :     } else {
    1684            0 :         goto cleanup;
    1685              :     }
    1686              :     
    1687              :     // compare values
    1688        22948 :     int ret = dbutils_value_compare(insert_value, local_value);
    1689              :     // reset after compare, otherwise local value would be deallocated
    1690        22948 :     dbvm_reset(vm);
    1691        22948 :     vm = NULL;
    1692              :     
    1693        22948 :     bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    1694        22948 :     if (!compare_site_id) {
    1695        22948 :         *didwin_flag = (ret > 0);
    1696        22948 :         goto cleanup;
    1697              :     }
    1698              :     
    1699              :     // values are the same and merge_equal_values is true
    1700            0 :     vm = table->meta_site_id_stmt;
    1701            0 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1702            0 :     if (rc != DBRES_OK) goto cleanup;
    1703              :     
    1704            0 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1705            0 :     if (rc != DBRES_OK) goto cleanup;
    1706              :     
    1707            0 :     rc = databasevm_step(vm);
    1708            0 :     if (rc == DBRES_ROW) {
    1709            0 :         const void *local_site_id = database_column_blob(vm, 0, NULL);
    1710            0 :         if (!local_site_id) {
    1711            0 :             dbvm_reset(vm);
    1712            0 :             return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
    1713              :         }
    1714            0 :         ret = memcmp(site_id, local_site_id, site_len);
    1715            0 :         *didwin_flag = (ret > 0);
    1716            0 :         dbvm_reset(vm);
    1717            0 :         return DBRES_OK;
    1718              :     }
    1719              :     
    1720              :     // handle error condition here
    1721            0 :     dbvm_reset(vm);
    1722            0 :     return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);
    1723              :     
    1724              : cleanup:
    1725        22948 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1726        22948 :     dbvm_reset(vm);
    1727        22948 :     return rc;
    1728        45051 : }
    1729              : 
    1730           59 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1731              : 
    1732              :     // reset return value
    1733           59 :     *rowid = 0;
    1734              : 
    1735           59 :     if (data->pending_batch == NULL) {
    1736              :         // Immediate mode: execute base table INSERT
    1737            0 :         dbvm_t *vm = table->real_merge_sentinel_stmt;
    1738            0 :         int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1739            0 :         if (rc < 0) {
    1740            0 :             rc = cloudsync_set_dberror(data);
    1741            0 :             dbvm_reset(vm);
    1742            0 :             return rc;
    1743              :         }
    1744              : 
    1745            0 :         SYNCBIT_SET(data);
    1746            0 :         rc = databasevm_step(vm);
    1747            0 :         dbvm_reset(vm);
    1748            0 :         SYNCBIT_RESET(data);
    1749            0 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1750            0 :         if (rc != DBRES_OK) {
    1751            0 :             cloudsync_set_dberror(data);
    1752            0 :             return rc;
    1753              :         }
    1754            0 :     } else {
    1755              :         // Batch mode: skip base table INSERT, the batch flush will create the row
    1756           59 :         merge_pending_batch *batch = data->pending_batch;
    1757           59 :         batch->sentinel_pending = true;
    1758           59 :         if (batch->table == NULL) {
    1759           32 :             batch->table = table;
    1760           32 :             batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1761           32 :             if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
    1762           32 :             memcpy(batch->pk, pk, pklen);
    1763           32 :             batch->pk_len = pklen;
    1764           32 :         }
    1765              :     }
    1766              : 
    1767              :     // Metadata operations always execute regardless of batch mode
    1768           59 :     int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
    1769           59 :     if (rc != DBRES_OK) return rc;
    1770              : 
    1771           59 :     return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
    1772           59 : }
    1773              : 
    1774              : // MARK: - Block-level merge helpers -
    1775              : 
    1776              : // Store a block value in the blocks table
    1777          379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
    1778          379 :     dbvm_t *vm = table->block_value_write_stmt;
    1779          379 :     if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
    1780              : 
    1781          379 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1782          379 :     if (rc != DBRES_OK) goto cleanup;
    1783          379 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1784          379 :     if (rc != DBRES_OK) goto cleanup;
    1785          379 :     if (col_value) {
    1786          379 :         rc = databasevm_bind_value(vm, 3, col_value);
    1787          379 :     } else {
    1788            0 :         rc = databasevm_bind_null(vm, 3);
    1789              :     }
    1790          379 :     if (rc != DBRES_OK) goto cleanup;
    1791              : 
    1792          379 :     rc = databasevm_step(vm);
    1793          379 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1794              : 
    1795              : cleanup:
    1796          379 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1797          379 :     databasevm_reset(vm);
    1798          379 :     return rc;
    1799          379 : }
    1800              : 
    1801              : // Delete a block value from the blocks table
    1802           74 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
    1803           74 :     dbvm_t *vm = table->block_value_delete_stmt;
    1804           74 :     if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
    1805              : 
    1806           74 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1807           74 :     if (rc != DBRES_OK) goto cleanup;
    1808           74 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1809           74 :     if (rc != DBRES_OK) goto cleanup;
    1810              : 
    1811           74 :     rc = databasevm_step(vm);
    1812           74 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1813              : 
    1814              : cleanup:
    1815           74 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1816           74 :     databasevm_reset(vm);
    1817           74 :     return rc;
    1818           74 : }
    1819              : 
    1820              : // Materialize all alive blocks for a base column into the base table
    1821          462 : int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    1822          462 :     if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);
    1823              : 
    1824              :     // Find column index and delimiter
    1825          462 :     int col_idx = -1;
    1826          468 :     for (int i = 0; i < table->ncols; i++) {
    1827          468 :         if (strcasecmp(table->col_name[i], base_col_name) == 0) {
    1828          462 :             col_idx = i;
    1829          462 :             break;
    1830              :         }
    1831            6 :     }
    1832          462 :     if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    1833          462 :     const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
    1834              : 
    1835              :     // Build the LIKE pattern for block col_names: "base_col\x1F%"
    1836          462 :     char *like_pattern = block_build_colname(base_col_name, "%");
    1837          462 :     if (!like_pattern) return DBRES_NOMEM;
    1838              : 
    1839              :     // Query alive blocks from blocks table joined with metadata
    1840              :     // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    1841              :     //   ON b.pk = m.pk AND b.col_name = m.col_name
    1842              :     //   WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    1843              :     //   ORDER BY b.col_name
    1844          462 :     dbvm_t *vm = table->block_list_stmt;
    1845          462 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1846          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1847          462 :     rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    1848          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1849              :     // Bind pk again for the join condition (parameter 3)
    1850          462 :     rc = databasevm_bind_blob(vm, 3, pk, pklen);
    1851          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1852          462 :     rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    1853          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1854              : 
    1855              :     // Collect block values
    1856          462 :     const char **block_values = NULL;
    1857          462 :     int block_count = 0;
    1858          462 :     int block_cap = 0;
    1859              : 
    1860        22695 :     while ((rc = databasevm_step(vm)) == DBRES_ROW) {
    1861        22233 :         const char *value = database_column_text(vm, 0);
    1862        22233 :         if (block_count >= block_cap) {
    1863         1076 :             int new_cap = block_cap ? block_cap * 2 : 16;
    1864         1076 :             const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
    1865         1076 :             if (!new_arr) { rc = DBRES_NOMEM; break; }
    1866         1076 :             block_values = new_arr;
    1867         1076 :             block_cap = new_cap;
    1868         1076 :         }
    1869        22233 :         block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
    1870        22233 :         block_count++;
    1871              :     }
    1872          462 :     databasevm_reset(vm);
    1873          462 :     cloudsync_memory_free(like_pattern);
    1874              : 
    1875          462 :     if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
    1876              :         // Free collected values
    1877            0 :         for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1878            0 :         if (block_values) cloudsync_memory_free((void *)block_values);
    1879            0 :         return cloudsync_set_dberror(data);
    1880              :     }
    1881              : 
    1882              :     // Materialize text (NULL when no alive blocks)
    1883          462 :     char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    1884        22695 :     for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1885          462 :     if (block_values) cloudsync_memory_free((void *)block_values);
    1886          462 :     if (block_count > 0 && !text) return DBRES_NOMEM;
    1887              : 
    1888              :     // Update the base table column via the col_merge_stmt (with triggers disabled)
    1889          462 :     dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    1890          462 :     if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }
    1891              : 
    1892              :     // Bind PKs
    1893          462 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    1894          462 :     if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }
    1895              : 
    1896              :     // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    1897          462 :     int npks = table->npks;
    1898          462 :     if (text) {
    1899          458 :         rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
    1900          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1901          458 :         rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
    1902          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1903          458 :     } else {
    1904            4 :         rc = databasevm_bind_null(merge_vm, npks + 1);
    1905            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1906            4 :         rc = databasevm_bind_null(merge_vm, npks + 2);
    1907            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1908              :     }
    1909              : 
    1910              :     // Execute with triggers disabled
    1911          462 :     table->enabled = 0;
    1912          462 :     SYNCBIT_SET(data);
    1913          462 :     rc = databasevm_step(merge_vm);
    1914          462 :     databasevm_reset(merge_vm);
    1915          462 :     SYNCBIT_RESET(data);
    1916          462 :     table->enabled = 1;
    1917              : 
    1918          462 :     cloudsync_memory_free(text);
    1919              : 
    1920          462 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1921          462 :     if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    1922          462 :     return DBRES_OK;
    1923          462 : }
    1924              : 
    1925              : // Accessor for has_block_cols flag
    1926         1024 : bool table_has_block_cols (cloudsync_table_context *table) {
    1927         1024 :     return table && table->has_block_cols;
    1928              : }
    1929              : 
    1930              : // Get block column algo for a given column index
    1931        11588 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
    1932        11588 :     if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
    1933        11588 :     return table->col_algo[index];
    1934        11588 : }
    1935              : 
    1936              : // Get block delimiter for a given column index
    1937          137 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
    1938          137 :     if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
    1939          137 :     return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
    1940          137 : }
    1941              : 
    1942              : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
    1943         1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
    1944          506 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
    1945           93 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
    1946           93 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
    1947              : 
    1948            3 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
    1949            3 :     if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
    1950            3 :     if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
    1951            3 :     table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
    1952            3 : }
    1953              : 
    1954              : // Find column index by name
    1955          127 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
    1956          127 :     if (!table || !col_name) return -1;
    1957          131 :     for (int i = 0; i < table->ncols; i++) {
    1958          131 :         if (strcasecmp(table->col_name[i], col_name) == 0) return i;
    1959            4 :     }
    1960            0 :     return -1;
    1961          127 : }
    1962              : 
    1963        46078 : int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    1964              :     // Handle DWS and AWS algorithms here
    1965              :     // Delete-Wins Set (DWS): table_algo_crdt_dws
    1966              :     // Add-Wins Set (AWS): table_algo_crdt_aws
    1967              :     
    1968              :     // Causal-Length Set (CLS) Algorithm (default)
    1969              :     
    1970              :     // compute the local causal length for the row based on the primary key
    1971              :     // the causal length is used to determine the order of operations and resolve conflicts.
    1972        46078 :     int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    1973        46078 :     if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);
    1974              :     
    1975              :     // if the incoming causal length is older than the local causal length, we can safely ignore it
    1976              :     // because the local changes are more recent
    1977        46078 :     if (insert_cl < local_cl) return DBRES_OK;
    1978              :     
    1979              :     // check if the operation is a delete by examining the causal length
    1980              :     // even causal lengths typically signify delete operations
    1981        45853 :     bool is_delete = (insert_cl % 2 == 0);
    1982        45853 :     if (is_delete) {
    1983              :         // if it's a delete, check if the local state is at the same causal length
    1984              :         // if it is, no further action is needed
    1985          613 :         if (local_cl == insert_cl) return DBRES_OK;
    1986              :         
    1987              :         // perform a delete merge if the causal length is newer than the local one
    1988          338 :         int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
    1989          169 :                               insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    1990          169 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
    1991          169 :         return rc;
    1992              :     }
    1993              :     
    1994              :     // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    1995        45240 :     bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    1996        45240 :     if (is_sentinel_only) {
    1997          189 :         if (local_cl == insert_cl) return DBRES_OK;
    1998              :         
    1999              :         // perform a sentinel-only insert to track the existence of the row
    2000          118 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
    2001           59 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2002           59 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    2003           59 :         return rc;
    2004              :     }
    2005              :     
    2006              :     // from this point I can be sure that insert_name is not sentinel
    2007              :     
    2008              :     // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    2009              :     // odd causal lengths can "resurrect" rows
    2010        45051 :     bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    2011        45051 :     bool row_exists_locally = local_cl != 0;
    2012              :    
    2013              :     // if a resurrection is needed, insert a sentinel to mark the row as alive
    2014              :     // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    2015        45051 :     if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
    2016            0 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
    2017            0 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2018            0 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    2019            0 :     }
    2020              :     
    2021              :     // at this point, we determine whether the incoming change wins based on causal length
    2022              :     // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    2023        45051 :     bool flag = false;
    2024        45051 :     int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    2025        45051 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);
    2026              :     
    2027              :     // check if the incoming change wins and should be applied
    2028        45051 :     bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    2029        45051 :     if (!does_cid_win) return DBRES_OK;
    2030              : 
    2031              :     // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    2032              :     // bypass the normal base-table write and instead store the value in the blocks table.
    2033              :     // The base table column will be materialized from all alive blocks.
    2034        21766 :     if (block_is_block_colname(insert_name) && table->has_block_cols) {
    2035              :         // Store or delete block value in blocks table depending on tombstone status
    2036          412 :         if (insert_col_version % 2 == 0) {
    2037              :             // Tombstone: remove from blocks table
    2038           33 :             rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
    2039           33 :         } else {
    2040          379 :             rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
    2041              :         }
    2042          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);
    2043              : 
    2044              :         // Set winner clock in metadata
    2045          824 :         rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
    2046          412 :                                     insert_col_version, insert_db_version,
    2047          412 :                                     insert_site_id, insert_site_id_len, insert_seq, rowid);
    2048          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);
    2049              : 
    2050              :         // Materialize the full column from blocks into the base table
    2051          412 :         char *base_col = block_extract_base_colname(insert_name);
    2052          412 :         if (base_col) {
    2053          412 :             rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
    2054          412 :             cloudsync_memory_free(base_col);
    2055          412 :             if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
    2056          412 :         }
    2057              : 
    2058          412 :         return DBRES_OK;
    2059              :     }
    2060              : 
    2061              :     // perform the final column insert or update if the incoming change wins
    2062        21354 :     if (data->pending_batch) {
    2063              :         // Propagate row_exists_locally to the batch on the first winning column.
    2064              :         // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
    2065              :         // which matters when RLS policies reference columns not in the payload.
    2066        21337 :         if (data->pending_batch->table == NULL) {
    2067         7699 :             data->pending_batch->row_exists = row_exists_locally;
    2068         7699 :         }
    2069        21337 :         rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
    2070        21337 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    2071        21337 :     } else {
    2072           17 :         rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2073           17 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    2074              :     }
    2075              : 
    2076        21354 :     return rc;
    2077        46078 : }
    2078              : 
    2079              : // MARK: - Block column setup -
    2080              : 
    2081              : // Migrate existing tracked rows to block format when block-level LWW is first enabled on a column.
    2082              : // Scans the metadata table for alive rows with the plain col_name entry (not yet block entries),
    2083              : // reads each row's current value from the base table, splits it into blocks, and inserts
    2084              : // the block entries into both the blocks table and the metadata table.
    2085              : // Uses INSERT OR IGNORE semantics so the operation is safe to call multiple times.
    2086           73 : static int block_migrate_existing_rows (cloudsync_context *data, cloudsync_table_context *table, int col_idx) {
    2087           73 :     const char *col_name = table->col_name[col_idx];
    2088           73 :     if (!col_name || !table->meta_ref || !table->blocks_ref) return DBRES_OK;
    2089              : 
    2090           73 :     const char *delim = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
    2091           73 :     int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
    2092              : 
    2093              :     // Phase 1: collect all existing PKs that have an alive regular col_name entry
    2094              :     // AND do not yet have any entries in the blocks table for this column.
    2095              :     // The NOT IN filter makes this idempotent: rows that were already migrated
    2096              :     // (or had their blocks created via INSERT) are skipped on subsequent calls.
    2097              :     // We collect PKs before writing so that writes to the metadata table (Phase 2)
    2098              :     // do not perturb the read cursor on the same table.
    2099           73 :     char *like_pattern = block_build_colname(col_name, "%");
    2100           73 :     if (!like_pattern) return DBRES_NOMEM;
    2101              : 
    2102           73 :     char *scan_sql = cloudsync_memory_mprintf(SQL_META_SCAN_COL_FOR_MIGRATION, table->meta_ref, table->blocks_ref);
    2103           73 :     if (!scan_sql) { cloudsync_memory_free(like_pattern); return DBRES_NOMEM; }
    2104           73 :     dbvm_t *scan_vm = NULL;
    2105           73 :     int rc = databasevm_prepare(data, scan_sql, &scan_vm, 0);
    2106           73 :     cloudsync_memory_free(scan_sql);
    2107           73 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); return rc; }
    2108              : 
    2109           73 :     rc = databasevm_bind_text(scan_vm, 1, col_name, -1);
    2110           73 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
    2111              :     // Bind like_pattern as ?2 and keep it alive until after all scan steps complete,
    2112              :     // because databasevm_bind_text uses SQLITE_STATIC (no copy).
    2113           73 :     rc = databasevm_bind_text(scan_vm, 2, like_pattern, -1);
    2114           73 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
    2115              : 
    2116              :     // Collect pk blobs into a dynamically-grown array of owned copies
    2117           73 :     void  **pks     = NULL;
    2118           73 :     size_t *pklens  = NULL;
    2119           73 :     int     pk_count = 0;
    2120           73 :     int     pk_cap   = 0;
    2121              : 
    2122           75 :     while ((rc = databasevm_step(scan_vm)) == DBRES_ROW) {
    2123            2 :         size_t pklen = 0;
    2124            2 :         const void *pk = database_column_blob(scan_vm, 0, &pklen);
    2125            2 :         if (!pk || pklen == 0) continue;
    2126              : 
    2127            2 :         if (pk_count >= pk_cap) {
    2128            1 :             int new_cap = pk_cap ? pk_cap * 2 : 8;
    2129            1 :             void  **new_pks    = (void  **)cloudsync_memory_realloc(pks,    (uint64_t)(new_cap * sizeof(void *)));
    2130            1 :             size_t *new_pklens = (size_t *)cloudsync_memory_realloc(pklens, (uint64_t)(new_cap * sizeof(size_t)));
    2131            1 :             if (!new_pks || !new_pklens) {
    2132            0 :                 cloudsync_memory_free(new_pks ? new_pks : pks);
    2133            0 :                 cloudsync_memory_free(new_pklens ? new_pklens : pklens);
    2134            0 :                 databasevm_finalize(scan_vm);
    2135            0 :                 return DBRES_NOMEM;
    2136              :             }
    2137            1 :             pks    = new_pks;
    2138            1 :             pklens = new_pklens;
    2139            1 :             pk_cap = new_cap;
    2140            1 :         }
    2141              : 
    2142            2 :         pks[pk_count] = cloudsync_memory_alloc((uint64_t)pklen);
    2143            2 :         if (!pks[pk_count]) { rc = DBRES_NOMEM; break; }
    2144            2 :         memcpy(pks[pk_count], pk, pklen);
    2145            2 :         pklens[pk_count] = pklen;
    2146            2 :         pk_count++;
    2147              :     }
    2148              : 
    2149           73 :     databasevm_finalize(scan_vm);
    2150           73 :     cloudsync_memory_free(like_pattern); // safe to free after scan_vm is finalized
    2151           73 :     if (rc != DBRES_DONE && rc != DBRES_OK) {
    2152            0 :         for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
    2153            0 :         cloudsync_memory_free(pks);
    2154            0 :         cloudsync_memory_free(pklens);
    2155            0 :         return rc;
    2156              :     }
    2157              : 
    2158           73 :     if (pk_count == 0) {
    2159           72 :         cloudsync_memory_free(pks);
    2160           72 :         cloudsync_memory_free(pklens);
    2161           72 :         return DBRES_OK;
    2162              :     }
    2163              : 
    2164              :     // Phase 2: for each collected PK, read the column value, split into blocks,
    2165              :     // and insert into the blocks table + metadata using INSERT OR IGNORE.
    2166              : 
    2167            1 :     char *meta_sql = cloudsync_memory_mprintf(SQL_META_INSERT_BLOCK_IGNORE, table->meta_ref);
    2168            1 :     if (!meta_sql) { rc = DBRES_NOMEM; goto cleanup_pks; }
    2169            1 :     dbvm_t *meta_vm = NULL;
    2170            1 :     rc = databasevm_prepare(data, meta_sql, &meta_vm, 0);
    2171            1 :     cloudsync_memory_free(meta_sql);
    2172            1 :     if (rc != DBRES_OK) goto cleanup_pks;
    2173              : 
    2174            1 :     char *blocks_sql = cloudsync_memory_mprintf(SQL_BLOCKS_INSERT_IGNORE, table->blocks_ref);
    2175            1 :     if (!blocks_sql) { databasevm_finalize(meta_vm); rc = DBRES_NOMEM; goto cleanup_pks; }
    2176            1 :     dbvm_t *blocks_vm = NULL;
    2177            1 :     rc = databasevm_prepare(data, blocks_sql, &blocks_vm, 0);
    2178            1 :     cloudsync_memory_free(blocks_sql);
    2179            1 :     if (rc != DBRES_OK) { databasevm_finalize(meta_vm); goto cleanup_pks; }
    2180              : 
    2181            1 :     dbvm_t *val_vm = (dbvm_t *)table_column_lookup(table, col_name, false, NULL);
    2182              : 
    2183            3 :     for (int p = 0; p < pk_count; p++) {
    2184            2 :         const void *pk = pks[p];
    2185            2 :         size_t pklen   = pklens[p];
    2186              : 
    2187            2 :         if (!val_vm) continue;
    2188              : 
    2189              :         // Read current column value from the base table
    2190            2 :         int bind_rc = pk_decode_prikey((char *)pk, pklen, pk_decode_bind_callback, (void *)val_vm);
    2191            2 :         if (bind_rc < 0) { databasevm_reset(val_vm); continue; }
    2192              : 
    2193            2 :         int step_rc = databasevm_step(val_vm);
    2194            2 :         const char *text = (step_rc == DBRES_ROW) ? database_column_text(val_vm, 0) : NULL;
    2195              :         // Make a copy of text before resetting val_vm, as the pointer is only valid until reset
    2196            2 :         char *text_copy = text ? cloudsync_string_dup(text) : NULL;
    2197            2 :         databasevm_reset(val_vm);
    2198              : 
    2199            2 :         if (!text_copy) continue; // NULL column value: nothing to migrate
    2200              : 
    2201              :         // Split text into blocks and store each one
    2202            2 :         block_list_t *blocks = block_split(text_copy, delim);
    2203            2 :         cloudsync_memory_free(text_copy);
    2204            2 :         if (!blocks) continue;
    2205              : 
    2206            2 :         char **positions = block_initial_positions(blocks->count);
    2207            2 :         if (positions) {
    2208            7 :             for (int b = 0; b < blocks->count; b++) {
    2209            5 :                 char *block_cn = block_build_colname(col_name, positions[b]);
    2210            5 :                 if (block_cn) {
    2211              :                     // Metadata entry (skip if this block position already exists)
    2212            5 :                     databasevm_bind_blob(meta_vm, 1, pk, (int)pklen);
    2213            5 :                     databasevm_bind_text(meta_vm, 2, block_cn, -1);
    2214            5 :                     databasevm_bind_int(meta_vm, 3, 1);            // col_version = 1 (alive)
    2215            5 :                     databasevm_bind_int(meta_vm, 4, db_version);
    2216            5 :                     databasevm_bind_int(meta_vm, 5, cloudsync_bumpseq(data));
    2217            5 :                     databasevm_step(meta_vm);
    2218            5 :                     databasevm_reset(meta_vm);
    2219              : 
    2220              :                     // Block value (skip if this block position already exists)
    2221            5 :                     databasevm_bind_blob(blocks_vm, 1, pk, (int)pklen);
    2222            5 :                     databasevm_bind_text(blocks_vm, 2, block_cn, -1);
    2223            5 :                     databasevm_bind_text(blocks_vm, 3, blocks->entries[b].content, -1);
    2224            5 :                     databasevm_step(blocks_vm);
    2225            5 :                     databasevm_reset(blocks_vm);
    2226              : 
    2227            5 :                     cloudsync_memory_free(block_cn);
    2228            5 :                 }
    2229            5 :                 cloudsync_memory_free(positions[b]);
    2230            5 :             }
    2231            2 :             cloudsync_memory_free(positions);
    2232            2 :         }
    2233            2 :         block_list_free(blocks);
    2234            2 :     }
    2235              : 
    2236            1 :     databasevm_finalize(meta_vm);
    2237            1 :     databasevm_finalize(blocks_vm);
    2238            1 :     rc = DBRES_OK;
    2239              : 
    2240              : cleanup_pks:
    2241            3 :     for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
    2242            1 :     cloudsync_memory_free(pks);
    2243            1 :     cloudsync_memory_free(pklens);
    2244            1 :     return rc;
    2245           73 : }
    2246              : 
    2247           74 : int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter, bool persist) {
    2248           74 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2249           74 :     if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);
    2250              : 
    2251              :     // Find column index
    2252           74 :     int col_idx = table_col_index(table, col_name);
    2253           74 :     if (col_idx < 0) {
    2254              :         char buf[1024];
    2255            0 :         snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
    2256            0 :         return cloudsync_set_error(data, buf, DBRES_ERROR);
    2257              :     }
    2258              : 
    2259              :     // Set column algo
    2260           74 :     table->col_algo[col_idx] = col_algo_block;
    2261           74 :     table->has_block_cols = true;
    2262              : 
    2263              :     // Set delimiter (can be NULL for default)
    2264           74 :     if (table->col_delimiter[col_idx]) {
    2265            0 :         cloudsync_memory_free(table->col_delimiter[col_idx]);
    2266            0 :         table->col_delimiter[col_idx] = NULL;
    2267            0 :     }
    2268           74 :     if (delimiter) {
    2269            1 :         table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
    2270            1 :     }
    2271              : 
    2272              :     // Create blocks table if not already done
    2273           74 :     if (!table->blocks_ref) {
    2274           71 :         table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
    2275           71 :         if (!table->blocks_ref) return DBRES_NOMEM;
    2276              : 
    2277              :         // CREATE TABLE IF NOT EXISTS
    2278           71 :         char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
    2279           71 :         if (!sql) return DBRES_NOMEM;
    2280              : 
    2281           71 :         int rc = database_exec(data, sql);
    2282           71 :         cloudsync_memory_free(sql);
    2283           71 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);
    2284              : 
    2285              :         // Prepare block statements
    2286              :         // Write: upsert into blocks (pk, col_name, col_value)
    2287           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
    2288           71 :         if (!sql) return DBRES_NOMEM;
    2289           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
    2290           71 :         cloudsync_memory_free(sql);
    2291           71 :         if (rc != DBRES_OK) return rc;
    2292              : 
    2293              :         // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
    2294           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
    2295           71 :         if (!sql) return DBRES_NOMEM;
    2296           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
    2297           71 :         cloudsync_memory_free(sql);
    2298           71 :         if (rc != DBRES_OK) return rc;
    2299              : 
    2300              :         // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
    2301           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
    2302           71 :         if (!sql) return DBRES_NOMEM;
    2303           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
    2304           71 :         cloudsync_memory_free(sql);
    2305           71 :         if (rc != DBRES_OK) return rc;
    2306              : 
    2307              :         // List alive blocks for materialization
    2308           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
    2309           71 :         if (!sql) return DBRES_NOMEM;
    2310           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
    2311           71 :         cloudsync_memory_free(sql);
    2312           71 :         if (rc != DBRES_OK) return rc;
    2313           71 :     }
    2314              : 
    2315              :     // Persist settings (skipped when called from the settings loader, since
    2316              :     // writing to cloudsync_table_settings while sqlite3_exec is iterating it
    2317              :     // re-feeds the rewritten row to the cursor and causes an infinite loop).
    2318           74 :     if (persist) {
    2319           73 :         int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
    2320           73 :         if (rc != DBRES_OK) return rc;
    2321              : 
    2322           73 :         if (delimiter) {
    2323            0 :             rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
    2324            0 :             if (rc != DBRES_OK) return rc;
    2325            0 :         }
    2326              : 
    2327              :         // Migrate any existing tracked rows: populate the blocks table and metadata with
    2328              :         // block entries derived from the current column value, so that subsequent UPDATE
    2329              :         // operations can diff against the real existing state instead of treating everything
    2330              :         // as new, and so this node participates correctly in LWW conflict resolution.
    2331           73 :         rc = block_migrate_existing_rows(data, table, col_idx);
    2332           73 :         if (rc != DBRES_OK) return rc;
    2333           73 :     }
    2334              : 
    2335           74 :     return DBRES_OK;
    2336           74 : }
    2337              : 
    2338              : // MARK: - Private -
    2339              : 
    2340          232 : bool cloudsync_config_exists (cloudsync_context *data) {
    2341          232 :     return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
    2342              : }
    2343              : 
    2344          248 : cloudsync_context *cloudsync_context_create (void *db) {
    2345          248 :     cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
    2346          248 :     if (!data) return NULL;
    2347              :     DEBUG_SETTINGS("cloudsync_context_create %p", data);
    2348              :     
    2349          248 :     data->libversion = CLOUDSYNC_VERSION;
    2350          248 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2351              :     #if CLOUDSYNC_DEBUG
    2352              :     data->debug = 1;
    2353              :     #endif
    2354              :     
    2355              :     // allocate space for 64 tables (it can grow if needed)
    2356          248 :     uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
    2357          248 :     data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
    2358          248 :     if (!data->tables) {cloudsync_memory_free(data); return NULL;}
    2359              :     
    2360          248 :     data->tables_cap = CLOUDSYNC_INIT_NTABLES;
    2361          248 :     data->tables_count = 0;
    2362          248 :     data->db = db;
    2363              :     
    2364              :     // SQLite exposes col_value as ANY, but other databases require a concrete type.
    2365              :     // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
    2366              :     // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
    2367              :     // It is decoded to the target column type just before applying changes to the base table.
    2368          248 :     data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
    2369              : 
    2370          248 :     return data;
    2371          248 : }
    2372              : 
    2373          248 : void cloudsync_context_free (void *ctx) {
    2374          248 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2375              :     DEBUG_SETTINGS("cloudsync_context_free %p", data);
    2376          248 :     if (!data) return;
    2377              : 
    2378              :     // free all table contexts and prepared statements
    2379          248 :     cloudsync_terminate(data);
    2380              : 
    2381          248 :     cloudsync_memory_free(data->tables);
    2382          248 :     cloudsync_memory_free(data);
    2383          248 : }
    2384              : 
    2385          343 : const char *cloudsync_context_init (cloudsync_context *data) {
    2386          343 :     if (!data) return NULL;
    2387              :     
    2388              :     // perform init just the first time, if the site_id field is not set.
    2389              :     // The data->site_id value could exists while settings tables don't exists if the
    2390              :     // cloudsync_context_init was previously called in init transaction that was rolled back
    2391              :     // because of an error during the init process.
    2392          343 :     if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
    2393          228 :         if (dbutils_settings_init(data) != DBRES_OK) return NULL;
    2394          228 :         if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
    2395          228 :         if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
    2396          228 :         data->schema_hash = database_schema_hash(data);
    2397          228 :     }
    2398              :     
    2399          343 :     return (const char *)data->site_id;
    2400          343 : }
    2401              : 
    2402         1368 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
    2403              :     DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
    2404              :     
    2405              :     // sync data
    2406         1368 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
    2407          228 :         data->schema_version = (int)strtol(value, NULL, 0);
    2408          228 :         return;
    2409              :     }
    2410              :     
    2411         1140 :     if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
    2412            0 :         data->debug = 0;
    2413            0 :         if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
    2414            0 :         return;
    2415              :     }
    2416              : 
    2417         1140 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
    2418            0 :         cloudsync_set_schema(data, value);
    2419            0 :         return;
    2420              :     }
    2421         1368 : }
    2422              : 
    2423              : #if 0
    2424              : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
    2425              :     DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
    2426              :     // Unused in this version
    2427              :     return;
    2428              : }
    2429              : #endif
    2430              : 
    2431         8310 : int cloudsync_commit_hook (void *ctx) {
    2432         8310 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2433              :     
    2434         8310 :     data->db_version = data->pending_db_version;
    2435         8310 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2436         8310 :     data->seq = 0;
    2437              :     
    2438         8310 :     return DBRES_OK;
    2439              : }
    2440              : 
    2441            3 : void cloudsync_rollback_hook (void *ctx) {
    2442            3 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2443              :     
    2444            3 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2445            3 :     data->seq = 0;
    2446            3 : }
    2447              : 
    2448           24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
    2449              :     // init cloudsync_settings
    2450           24 :     if (cloudsync_context_init(data) == NULL) {
    2451            0 :         return DBRES_MISUSE;
    2452              :     }
    2453              : 
    2454              :     // lookup table
    2455           24 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2456           24 :     if (!table) {
    2457              :         char buffer[1024];
    2458            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2459            1 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2460              :     }
    2461              : 
    2462              :     // idempotent: if already altering, return OK
    2463           23 :     if (table->is_altering) return DBRES_OK;
    2464              : 
    2465              :     // retrieve primary key(s)
    2466           23 :     char **names = NULL;
    2467           23 :     int nrows = 0;
    2468           23 :     int rc = database_pk_names(data, table_name, &names, &nrows);
    2469           23 :     if (rc != DBRES_OK) {
    2470              :         char buffer[1024];
    2471            0 :         snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
    2472            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2473            0 :         goto rollback_begin_alter;
    2474              :     }
    2475              : 
    2476              :     // sanity check the number of primary keys
    2477           23 :     if (nrows != table_count_pks(table)) {
    2478              :         char buffer[1024];
    2479            0 :         snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
    2480            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2481            0 :         goto rollback_begin_alter;
    2482              :     }
    2483              : 
    2484              :     // drop original triggers
    2485           23 :     rc = database_delete_triggers(data, table_name);
    2486           23 :     if (rc != DBRES_OK) {
    2487              :         char buffer[1024];
    2488            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
    2489            0 :         cloudsync_set_error(data, buffer, DBRES_ERROR);
    2490            0 :         goto rollback_begin_alter;
    2491              :     }
    2492              : 
    2493           23 :     table_set_pknames(table, names);
    2494           23 :     table->is_altering = true;
    2495           23 :     return DBRES_OK;
    2496              : 
    2497              : rollback_begin_alter:
    2498            0 :     if (names) table_pknames_free(names, nrows);
    2499            0 :     return rc;
    2500           24 : }
    2501              : 
    2502           23 : int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    2503              :     // check if dbversion needed to be updated
    2504           23 :     cloudsync_dbversion_check_uptodate(data);
    2505              : 
    2506              :     // if primary-key columns change, all row identities change.
    2507              :     // In that case, the clock table must be dropped, recreated,
    2508              :     // and backfilled. We detect this by comparing the unique index
    2509              :     // in the lookaside table with the source table's PKs.
    2510              :     
    2511              :     // retrieve primary keys (to check is they changed)
    2512           23 :     char **result = NULL;
    2513           23 :     int nrows = 0;
    2514           23 :     int rc = database_pk_names (data, table->name, &result, &nrows);
    2515           23 :     if (rc != DBRES_OK || nrows == 0) {
    2516            0 :         if (nrows == 0) rc = DBRES_MISUSE;
    2517            0 :         goto finalize;
    2518              :     }
    2519              :     
    2520              :     // check if there are differences
    2521           23 :     bool pk_diff = (nrows != table->npks);
    2522           23 :     if (!pk_diff) {
    2523           45 :         for (int i = 0; i < nrows; ++i) {
    2524           34 :             if (strcmp(table->pk_name[i], result[i]) != 0) {
    2525            6 :                 pk_diff = true;
    2526            6 :                 break;
    2527              :             }
    2528           28 :         }
    2529           17 :     }
    2530              :     
    2531           23 :     if (pk_diff) {
    2532              :         // drop meta-table, it will be recreated
    2533           12 :         char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
    2534           12 :         rc = database_exec(data, sql);
    2535           12 :         cloudsync_memory_free(sql);
    2536           12 :         if (rc != DBRES_OK) {
    2537            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2538            0 :             goto finalize;
    2539              :         }
    2540           12 :     } else {
    2541              :         // compact meta-table
    2542              :         // delete entries for removed columns
    2543           11 :         const char *schema = table->schema ? table->schema : "";
    2544           11 :         char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    2545           11 :         rc = database_exec(data, sql);
    2546           11 :         cloudsync_memory_free(sql);
    2547           11 :         if (rc != DBRES_OK) {
    2548            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2549            0 :             goto finalize;
    2550              :         }
    2551              :         
    2552           11 :         sql = sql_build_pk_qualified_collist_query(schema, table->name);
    2553           11 :         if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2554              :         
    2555           11 :         char *pkclause = NULL;
    2556           11 :         rc = database_select_text(data, sql, &pkclause);
    2557           11 :         cloudsync_memory_free(sql);
    2558           11 :         if (rc != DBRES_OK) goto finalize;
    2559           11 :         char *pkvalues = (pkclause) ? pkclause : "rowid";
    2560              :         
    2561              :         // delete entries related to rows that no longer exist in the original table, but preserve tombstone
    2562           11 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
    2563           11 :         rc = database_exec(data, sql);
    2564           11 :         if (pkclause) cloudsync_memory_free(pkclause);
    2565           11 :         cloudsync_memory_free(sql);
    2566           11 :         if (rc != DBRES_OK) {
    2567            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2568            0 :             goto finalize;
    2569              :         }
    2570              : 
    2571              :     }
    2572              :     
    2573              :     // update key to be later used in cloudsync_dbversion_rebuild
    2574              :     char buf[256];
    2575           23 :     snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    2576           23 :     dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);
    2577              :     
    2578              : finalize:
    2579           23 :     table_pknames_free(result, nrows);
    2580           23 :     return rc;
    2581              : }
    2582              : 
    2583           24 : int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    2584           24 :     int rc = DBRES_MISUSE;
    2585           24 :     cloudsync_table_context *table = NULL;
    2586              : 
    2587              :     // init cloudsync_settings
    2588           24 :     if (cloudsync_context_init(data) == NULL) {
    2589            0 :         cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    2590            0 :         goto rollback_finalize_alter;
    2591              :     }
    2592              : 
    2593              :     // lookup table
    2594           24 :     table = table_lookup(data, table_name);
    2595           24 :     if (!table) {
    2596              :         char buffer[1024];
    2597            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2598            1 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2599            1 :         goto rollback_finalize_alter;
    2600              :     }
    2601              : 
    2602              :     // idempotent: if not altering, return OK
    2603           23 :     if (!table->is_altering) return DBRES_OK;
    2604              : 
    2605           23 :     rc = cloudsync_finalize_alter(data, table);
    2606           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2607              : 
    2608              :     // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    2609              :     // is_altering is reset implicitly because table_free + cloudsync_init_table
    2610              :     // will reallocate the table context with zero-initialized memory
    2611           23 :     table_remove(data, table);
    2612           23 :     table_free(table);
    2613           23 :     table = NULL;
    2614              : 
    2615              :     // init again cloudsync for the table
    2616           23 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    2617           23 :     if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    2618           23 :     rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK);
    2619           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2620              : 
    2621           23 :     return DBRES_OK;
    2622              : 
    2623              : rollback_finalize_alter:
    2624            1 :     if (table) {
    2625            0 :         table_set_pknames(table, NULL);
    2626            0 :         table->is_altering = false;
    2627            0 :     }
    2628            1 :     return rc;
    2629           24 : }
    2630              : 
    2631              : // MARK: - Filter Rewrite -
    2632              : 
    2633              : // Replace bare column names in a filter expression with prefix-qualified names.
    2634              : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
    2635              : // Columns must be sorted by length descending by the caller to avoid partial matches.
    2636              : // Skips content inside single-quoted string literals.
    2637              : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
    2638              : // Helper: check if an identifier token matches a column name.
    2639          112 : static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    2640          450 :     for (int i = 0; i < ncols; ++i) {
    2641          388 :         if (strlen(columns[i]) == token_len && strncmp(token, columns[i], token_len) == 0)
    2642           50 :             return true;
    2643          338 :     }
    2644           62 :     return false;
    2645          112 : }
    2646              : 
    2647              : // Helper: check if character is part of a SQL identifier.
    2648          796 : static bool filter_is_ident_char (char c) {
    2649         1262 :     return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
    2650          466 :            (c >= '0' && c <= '9') || c == '_';
    2651              : }
    2652              : 
    2653           40 : char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    2654           40 :     if (!filter || !prefix || !columns || ncols <= 0) return NULL;
    2655              : 
    2656           40 :     size_t filter_len = strlen(filter);
    2657           40 :     size_t prefix_len = strlen(prefix);
    2658              : 
    2659              :     // Each identifier match grows by at most (prefix_len + 3) bytes.
    2660              :     // Worst case: the entire filter is one repeated column reference separated by
    2661              :     // single characters, so up to (filter_len / 2) matches.  Use a safe upper bound.
    2662           40 :     size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    2663           40 :     size_t cap = filter_len + max_growth + 64;
    2664           40 :     char *result = (char *)cloudsync_memory_alloc(cap);
    2665           40 :     if (!result) return NULL;
    2666           40 :     size_t out = 0;
    2667              : 
    2668              :     // Single pass: tokenize into identifiers, quoted strings, and everything else.
    2669           40 :     size_t i = 0;
    2670          336 :     while (i < filter_len) {
    2671              :         // Skip single-quoted string literals verbatim (handle '' escape)
    2672          296 :         if (filter[i] == '\'') {
    2673            6 :             result[out++] = filter[i++];
    2674           38 :             while (i < filter_len) {
    2675           38 :                 if (filter[i] == '\'') {
    2676            6 :                     result[out++] = filter[i++];
    2677              :                     // '' is an escaped quote — keep going
    2678            6 :                     if (i < filter_len && filter[i] == '\'') {
    2679            0 :                         result[out++] = filter[i++];
    2680            0 :                         continue;
    2681              :                     }
    2682            6 :                     break; // single ' ends the literal
    2683              :                 }
    2684           32 :                 result[out++] = filter[i++];
    2685              :             }
    2686            6 :             continue;
    2687              :         }
    2688              : 
    2689              :         // Extract identifier token
    2690          290 :         if (filter_is_ident_char(filter[i])) {
    2691          112 :             size_t start = i;
    2692          540 :             while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
    2693          112 :             size_t token_len = i - start;
    2694              : 
    2695          112 :             if (filter_is_column(&filter[start], token_len, columns, ncols)) {
    2696              :                 // Emit PREFIX."column_name"
    2697           50 :                 memcpy(&result[out], prefix, prefix_len); out += prefix_len;
    2698           50 :                 result[out++] = '.';
    2699           50 :                 result[out++] = '"';
    2700           50 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2701           50 :                 result[out++] = '"';
    2702           50 :             } else {
    2703              :                 // Not a column — copy as-is
    2704           62 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2705              :             }
    2706          112 :             continue;
    2707              :         }
    2708              : 
    2709              :         // Any other character — copy as-is
    2710          178 :         result[out++] = filter[i++];
    2711              :     }
    2712              : 
    2713           40 :     result[out] = '\0';
    2714           40 :     return result;
    2715           40 : }
    2716              : 
    2717           24 : int cloudsync_reset_metatable (cloudsync_context *data, const char *table_name) {
    2718           24 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2719           24 :     if (!table) return DBRES_ERROR;
    2720              : 
    2721           24 :     char *sql = cloudsync_memory_mprintf(SQL_DELETE_ALL_FROM_CLOUDSYNC_TABLE, table->meta_ref);
    2722           24 :     int rc = database_exec(data, sql);
    2723           24 :     cloudsync_memory_free(sql);
    2724           24 :     if (rc != DBRES_OK) return rc;
    2725              : 
    2726           24 :     return cloudsync_refill_metatable(data, table_name);
    2727           24 : }
    2728              : 
    2729          307 : int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
    2730          307 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2731          307 :     if (!table) return DBRES_ERROR;
    2732              : 
    2733          307 :     dbvm_t *vm = NULL;
    2734          307 :     int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
    2735              : 
    2736              :     // Read row-level filter from settings (if any)
    2737              :     char filter_buf[2048];
    2738          307 :     int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
    2739          307 :     const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;
    2740              : 
    2741          307 :     const char *schema = table->schema ? table->schema : "";
    2742          307 :     char *sql = sql_build_pk_collist_query(schema, table_name);
    2743          307 :     char *pkclause_identifiers = NULL;
    2744          307 :     int rc = database_select_text(data, sql, &pkclause_identifiers);
    2745          307 :     cloudsync_memory_free(sql);
    2746          307 :     if (rc != DBRES_OK) goto finalize;
    2747          307 :     char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
    2748              : 
    2749              :     // Use database-specific query builder to handle type differences in composite PKs
    2750          307 :     sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
    2751          307 :     if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2752          307 :     rc = database_exec(data, sql);
    2753          307 :     cloudsync_memory_free(sql);
    2754          307 :     if (rc != DBRES_OK) goto finalize;
    2755              : 
    2756              :     // fill missing colums
    2757              :     // for each non-pk column:
    2758              :     // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
    2759              :     // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
    2760              : 
    2761          307 :     if (filter) {
    2762           20 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
    2763           20 :     } else {
    2764          287 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
    2765              :     }
    2766          307 :     rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
    2767          307 :     cloudsync_memory_free(sql);
    2768          307 :     if (rc != DBRES_OK) goto finalize;
    2769              :      
    2770         1455 :     for (int i=0; i<table->ncols; ++i) {
    2771         1148 :         char *col_name = table->col_name[i];
    2772              : 
    2773         1148 :         rc = databasevm_bind_text(vm, 1, col_name, -1);
    2774         1148 :         if (rc != DBRES_OK) goto finalize;
    2775              : 
    2776         1186 :         while (1) {
    2777         1186 :             rc = databasevm_step(vm);
    2778         1186 :             if (rc == DBRES_ROW) {
    2779           38 :                 size_t pklen = 0;
    2780           38 :                 const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
    2781           38 :                 if (!pk) { rc = DBRES_ERROR; break; }
    2782           38 :                 rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
    2783         1186 :             } else if (rc == DBRES_DONE) {
    2784         1148 :                 rc = DBRES_OK;
    2785         1148 :                 break;
    2786              :             } else {
    2787            0 :                 break;
    2788              :             }
    2789              :         }
    2790         1148 :         if (rc != DBRES_OK) goto finalize;
    2791              : 
    2792         1148 :         databasevm_reset(vm);
    2793         1455 :     }
    2794              :     
    2795              : finalize:
    2796          307 :     if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
    2797          307 :     if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
    2798          307 :     if (vm) databasevm_finalize(vm);
    2799          307 :     return rc;
    2800          307 : }
    2801              : 
    2802              : // MARK: - Local -
    2803              : 
    2804            4 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2805            4 :     dbvm_t *vm = table->meta_sentinel_update_stmt;
    2806            4 :     if (!vm) return -1;
    2807              :     
    2808            4 :     int rc = databasevm_bind_int(vm, 1, db_version);
    2809            4 :     if (rc != DBRES_OK) goto cleanup;
    2810              :     
    2811            4 :     rc = databasevm_bind_int(vm, 2, seq);
    2812            4 :     if (rc != DBRES_OK) goto cleanup;
    2813              :     
    2814            4 :     rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
    2815            4 :     if (rc != DBRES_OK) goto cleanup;
    2816              :     
    2817            4 :     rc = databasevm_step(vm);
    2818            4 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2819              :     
    2820              : cleanup:
    2821            4 :     DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
    2822            4 :     databasevm_reset(vm);
    2823            4 :     return rc;
    2824            4 : }
    2825              : 
    2826          127 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2827          127 :     dbvm_t *vm = table->meta_sentinel_insert_stmt;
    2828          127 :     if (!vm) return -1;
    2829              :     
    2830          127 :     int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
    2831          127 :     if (rc != DBRES_OK) goto cleanup;
    2832              :     
    2833          127 :     rc = databasevm_bind_int(vm, 2, db_version);
    2834          127 :     if (rc != DBRES_OK) goto cleanup;
    2835              :     
    2836          127 :     rc = databasevm_bind_int(vm, 3, seq);
    2837          127 :     if (rc != DBRES_OK) goto cleanup;
    2838              :     
    2839          127 :     rc = databasevm_bind_int(vm, 4, db_version);
    2840          127 :     if (rc != DBRES_OK) goto cleanup;
    2841              :     
    2842          127 :     rc = databasevm_bind_int(vm, 5, seq);
    2843          127 :     if (rc != DBRES_OK) goto cleanup;
    2844              :     
    2845          127 :     rc = databasevm_step(vm);
    2846          127 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2847              :     
    2848              : cleanup:
    2849          127 :     DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
    2850          127 :     databasevm_reset(vm);
    2851          127 :     return rc;
    2852          127 : }
    2853              : 
    2854        11954 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
    2855              :     
    2856        11954 :     dbvm_t *vm = table->meta_row_insert_update_stmt;
    2857        11954 :     if (!vm) return -1;
    2858              :     
    2859        11954 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2860        11954 :     if (rc != DBRES_OK) goto cleanup;
    2861              :     
    2862        11954 :     rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    2863        11954 :     if (rc != DBRES_OK) goto cleanup;
    2864              :     
    2865        11954 :     rc = databasevm_bind_int(vm, 3, col_version);
    2866        11954 :     if (rc != DBRES_OK) goto cleanup;
    2867              :     
    2868        11954 :     rc = databasevm_bind_int(vm, 4, db_version);
    2869        11954 :     if (rc != DBRES_OK) goto cleanup;
    2870              :     
    2871        11954 :     rc = databasevm_bind_int(vm, 5, seq);
    2872        11954 :     if (rc != DBRES_OK) goto cleanup;
    2873              :     
    2874        11954 :     rc = databasevm_bind_int(vm, 6, db_version);
    2875        11954 :     if (rc != DBRES_OK) goto cleanup;
    2876              :     
    2877        11954 :     rc = databasevm_bind_int(vm, 7, seq);
    2878        11954 :     if (rc != DBRES_OK) goto cleanup;
    2879              :     
    2880        11954 :     rc = databasevm_step(vm);
    2881        11954 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2882              :     
    2883              : cleanup:
    2884        11954 :     DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
    2885        11954 :     databasevm_reset(vm);
    2886        11954 :     return rc;
    2887        11954 : }
    2888              : 
    2889        11847 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
    2890        11847 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
    2891              : }
    2892              : 
    2893           41 : int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
    2894              :     // Mark a block as deleted by setting col_version = 2 (even = deleted)
    2895           41 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
    2896              : }
    2897              : 
    2898           41 : int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
    2899           41 :     return block_delete_value(data, table, pk, (int)pklen, block_colname);
    2900              : }
    2901              : 
    2902           66 : int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2903           66 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
    2904              : }
    2905              : 
    2906           36 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
    2907           36 :     dbvm_t *vm = table->meta_row_drop_stmt;
    2908           36 :     if (!vm) return -1;
    2909              :     
    2910           36 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2911           36 :     if (rc != DBRES_OK) goto cleanup;
    2912              :     
    2913           36 :     rc = databasevm_step(vm);
    2914           36 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2915              :     
    2916              : cleanup:
    2917           36 :     DEBUG_DBERROR(rc, "local_drop_meta", table->context);
    2918           36 :     databasevm_reset(vm);
    2919           36 :     return rc;
    2920           36 : }
    2921              : 
    2922           30 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
    2923              :     /*
    2924              :       * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
    2925              :       * to a new primary key (NEW.pk) when a primary key change occurs.
    2926              :       *
    2927              :       * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
    2928              :       * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
    2929              :       *
    2930              :       * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
    2931              :       * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
    2932              :       * may be applied incorrectly, leading to data inconsistency.
    2933              :       *
    2934              :       * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
    2935              :       * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
    2936              :       * that generates a unique sequence for each row. The update query should ensure that each row moved
    2937              :       * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
    2938              :      */
    2939              :     
    2940              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
    2941              :     // pk2 is the old pk
    2942              :     
    2943           30 :     dbvm_t *vm = table->meta_update_move_stmt;
    2944           30 :     if (!vm) return -1;
    2945              :     
    2946              :     // new primary key
    2947           30 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2948           30 :     if (rc != DBRES_OK) goto cleanup;
    2949              :     
    2950              :     // new db_version
    2951           30 :     rc = databasevm_bind_int(vm, 2, db_version);
    2952           30 :     if (rc != DBRES_OK) goto cleanup;
    2953              :     
    2954              :     // old primary key
    2955           30 :     rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
    2956           30 :     if (rc != DBRES_OK) goto cleanup;
    2957              :     
    2958           30 :     rc = databasevm_step(vm);
    2959           30 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2960              :     
    2961              : cleanup:
    2962           30 :     DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
    2963           30 :     databasevm_reset(vm);
    2964           30 :     return rc;
    2965           30 : }
    2966              : 
    2967              : // MARK: - Payload Encode / Decode -
    2968              : 
    2969          687 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
    2970          687 :     uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
    2971          687 :     header->checksum[0] = (uint8_t)(h >> 40);
    2972          687 :     header->checksum[1] = (uint8_t)(h >> 32);
    2973          687 :     header->checksum[2] = (uint8_t)(h >> 24);
    2974          687 :     header->checksum[3] = (uint8_t)(h >> 16);
    2975          687 :     header->checksum[4] = (uint8_t)(h >>  8);
    2976          687 :     header->checksum[5] = (uint8_t)(h >>  0);
    2977          687 : }
    2978              : 
    2979          682 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
    2980         2046 :     return ((uint64_t)header->checksum[0] << 40) |
    2981         1364 :            ((uint64_t)header->checksum[1] << 32) |
    2982         1364 :            ((uint64_t)header->checksum[2] << 24) |
    2983         1364 :            ((uint64_t)header->checksum[3] << 16) |
    2984         1364 :            ((uint64_t)header->checksum[4] <<  8) |
    2985          682 :            ((uint64_t)header->checksum[5] <<  0);
    2986              : }
    2987              : 
    2988          682 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
    2989          682 :     uint64_t checksum1 = cloudsync_payload_checksum_load(header);
    2990          682 :     uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
    2991          682 :     return (checksum1 == checksum2);
    2992              : }
    2993              : 
    2994        49172 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
    2995        49172 :     if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
    2996              :     
    2997              :     // alloc/resize buffer
    2998        49172 :     if (payload->bused + needed > payload->balloc) {
    2999          696 :         if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
    3000          696 :         size_t balloc = payload->balloc + needed;
    3001              :         
    3002          696 :         char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
    3003          696 :         if (!buffer) {
    3004            0 :             if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3005            0 :             memset(payload, 0, sizeof(cloudsync_payload_context));
    3006            0 :             return false;
    3007              :         }
    3008              :         
    3009          696 :         payload->buffer = buffer;
    3010          696 :         payload->balloc = balloc;
    3011          696 :         if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
    3012          696 :     }
    3013              :     
    3014        49172 :     return true;
    3015        49172 : }
    3016              : 
    3017        50554 : size_t cloudsync_payload_context_size (size_t *header_size) {
    3018        50554 :     if (header_size) *header_size = sizeof(cloudsync_payload_header);
    3019        50554 :     return sizeof(cloudsync_payload_context);
    3020              : }
    3021              : 
    3022          687 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
    3023          687 :     memset(header, 0, sizeof(cloudsync_payload_header));
    3024              :     assert(sizeof(cloudsync_payload_header)==32);
    3025              :     
    3026              :     int major, minor, patch;
    3027          687 :     sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
    3028              :     
    3029          687 :     header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
    3030          687 :     header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
    3031          687 :     header->libversion[0] = (uint8_t)major;
    3032          687 :     header->libversion[1] = (uint8_t)minor;
    3033          687 :     header->libversion[2] = (uint8_t)patch;
    3034          687 :     header->expanded_size = htonl(expanded_size);
    3035          687 :     header->ncols = htons(ncols);
    3036          687 :     header->nrows = htonl(nrows);
    3037          687 :     header->schema_hash = htonll(hash);
    3038          687 : }
    3039              : 
    3040        49172 : int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
    3041              :     DEBUG_FUNCTION("cloudsync_payload_encode_step");
    3042              :     // debug_values(argc, argv);
    3043              :     
    3044              :     // check if the step function is called for the first time
    3045        49172 :     if (payload->nrows == 0) payload->ncols = (uint16_t)argc;
    3046              :     
    3047        49172 :     size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
    3048        49172 :     if (cloudsync_payload_encode_check(payload, breq) == false) {
    3049            0 :         return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
    3050              :     }
    3051              :     
    3052        49172 :     char *buffer = payload->buffer + payload->bused;
    3053        49172 :     size_t bsize = payload->balloc - payload->bused;
    3054        49172 :     char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
    3055        49172 :     if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);
    3056              :     
    3057              :     // update buffer
    3058        49172 :     payload->bused += breq;
    3059              :     
    3060              :     // increment row counter
    3061        49172 :     ++payload->nrows;
    3062              :     
    3063        49172 :     return DBRES_OK;
    3064        49172 : }
    3065              : 
    3066          695 : int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    3067              :     DEBUG_FUNCTION("cloudsync_payload_encode_final");
    3068              :     
    3069          695 :     if (payload->nrows == 0) {
    3070            8 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3071            8 :         payload->buffer = NULL;
    3072            8 :         payload->bsize = 0;
    3073            8 :         return DBRES_OK;
    3074              :     }
    3075              :     
    3076          687 :     if (payload->nrows > UINT32_MAX) {
    3077            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3078            0 :         payload->buffer = NULL;
    3079            0 :         payload->bsize = 0;
    3080            0 :         cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
    3081            0 :         return DBRES_ERROR;
    3082              :     }
    3083              :     
    3084              :     // sanity check about buffer size
    3085          687 :     int header_size = (int)sizeof(cloudsync_payload_header);
    3086          687 :     int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    3087          687 :     if (buffer_size < 0) {
    3088            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3089            0 :         payload->buffer = NULL;
    3090            0 :         payload->bsize = 0;
    3091            0 :         cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
    3092            0 :         return DBRES_ERROR;
    3093              :     }
    3094          687 :     if (buffer_size > INT_MAX) {
    3095            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3096            0 :         payload->buffer = NULL;
    3097            0 :         payload->bsize = 0;
    3098            0 :         cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
    3099            0 :         return DBRES_ERROR;
    3100              :     }
    3101              :     // try to allocate buffer used for compressed data
    3102          687 :     int real_buffer_size = (int)buffer_size;
    3103          687 :     int zbound = LZ4_compressBound(real_buffer_size);
    3104          687 :     char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression
    3105              :     
    3106              :     // skip the reserved header from the buffer to compress
    3107          687 :     char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    3108          687 :     int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    3109          687 :     bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    3110          687 :     CHECK_FORCE_UNCOMPRESSED_BUFFER();
    3111              :     
    3112              :     // setup payload header
    3113          687 :     cloudsync_payload_header header = {0};
    3114          687 :     uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    3115          687 :     cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);
    3116              :     
    3117              :     // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    3118          687 :     if (use_uncompressed_buffer) {
    3119           16 :         if (zbuffer) cloudsync_memory_free(zbuffer);
    3120           16 :         zbuffer = payload->buffer;
    3121           16 :         zused = real_buffer_size;
    3122           16 :     }
    3123              :     
    3124              :     // compute checksum of the buffer
    3125          687 :     uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    3126          687 :     cloudsync_payload_checksum_store(&header, checksum);
    3127              :     
    3128              :     // copy header and data to SQLite BLOB
    3129          687 :     memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    3130          687 :     int blob_size = zused + sizeof(cloudsync_payload_header);
    3131          687 :     payload->bsize = blob_size;
    3132              :     
    3133              :     // cleanup memory
    3134          687 :     if (zbuffer != payload->buffer) {
    3135          671 :         cloudsync_memory_free (payload->buffer);
    3136          671 :         payload->buffer = zbuffer;
    3137          671 :     }
    3138              :     
    3139          687 :     return DBRES_OK;
    3140          695 : }
    3141              : 
    3142          695 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
    3143              :     DEBUG_FUNCTION("cloudsync_payload_blob");
    3144              :     
    3145          695 :     if (blob_size) *blob_size = (int64_t)payload->bsize;
    3146          695 :     if (nrows) *nrows = (int64_t)payload->nrows;
    3147          695 :     return payload->buffer;
    3148              : }
    3149              : 
    3150       441828 : static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
    3151       441828 :     cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
    3152       441828 :     int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
    3153              :     
    3154       441828 :     if (rc == DBRES_OK) {
    3155              :         // the dbversion index is smaller than seq index, so it is processed first
    3156              :         // when processing the dbversion column: save the value to the tmp_dbversion field
    3157              :         // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
    3158       441828 :         switch (index) {
    3159              :             case CLOUDSYNC_PK_INDEX_TBL:
    3160        49092 :                 if (type == DBTYPE_TEXT) {
    3161        49092 :                     decode_context->tbl = pval;
    3162        49092 :                     decode_context->tbl_len = ival;
    3163        49092 :                 }
    3164        49092 :                 break;
    3165              :             case CLOUDSYNC_PK_INDEX_PK:
    3166        49092 :                 if (type == DBTYPE_BLOB) {
    3167        49092 :                     decode_context->pk = pval;
    3168        49092 :                     decode_context->pk_len = ival;
    3169        49092 :                 }
    3170        49092 :                 break;
    3171              :             case CLOUDSYNC_PK_INDEX_COLNAME:
    3172        49092 :                 if (type == DBTYPE_TEXT) {
    3173        49092 :                     decode_context->col_name = pval;
    3174        49092 :                     decode_context->col_name_len = ival;
    3175        49092 :                 }
    3176        49092 :                 break;
    3177              :             case CLOUDSYNC_PK_INDEX_COLVERSION:
    3178        49092 :                 if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
    3179        49092 :                 break;
    3180              :             case CLOUDSYNC_PK_INDEX_DBVERSION:
    3181        49092 :                 if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
    3182        49092 :                 break;
    3183              :             case CLOUDSYNC_PK_INDEX_SITEID:
    3184        49092 :                 if (type == DBTYPE_BLOB) {
    3185        49092 :                     decode_context->site_id = pval;
    3186        49092 :                     decode_context->site_id_len = ival;
    3187        49092 :                 }
    3188        49092 :                 break;
    3189              :             case CLOUDSYNC_PK_INDEX_CL:
    3190        49092 :                 if (type == DBTYPE_INTEGER) decode_context->cl = ival;
    3191        49092 :                 break;
    3192              :             case CLOUDSYNC_PK_INDEX_SEQ:
    3193        49092 :                 if (type == DBTYPE_INTEGER) decode_context->seq = ival;
    3194        49092 :                 break;
    3195              :         }
    3196       441828 :     }
    3197              :         
    3198       441828 :     return rc;
    3199              : }
    3200              : 
    3201              : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
    3202              : 
    3203          684 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
    3204              :     // sanity check
    3205          684 :     if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
    3206              :     
    3207              :     // decode header
    3208              :     cloudsync_payload_header header;
    3209          684 :     memcpy(&header, payload, sizeof(cloudsync_payload_header));
    3210              :     
    3211          684 :     header.signature = ntohl(header.signature);
    3212          684 :     header.expanded_size = ntohl(header.expanded_size);
    3213          684 :     header.ncols = ntohs(header.ncols);
    3214          684 :     header.nrows = ntohl(header.nrows);
    3215          684 :     header.schema_hash = ntohll(header.schema_hash);
    3216              :     
    3217              :     // compare schema_hash only if not disabled and if the received payload was created with the current header version
    3218              :     // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
    3219          684 :     if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
    3220          684 :         if (header.schema_hash != data->schema_hash) {
    3221            4 :             if (!database_check_schema_hash(data, header.schema_hash)) {
    3222              :                 char buffer[1024];
    3223            2 :                 snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
    3224            2 :                 return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3225              :             }
    3226            2 :         }
    3227          682 :     }
    3228              :     
    3229              :     // sanity check header
    3230          682 :     if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
    3231            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
    3232              :     }
    3233              :     
    3234          682 :     const char *buffer = payload + sizeof(cloudsync_payload_header);
    3235          682 :     size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
    3236              : 
    3237              :     // sanity check checksum (only if version is >= 2)
    3238          682 :     if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
    3239          682 :         uint64_t checksum = pk_checksum(buffer, buf_len);
    3240          682 :         if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
    3241            1 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
    3242              :         }
    3243          681 :     }
    3244              : 
    3245              :     // check if payload is compressed
    3246          681 :     char *clone = NULL;
    3247          681 :     if (header.expanded_size != 0) {
    3248          665 :         clone = (char *)cloudsync_memory_alloc(header.expanded_size);
    3249          665 :         if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
    3250              : 
    3251          665 :         int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
    3252          665 :         if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
    3253            0 :             if (clone) cloudsync_memory_free(clone);
    3254            0 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
    3255              :         }
    3256              : 
    3257          665 :         buffer = (const char *)clone;
    3258          665 :         buf_len = (size_t)header.expanded_size;
    3259          665 :     }
    3260              :     
    3261              :     // precompile the insert statement
    3262          681 :     dbvm_t *vm = NULL;
    3263          681 :     int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
    3264          681 :     if (rc != DBRES_OK) {
    3265            0 :         if (clone) cloudsync_memory_free(clone);
    3266            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
    3267              :     }
    3268              :     
    3269              :     // process buffer, one row at a time
    3270          681 :     uint16_t ncols = header.ncols;
    3271          681 :     uint32_t nrows = header.nrows;
    3272          681 :     int64_t last_payload_db_version = -1;
    3273          681 :     int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
    3274          681 :     int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
    3275          681 :     cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
    3276              : 
    3277              :     // Initialize deferred column-batch merge
    3278          681 :     merge_pending_batch batch = {0};
    3279          681 :     data->pending_batch = &batch;
    3280          681 :     bool in_savepoint = false;
    3281          681 :     const void *last_pk = NULL;
    3282          681 :     int64_t last_pk_len = 0;
    3283          681 :     const char *last_tbl = NULL;
    3284          681 :     int64_t last_tbl_len = 0;
    3285              : 
    3286        49773 :     for (uint32_t i=0; i<nrows; ++i) {
    3287        49092 :         size_t seek = 0;
    3288        49092 :         int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
    3289        49092 :         if (res == -1) {
    3290            0 :             merge_flush_pending(data);
    3291            0 :             data->pending_batch = NULL;
    3292            0 :             if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3293            0 :             if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3294            0 :             if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
    3295            0 :             if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
    3296            0 :             rc = DBRES_ERROR;
    3297            0 :             goto cleanup;
    3298              :         }
    3299              : 
    3300              :         // Detect PK/table/db_version boundary to flush pending batch
    3301        97503 :         bool pk_changed = (last_pk != NULL &&
    3302        48411 :                            (last_pk_len != decoded_context.pk_len ||
    3303        46485 :                             memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
    3304        97503 :         bool tbl_changed = (last_tbl != NULL &&
    3305        48411 :                             (last_tbl_len != decoded_context.tbl_len ||
    3306        48339 :                              memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
    3307        49092 :         bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
    3308              : 
    3309              :         // Flush pending batch before any boundary change
    3310        49092 :         if (pk_changed || tbl_changed || db_version_changed) {
    3311        18223 :             int flush_rc = merge_flush_pending(data);
    3312        18223 :             if (flush_rc != DBRES_OK) {
    3313            1 :                 rc = flush_rc;
    3314              :                 // continue processing remaining rows
    3315            1 :             }
    3316        18223 :         }
    3317              : 
    3318              :         // Per-db_version savepoints group rows with the same source db_version
    3319              :         // into one transaction. In SQLite autocommit mode, the RELEASE triggers
    3320              :         // the commit hook which bumps data->db_version and resets seq, ensuring
    3321              :         // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
    3322              :         // database_in_transaction() is always true so this block is inactive —
    3323              :         // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
    3324        49092 :         if (in_savepoint && db_version_changed) {
    3325         4287 :             rc = database_commit_savepoint(data, "cloudsync_payload_apply");
    3326         4287 :             if (rc != DBRES_OK) {
    3327            0 :                 merge_pending_free_entries(&batch);
    3328            0 :                 data->pending_batch = NULL;
    3329            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
    3330            0 :                 goto cleanup;
    3331              :             }
    3332         4287 :             in_savepoint = false;
    3333         4287 :         }
    3334              : 
    3335        49092 :         if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
    3336         4968 :             rc = database_begin_savepoint(data, "cloudsync_payload_apply");
    3337         4968 :             if (rc != DBRES_OK) {
    3338            0 :                 merge_pending_free_entries(&batch);
    3339            0 :                 data->pending_batch = NULL;
    3340            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
    3341            0 :                 goto cleanup;
    3342              :             }
    3343         4968 :             in_savepoint = true;
    3344         4968 :         }
    3345              : 
    3346              :         // Track db_version for batch-flush boundary detection
    3347        49092 :         if (db_version_changed) {
    3348         4968 :             last_payload_db_version = decoded_context.db_version;
    3349         4968 :         }
    3350              : 
    3351              :         // Update PK/table tracking
    3352        49092 :         last_pk = decoded_context.pk;
    3353        49092 :         last_pk_len = decoded_context.pk_len;
    3354        49092 :         last_tbl = decoded_context.tbl;
    3355        49092 :         last_tbl_len = decoded_context.tbl_len;
    3356              : 
    3357        49092 :         rc = databasevm_step(vm);
    3358        49092 :         if (rc != DBRES_DONE) {
    3359              :             // don't "break;", the error can be due to a RLS policy.
    3360              :             // in case of error we try to apply the following changes
    3361            2 :         }
    3362              : 
    3363        49092 :         buffer += seek;
    3364        49092 :         buf_len -= seek;
    3365        49092 :         dbvm_reset(vm);
    3366        49092 :     }
    3367              : 
    3368              :     // Final flush after loop
    3369              :     {
    3370          681 :         int flush_rc = merge_flush_pending(data);
    3371          681 :         if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
    3372              :     }
    3373          681 :     data->pending_batch = NULL;
    3374              : 
    3375          681 :     if (in_savepoint) {
    3376          681 :         int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
    3377          681 :         if (rc1 != DBRES_OK) rc = rc1;
    3378          681 :     }
    3379              : 
    3380              :     // save last error (unused if function returns OK)
    3381          681 :     if (rc != DBRES_OK && rc != DBRES_DONE) {
    3382            1 :         cloudsync_set_dberror(data);
    3383            1 :     }
    3384              : 
    3385          681 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    3386         1361 :     if (rc == DBRES_OK) {
    3387              :         char buf[256];
    3388          680 :         if (decoded_context.db_version >= dbversion) {
    3389          546 :             snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
    3390          546 :             dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
    3391              :             
    3392          546 :             if (decoded_context.seq != seq) {
    3393          338 :                 snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
    3394          338 :                 dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
    3395          338 :             }
    3396          546 :         }
    3397          680 :     }
    3398              : 
    3399              : cleanup:
    3400              :     // cleanup merge_pending_batch
    3401          681 :     if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3402          681 :     if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3403          681 :     if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }  
    3404              : 
    3405              :     // cleanup vm
    3406          681 :     if (vm) databasevm_finalize(vm);
    3407              : 
    3408              :     // cleanup memory
    3409          681 :     if (clone) cloudsync_memory_free(clone);
    3410              : 
    3411              :     // error already saved in (save last error)
    3412          681 :     if (rc != DBRES_OK) return rc;
    3413              : 
    3414              :     // return the number of processed rows
    3415          680 :     if (pnrows) *pnrows = nrows;
    3416          680 :     return DBRES_OK;
    3417          684 : }
    3418              : 
    3419              : // MARK: - Payload load/store -
    3420              : 
    3421            0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
    3422              :     // retrieve current db_version and seq
    3423            0 :     *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
    3424            0 :     if (*db_version < 0) return DBRES_ERROR;
    3425              :     
    3426              :     // retrieve BLOB
    3427              :     char sql[1024];
    3428            0 :     snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
    3429              :                                "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
    3430              :     
    3431            0 :     int64_t len = 0;
    3432            0 :     int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
    3433            0 :     *blob_size = (int)len;
    3434            0 :     if (rc != DBRES_OK) return rc;
    3435              :     
    3436              :     // exit if there is no data to send
    3437            0 :     if (*blob == NULL || *blob_size == 0) return DBRES_OK;
    3438            0 :     return rc;
    3439            0 : }
    3440              : 
    3441              : #ifdef CLOUDSYNC_DESKTOP_OS
    3442            0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
    3443              :     DEBUG_FUNCTION("cloudsync_payload_save");
    3444              :     
    3445              :     // silently delete any other payload with the same name
    3446            0 :     cloudsync_file_delete(payload_path);
    3447              :     
    3448              :     // retrieve payload
    3449            0 :     char *blob = NULL;
    3450            0 :     int blob_size = 0, db_version = 0;
    3451            0 :     int64_t new_db_version = 0;
    3452            0 :     int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
    3453            0 :     if (rc != DBRES_OK) {
    3454            0 :         if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
    3455            0 :         return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
    3456              :     }
    3457              :     
    3458              :     // exit if there is no data to save
    3459            0 :     if (blob == NULL || blob_size == 0) {
    3460            0 :         if (size) *size = 0;
    3461            0 :         return DBRES_OK;
    3462              :     }
    3463              :     
    3464              :     // write payload to file
    3465            0 :     bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
    3466            0 :     cloudsync_memory_free(blob);
    3467            0 :     if (res == false) {
    3468            0 :         return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
    3469              :     }
    3470              :     
    3471              :     // returns blob size
    3472            0 :     if (size) *size = blob_size;
    3473            0 :     return DBRES_OK;
    3474            0 : }
    3475              : #endif
    3476              : 
    3477              : // MARK: - Core -
    3478              : 
    3479          293 : int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, CLOUDSYNC_INIT_FLAG init_flags) {
    3480              :     DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    3481              :     char buffer[2048];
    3482              :     
    3483              :     // sanity check table name
    3484          293 :     if (name == NULL) {
    3485            1 :         return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    3486              :     }
    3487              :     
    3488              :     // avoid allocating heap memory for SQL statements by setting a maximum length of 512 characters
    3489              :     // for table names. This limit is reasonable and helps prevent memory management issues.
    3490          292 :     const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    3491          292 :     if (strlen(name) > maxlen) {
    3492            1 :         snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
    3493            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3494              :     }
    3495              :     
    3496              :     // check if already initialized
    3497          291 :     cloudsync_table_context *table = table_lookup(data, name);
    3498          291 :     if (table) return DBRES_OK;
    3499              :     
    3500              :     // check if table exists
    3501          287 :     if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
    3502            2 :         snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
    3503            2 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3504              :     }
    3505              :     
    3506              :     // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
    3507          285 :     int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    3508          285 :     if (npri_keys < 0) return cloudsync_set_dberror(data);
    3509          285 :     if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
    3510              :     
    3511              :     #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    3512              :     // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    3513          285 :     if (npri_keys == 0) {
    3514            1 :         snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
    3515            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3516              :     }
    3517              :     #endif
    3518              :     
    3519          284 :     bool skip_int_pk_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK) != 0;
    3520          284 :     if (!skip_int_pk_check) {
    3521          221 :         if (npri_keys == 1) {
    3522              :             // the affinity of a column is determined by the declared type of the column,
    3523              :             // according to the following rules in the order shown:
    3524              :             // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
    3525          133 :             int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
    3526          133 :             if (npri_keys_int < 0) return cloudsync_set_dberror(data);
    3527          133 :             if (npri_keys == npri_keys_int) {
    3528            1 :                 snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
    3529            1 :                 return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3530              :             }
    3531              :             
    3532          132 :         }
    3533          220 :     }
    3534              :         
    3535              :     // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
    3536              :     #if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    3537              :     bool skip_notnull_prikeys_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_PRIKEYS_CHECK) != 0;
    3538              :     if (!skip_notnull_prikeys_check) {
    3539              :         if (npri_keys > 0) {
    3540              :             int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
    3541              :             if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
    3542              :             if (npri_keys != npri_keys_notnull) {
    3543              :                 snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
    3544              :                 return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3545              :             }
    3546              :         }
    3547              :     }
    3548              :     #endif
    3549              :     
    3550              :     // check for columns declared as NOT NULL without a DEFAULT value.
    3551              :     // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    3552          283 :     bool skip_notnull_default_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_DEFAULT_CHECK) != 0;
    3553          283 :     if (!skip_notnull_default_check) {
    3554          283 :         int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
    3555          283 :         if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
    3556          283 :         if (n_notnull_nodefault > 0) {
    3557            0 :             snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
    3558            0 :             return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3559              :         }
    3560          283 :     }
    3561              :     
    3562          283 :     return DBRES_OK;
    3563          293 : }
    3564              : 
    3565            4 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
    3566            4 :     if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
    3567              : 
    3568              :     // drop meta-table
    3569            4 :     const char *table_name = table->name;
    3570            4 :     char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
    3571            4 :     int rc = database_exec(data, sql);
    3572            4 :     cloudsync_memory_free(sql);
    3573            4 :     if (rc != DBRES_OK) {
    3574              :         char buffer[1024];
    3575            0 :         snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
    3576            0 :         return cloudsync_set_error(data, buffer, rc);
    3577              :     }
    3578              : 
    3579              :     // drop blocks table if this table has block LWW columns
    3580            4 :     if (table->blocks_ref) {
    3581            1 :         sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->blocks_ref);
    3582            1 :         rc = database_exec(data, sql);
    3583            1 :         cloudsync_memory_free(sql);
    3584            1 :         if (rc != DBRES_OK) {
    3585              :             char buffer[1024];
    3586            0 :             snprintf(buffer, sizeof(buffer), "Unable to drop blocks table %s_cloudsync_blocks in cloudsync_cleanup", table_name);
    3587            0 :             return cloudsync_set_error(data, buffer, rc);
    3588              :         }
    3589            1 :     }
    3590              : 
    3591              :     // drop original triggers
    3592            4 :     rc = database_delete_triggers(data, table_name);
    3593            4 :     if (rc != DBRES_OK) {
    3594              :         char buffer[1024];
    3595            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
    3596            0 :         return cloudsync_set_error(data, buffer, rc);
    3597              :     }
    3598              :     
    3599              :     // remove all table related settings
    3600            4 :     dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
    3601            4 :     return DBRES_OK;
    3602            4 : }
    3603              : 
    3604            4 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
    3605            4 :     cloudsync_table_context *table = table_lookup(data, table_name);
    3606            4 :     if (!table) return DBRES_OK;
    3607              :     
    3608              :     // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
    3609              :     
    3610            4 :     int rc = cloudsync_cleanup_internal(data, table);
    3611            4 :     if (rc != DBRES_OK) return rc;
    3612              :     
    3613            4 :     int counter = table_remove(data, table);
    3614            4 :     table_free(table);
    3615              :     
    3616            4 :     if (counter == 0) {
    3617              :         // cleanup database on last table
    3618            2 :         cloudsync_reset_siteid(data);
    3619            2 :         dbutils_settings_cleanup(data);
    3620            2 :     } else {
    3621            2 :         if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
    3622            2 :             cloudsync_update_schema_hash(data);
    3623            2 :         }
    3624              :     }
    3625              :     
    3626            4 :     return DBRES_OK;
    3627            4 : }
    3628              : 
    3629            0 : int cloudsync_cleanup_all (cloudsync_context *data) {
    3630            0 :     return database_cleanup(data);
    3631              : }
    3632              : 
    3633          482 : int cloudsync_terminate (cloudsync_context *data) {
    3634              :     // can't use for/loop here because data->tables_count is changed by table_remove
    3635          737 :     while (data->tables_count > 0) {
    3636          255 :         cloudsync_table_context *t = data->tables[data->tables_count - 1];
    3637          255 :         table_remove(data, t);
    3638          255 :         table_free(t);
    3639              :     }
    3640              :     
    3641          482 :     if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
    3642          482 :     if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
    3643          482 :     if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
    3644          482 :     if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
    3645          482 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
    3646              :     
    3647          482 :     data->schema_version_stmt = NULL;
    3648          482 :     data->data_version_stmt = NULL;
    3649          482 :     data->db_version_stmt = NULL;
    3650          482 :     data->getset_siteid_stmt = NULL;
    3651          482 :     data->current_schema = NULL;
    3652              :     
    3653              :     // reset the site_id so the cloudsync_context_init will be executed again
    3654              :     // if any other cloudsync function is called after terminate
    3655          482 :     data->site_id[0] = 0;
    3656              :     
    3657          482 :     return 1;
    3658              : }
    3659              : 
    3660          288 : int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, CLOUDSYNC_INIT_FLAG init_flags) {
    3661              :     // sanity check table and its primary key(s)
    3662          288 :     int rc = cloudsync_table_sanity_check(data, table_name, init_flags);
    3663          288 :     if (rc != DBRES_OK) return rc;
    3664              :     
    3665              :     // init cloudsync_settings
    3666          286 :     if (cloudsync_context_init(data) == NULL) {
    3667            0 :         return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    3668              :     }
    3669              :     
    3670              :     // sanity check algo name (if exists)
    3671          286 :     table_algo algo_new = table_algo_none;
    3672          286 :     if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
    3673              :     
    3674          286 :     algo_new = cloudsync_algo_from_name(algo_name);
    3675          286 :     if (algo_new == table_algo_none) {
    3676              :         char buffer[1024];
    3677            1 :         snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
    3678            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3679              :     }
    3680              : 
    3681              :     // DWS and AWS algorithms are not yet implemented in the merge logic
    3682          285 :     if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
    3683              :         char buffer[1024];
    3684            2 :         snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
    3685            2 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3686              :     }
    3687              :     
    3688              :     // check if table name was already augmented
    3689          283 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    3690              :     
    3691              :     // sanity check algorithm
    3692          283 :     if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
    3693              :         // if table algorithms and the same and not none, do nothing
    3694          283 :     } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
    3695              :         // nothing is written into settings because the default table_algo_crdt_cls will be used
    3696            0 :         algo_new = algo_current = table_algo_crdt_cls;
    3697          255 :     } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
    3698              :         // algo is already written into settins so just use it
    3699            0 :         algo_new = algo_current;
    3700          255 :     } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
    3701              :         // write table algo name in settings
    3702          255 :         dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
    3703          255 :     } else {
    3704              :         // error condition
    3705            0 :         return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
    3706              :     }
    3707              :     
    3708              :     // Run the following function even if table was already augmented.
    3709              :     // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
    3710              :     // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
    3711              :     
    3712              :     // sync algo with table (unused in this version)
    3713              :     // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
    3714              :     
    3715              :     // read row-level filter from settings (if any)
    3716              :     char init_filter_buf[2048];
    3717          283 :     int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
    3718          283 :     const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
    3719              : 
    3720              :     // check triggers
    3721          283 :     rc = database_create_triggers(data, table_name, algo_new, init_filter);
    3722          283 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
    3723              :     
    3724              :     // check meta-table
    3725          283 :     rc = database_create_metatable(data, table_name);
    3726          283 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
    3727              :     
    3728              :     // add prepared statements
    3729          283 :     if (cloudsync_add_dbvms(data) != DBRES_OK) {
    3730            0 :         return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
    3731              :     }
    3732              :     
    3733              :     // add table to in-memory data context
    3734          283 :     if (table_add_to_context(data, algo_new, table_name) == false) {
    3735              :         char buffer[1024];
    3736            0 :         snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
    3737            0 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3738              :     }
    3739              :     
    3740          283 :     if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
    3741            0 :         return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
    3742              :     }
    3743              :         
    3744          283 :     return DBRES_OK;
    3745          288 : }
        

Generated by: LCOV version 2.4-0