LCOV - code coverage report
Current view: top level - src - cloudsync.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 85.6 % 1982 1696
Test Date: 2026-03-25 16:18:39 Functions: 91.2 % 125 114

            Line data    Source code
       1              : //
       2              : //  cloudsync.c
       3              : //  cloudsync
       4              : //
       5              : //  Created by Marco Bambini on 16/05/24.
       6              : //
       7              : 
       8              : #include <inttypes.h>
       9              : #include <stdbool.h>
      10              : #include <limits.h>
      11              : #include <stdint.h>
      12              : #include <stdlib.h>
      13              : #include <string.h>
      14              : #include <assert.h>
      15              : #include <stdio.h>
      16              : #include <errno.h>
      17              : #include <math.h>
      18              : 
      19              : #include "cloudsync.h"
      20              : #include "lz4.h"
      21              : #include "pk.h"
      22              : #include "sql.h"
      23              : #include "utils.h"
      24              : #include "dbutils.h"
      25              : #include "block.h"
      26              : 
      27              : #ifdef _WIN32
      28              : #include <winsock2.h>
      29              : #include <ws2tcpip.h>
      30              : #else
      31              : #include <arpa/inet.h>                          // for htonl, htons, ntohl, ntohs
      32              : #include <netinet/in.h>                         // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
      33              : #endif
      34              : 
      // 64-bit host <-> network (big-endian) byte order conversion.
      // Only defined here when the platform headers do not already provide htonll.
      #ifndef htonll
      #if __BIG_ENDIAN__
      // Big-endian hosts already store integers in network byte order.
      #define htonll(x)                               (x)
      #define ntohll(x)                               (x)
      #else
      #ifndef htobe64
      // Portable fallback: byte-swap each 32-bit half with htonl/ntohl and exchange the halves.
      // The argument is widened to uint64_t BEFORE masking/shifting so that a narrower
      // (e.g. 32-bit) or signed argument is never shifted by 32 bits in its own type.
      // NOTE: the argument is expanded twice, so it must not have side effects.
      #define htonll(x)                               (((uint64_t)htonl((uint32_t)((uint64_t)(x) & 0xFFFFFFFFu)) << 32) | (uint64_t)htonl((uint32_t)((uint64_t)(x) >> 32)))
      #define ntohll(x)                               (((uint64_t)ntohl((uint32_t)((uint64_t)(x) & 0xFFFFFFFFu)) << 32) | (uint64_t)ntohl((uint32_t)((uint64_t)(x) >> 32)))
      #else
      // The platform offers native 64-bit conversions: use them.
      #define htonll(x)                               htobe64(x)
      #define ntohll(x)                               be64toh(x)
      #endif
      #endif
      #endif
      49              : 
      #define CLOUDSYNC_INIT_NTABLES                  64
      #define CLOUDSYNC_MIN_DB_VERSION                0

      // Payload framing constants
      #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE                   (512*1024)  /* 512 KiB */
      #define CLOUDSYNC_PAYLOAD_SIGNATURE                     0x434C5359  /* 'C','L','S','Y' */

      // Payload protocol versions.
      // CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL is a historical misspelling; it is kept as an
      // alias of the correctly spelled name so existing references keep compiling.
      #define CLOUDSYNC_PAYLOAD_VERSION_ORIGINAL              1
      #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL               CLOUDSYNC_PAYLOAD_VERSION_ORIGINAL
      #define CLOUDSYNC_PAYLOAD_VERSION_1                     CLOUDSYNC_PAYLOAD_VERSION_ORIGINAL
      #define CLOUDSYNC_PAYLOAD_VERSION_2                     2
      #define CLOUDSYNC_PAYLOAD_VERSION_LATEST                CLOUDSYNC_PAYLOAD_VERSION_2
      // first protocol version whose header carries a checksum
      #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM     CLOUDSYNC_PAYLOAD_VERSION_2
      60              : 
      // Larger of two values (defined only if the platform headers have not done so).
      // Each argument may be evaluated twice: do not pass expressions with side effects.
      #if !defined(MAX)
      #define MAX(a, b)                               (((a) > (b)) ? (a) : (b))
      #endif
      64              : 
      // Prints the database error message to stdout when _rc is not DBRES_OK.
      //   _rc   : result code to test
      //   _fn   : name of the reporting function (C string)
      //   _data : context handed to database_errmsg
      // Every argument is parenthesized so that compound expressions (e.g. `rc & mask`)
      // are compared as a whole instead of being re-associated by operator precedence.
      #define DEBUG_DBERROR(_rc, _fn, _data)   do {if ((_rc) != DBRES_OK) printf("Error in %s: %s\n", (_fn), database_errmsg((_data)));} while (0)
      66              : 
      // Positional indexes of the nine fields handled through CLOUDSYNC_PK_INDEX.
      // The enumerators rely on implicit numbering starting at 0, so their order
      // below is significant (TBL = 0 ... SEQ = 8).
      typedef enum {
          CLOUDSYNC_PK_INDEX_TBL = 0,
          CLOUDSYNC_PK_INDEX_PK,              // 1
          CLOUDSYNC_PK_INDEX_COLNAME,         // 2
          CLOUDSYNC_PK_INDEX_COLVALUE,        // 3
          CLOUDSYNC_PK_INDEX_COLVERSION,      // 4
          CLOUDSYNC_PK_INDEX_DBVERSION,       // 5
          CLOUDSYNC_PK_INDEX_SITEID,          // 6
          CLOUDSYNC_PK_INDEX_CL,              // 7
          CLOUDSYNC_PK_INDEX_SEQ              // 8
      } CLOUDSYNC_PK_INDEX;
      78              : 
      // Outcome reported by dbvm_execute.
      typedef enum {
          DBVM_VALUE_ERROR = -1,      // the statement could not be stepped
          DBVM_VALUE_UNCHANGED,       //  0: the tracked value is the same as the cached one
          DBVM_VALUE_CHANGED          //  1: the tracked value changed (also the default result)
      } DBVM_VALUE;
      84              : 
      // Set / clear the `insync` flag of the object pointed to by _data.
      // The argument and the whole expansion are parenthesized so the macros work with
      // any pointer expression (e.g. SYNCBIT_SET(&ctx)) and inside larger expressions.
      #define SYNCBIT_SET(_data)                  ((_data)->insync = 1)
      #define SYNCBIT_RESET(_data)                ((_data)->insync = 0)
      87              : 
      88              : // MARK: - Deferred column-batch merge -
      89              : 
      90              : typedef struct {
      91              :     const char  *col_name;       // pointer into table_context->col_name[idx] (stable)
      92              :     dbvalue_t   *col_value;      // duplicated via database_value_dup (owned)
      93              :     int64_t     col_version;
      94              :     int64_t     db_version;
      95              :     uint8_t     site_id[UUID_LEN];
      96              :     int         site_id_len;
      97              :     int64_t     seq;
      98              : } merge_pending_entry;
      99              : 
     100              : typedef struct {
     101              :     cloudsync_table_context *table;
     102              :     char        *pk;             // malloc'd copy, freed on flush
     103              :     int         pk_len;
     104              :     int64_t     cl;
     105              :     bool        sentinel_pending;
     106              :     bool        row_exists;       // true when the PK already exists locally
     107              :     int         count;
     108              :     int         capacity;
     109              :     merge_pending_entry *entries;
     110              : 
     111              :     // Statement cache — reuse the prepared statement when the column
     112              :     // combination and row_exists flag match between consecutive PK flushes.
     113              :     dbvm_t      *cached_vm;
     114              :     bool        cached_row_exists;
     115              :     int         cached_col_count;
     116              :     const char  **cached_col_names; // array of pointers into table_context (not owned)
     117              : } merge_pending_batch;
     118              : 
     119              : // MARK: -
     120              : 
     121              : struct cloudsync_pk_decode_bind_context {
     122              :     dbvm_t      *vm;
     123              :     char        *tbl;
     124              :     int64_t     tbl_len;
     125              :     const void  *pk;
     126              :     int64_t     pk_len;
     127              :     char        *col_name;
     128              :     int64_t     col_name_len;
     129              :     int64_t     col_version;
     130              :     int64_t     db_version;
     131              :     const void  *site_id;
     132              :     int64_t     site_id_len;
     133              :     int64_t     cl;
     134              :     int64_t     seq;
     135              : };
     136              : 
     137              : struct cloudsync_context {
     138              :     void        *db;
     139              :     char        errmsg[1024];
     140              :     int         errcode;
     141              :     
     142              :     char        *libversion;
     143              :     uint8_t     site_id[UUID_LEN];
     144              :     int         insync;
     145              :     int         debug;
     146              :     bool        merge_equal_values;
     147              :     void        *aux_data;
     148              :     
     149              :     // stmts and context values
     150              :     dbvm_t      *schema_version_stmt;
     151              :     dbvm_t      *data_version_stmt;
     152              :     dbvm_t      *db_version_stmt;
     153              :     dbvm_t      *getset_siteid_stmt;
     154              :     int         data_version;
     155              :     int         schema_version;
     156              :     uint64_t    schema_hash;
     157              :     
     158              :     // set at transaction start and reset on commit/rollback
     159              :     int64_t    db_version;
     160              :     // version the DB would have if the transaction committed now
     161              :     int64_t    pending_db_version;
     162              :     // used to set an order inside each transaction
     163              :     int        seq;
     164              :     
     165              :     // optional schema_name to be set in the cloudsync_table_context
     166              :     char       *current_schema;
     167              :     
     168              :     // augmented tables are stored in-memory so we do not need to retrieve information about
     169              :     // col_names and cid from the disk each time a write statement is performed
     170              :     // we do also not need to use an hash map here because for few tables the direct
     171              :     // in-memory comparison with table name is faster
     172              :     cloudsync_table_context **tables;   // dense vector: [0..tables_count-1] are valid
     173              :     int tables_count;                   // size
     174              :     int tables_cap;                     // capacity
     175              : 
     176              :     int skip_decode_idx;                // -1 in sqlite, col_value index in postgresql
     177              : 
     178              :     // deferred column-batch merge (active during payload_apply)
     179              :     merge_pending_batch *pending_batch;
     180              : };
     181              : 
     182              : struct cloudsync_table_context {
     183              :     table_algo  algo;                           // CRDT algoritm associated to the table
     184              :     char        *name;                          // table name
     185              :     char        *schema;                        // table schema
     186              :     char        *meta_ref;                      // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
     187              :     char        *base_ref;                      // schema-qualified base table name (e.g. "schema"."name")
     188              :     char        **col_name;                     // array of column names
     189              :     dbvm_t      **col_merge_stmt;               // array of merge insert stmt (indexed by col_name)
     190              :     dbvm_t      **col_value_stmt;               // array of column value stmt (indexed by col_name)
     191              :     int         *col_id;                        // array of column id
     192              :     col_algo_t  *col_algo;                      // per-column algorithm (normal or block)
     193              :     char        **col_delimiter;                // per-column delimiter for block splitting (NULL = default "\n")
     194              :     bool        has_block_cols;                 // quick check: does this table have any block columns?
     195              :     dbvm_t      *block_value_read_stmt;         // SELECT col_value FROM blocks table
     196              :     dbvm_t      *block_value_write_stmt;        // INSERT OR REPLACE into blocks table
     197              :     dbvm_t      *block_value_delete_stmt;       // DELETE from blocks table
     198              :     dbvm_t      *block_list_stmt;               // SELECT block entries for materialization
     199              :     char        *blocks_ref;                    // schema-qualified blocks table name
     200              :     int         ncols;                          // number of non primary key cols
     201              :     int         npks;                           // number of primary key cols
     202              :     bool        enabled;                        // flag to check if a table is enabled or disabled
     203              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     204              :     bool        rowid_only;                     // a table with no primary keys other than the implicit rowid
     205              :     #endif
     206              :     
     207              :     char        **pk_name;                      // array of primary key names
     208              : 
     209              :     // precompiled statements
     210              :     dbvm_t      *meta_pkexists_stmt;            // check if a primary key already exist in the augmented table
     211              :     dbvm_t      *meta_sentinel_update_stmt;     // update a local sentinel row
     212              :     dbvm_t      *meta_sentinel_insert_stmt;     // insert a local sentinel row
     213              :     dbvm_t      *meta_row_insert_update_stmt;   // insert/update a local row
     214              :     dbvm_t      *meta_row_drop_stmt;            // delete rows from meta
     215              :     dbvm_t      *meta_update_move_stmt;         // update rows in meta when pk changes
     216              :     dbvm_t      *meta_local_cl_stmt;            // compute local cl value
     217              :     dbvm_t      *meta_winner_clock_stmt;        // get the rowid of the last inserted/updated row in the meta table
     218              :     dbvm_t      *meta_merge_delete_drop;
     219              :     dbvm_t      *meta_zero_clock_stmt;
     220              :     dbvm_t      *meta_col_version_stmt;
     221              :     dbvm_t      *meta_site_id_stmt;
     222              :     
     223              :     dbvm_t      *real_col_values_stmt;          // retrieve all column values based on pk
     224              :     dbvm_t      *real_merge_delete_stmt;
     225              :     dbvm_t      *real_merge_sentinel_stmt;
     226              :     
     227              :     bool        is_altering;                    // flag to track if a table alteration is in progress
     228              : 
     229              :     // context
     230              :     cloudsync_context *context;
     231              : };
     232              : 
     /* Buffer bookkeeping and row/column counters for a payload. */
     struct cloudsync_payload_context {
         /* buffer pointer and its size counters */
         char        *buffer;
         size_t      bsize;
         size_t      balloc;
         size_t      bused;

         /* row and column counters */
         uint64_t    nrows;
         uint16_t    ncols;
     };
     241              : 
     // Byte-packed layouts: MSVC uses #pragma pack, other compilers use an attribute.
     #ifdef _MSC_VER
         #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
         #define PACKED
     #else
         #define PACKED __attribute__((__packed__))
     #endif

     // Fixed-size header placed in front of a payload.
     // Field sizes: 4 + 1 + 3 + 4 + 2 + 4 + 8 + 6 = 32 bytes.
     typedef struct PACKED {
         uint32_t    signature;          // 'CLSY'
         uint8_t     version;            // protocol version
         uint8_t     libversion[3];      // major.minor.patch
         uint32_t    expanded_size;
         uint16_t    ncols;
         uint32_t    nrows;
         uint64_t    schema_hash;
         uint8_t     checksum[6];        // 48 bits checksum (to ensure struct is 32 bytes)
     } cloudsync_payload_header;

     #ifdef _MSC_VER
         #pragma pack(pop)
     #endif

     // Compile-time guard: the build fails (negative array size) if packing is lost or a
     // field changes and the header is no longer exactly 32 bytes.
     typedef char cloudsync_payload_header_size_check[(sizeof(cloudsync_payload_header) == 32) ? 1 : -1];
     263              : 
     // Unit-test hook: when force_uncompressed_blob is set, the macro sets the caller's
     // local `use_uncompressed_buffer` variable to true. Outside unit tests it does nothing.
     // Both variants expand to a single do { } while (0) statement so the macro is safe
     // inside un-braced if/else bodies; invoke it with a trailing semicolon.
     #if CLOUDSYNC_UNITTEST
     bool force_uncompressed_blob = false;
     #define CHECK_FORCE_UNCOMPRESSED_BUFFER()   do { if (force_uncompressed_blob) use_uncompressed_buffer = true; } while (0)
     #else
     #define CHECK_FORCE_UNCOMPRESSED_BUFFER()   do { } while (0)
     #endif
     270              : 
     271              : // Internal prototypes
     272              : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
     273              : 
     274              : // MARK: - CRDT algos -
     275              : 
     276          552 : table_algo cloudsync_algo_from_name (const char *algo_name) {
     277          552 :     if (algo_name == NULL) return table_algo_none;
     278              :     
     279          552 :     if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
     280          243 :     if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
     281          236 :     if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
     282          235 :     if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
     283              :     
     284              :     // if nothing is found
     285          234 :     return table_algo_none;
     286          552 : }
     287              : 
     288          110 : const char *cloudsync_algo_name (table_algo algo) {
     289          110 :     switch (algo) {
     290          105 :         case table_algo_crdt_cls: return "cls";
     291            1 :         case table_algo_crdt_gos: return "gos";
     292            1 :         case table_algo_crdt_dws: return "dws";
     293            1 :         case table_algo_crdt_aws: return "aws";
     294            1 :         case table_algo_none: return NULL;
     295              :     }
     296            1 :     return NULL;
     297          110 : }
     298              : 
     299              : // MARK: - DBVM Utils -
     300              : 
     301        31645 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
     302        31645 :     if (!stmt) return DBVM_VALUE_ERROR;
     303              : 
     304        31645 :     int rc = databasevm_step(stmt);
     305        31645 :     if (rc != DBRES_ROW && rc != DBRES_DONE) {
     306            2 :         if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
     307            2 :         databasevm_reset(stmt);
     308            2 :         return DBVM_VALUE_ERROR;
     309              :     }
     310              : 
     311        31643 :     DBVM_VALUE result = DBVM_VALUE_CHANGED;
     312        31643 :     if (stmt == data->data_version_stmt) {
     313        29413 :         int version = (int)database_column_int(stmt, 0);
     314        29413 :         if (version != data->data_version) {
     315          203 :             data->data_version = version;
     316          203 :         } else {
     317        29210 :             result = DBVM_VALUE_UNCHANGED;
     318              :         }
     319        31643 :     } else if (stmt == data->schema_version_stmt) {
     320         1115 :         int version = (int)database_column_int(stmt, 0);
     321         1115 :         if (version > data->schema_version) {
     322          332 :             data->schema_version = version;
     323          332 :         } else {
     324          783 :             result = DBVM_VALUE_UNCHANGED;
     325              :         }
     326              :         
     327         2230 :     } else if (stmt == data->db_version_stmt) {
     328         1115 :         data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
     329         1115 :     }
     330              :     
     331        31643 :     databasevm_reset(stmt);
     332        31643 :     return result;
     333        31645 : }
     334              : 
     335         3531 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
     336         3531 :     int result = -1;
     337         3531 :     int rc = DBRES_OK;
     338              :     
     339         3531 :     if (value) {
     340         3530 :         rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
     341         3530 :         if (rc != DBRES_OK) goto cleanup;
     342         3530 :     }
     343              : 
     344         3531 :     rc = databasevm_step(stmt);
     345         7062 :     if (rc == DBRES_DONE) {
     346            1 :         result = 0;
     347            1 :         rc = DBRES_OK;
     348         3531 :     } else if (rc == DBRES_ROW) {
     349         3530 :         result = (int)database_column_int(stmt, 0);
     350         3530 :         rc = DBRES_OK;
     351         3530 :     }
     352              :     
     353              : cleanup:
     354         3531 :     databasevm_reset(stmt);
     355         3531 :     return result;
     356              : }
     357              : 
     358       255105 : void dbvm_reset (dbvm_t *stmt) {
     359       255105 :     if (!stmt) return;
     360       232187 :     databasevm_clear_bindings(stmt);
     361       232187 :     databasevm_reset(stmt);
     362       255105 : }
     363              : 
     364              : // MARK: - Database Version -
     365              : 
     366          595 : char *cloudsync_dbversion_build_query (cloudsync_context *data) {
     367              :     // this function must be manually called each time tables changes
     368              :     // because the query plan changes too and it must be re-prepared
     369              :     // unfortunately there is no other way
     370              :     
     371              :     // we need to execute a query like:
     372              :     /*
     373              :      SELECT max(version) as version FROM (
     374              :          SELECT max(db_version) as version FROM "table1_cloudsync"
     375              :          UNION ALL
     376              :          SELECT max(db_version) as version FROM "table2_cloudsync"
     377              :          UNION ALL
     378              :          SELECT max(db_version) as version FROM "table3_cloudsync"
     379              :          UNION
     380              :          SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
     381              :      )
     382              :      */
     383              :     
     384              :     // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
     385              :     
     386          595 :     char *value = NULL;
     387          595 :     int rc = database_select_text(data, SQL_DBVERSION_BUILD_QUERY, &value);
     388          595 :     return (rc == DBRES_OK) ? value : NULL;
     389              : }
     390              : 
     391          798 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
     392          798 :     if (data->db_version_stmt) {
     393          392 :         databasevm_finalize(data->db_version_stmt);
     394          392 :         data->db_version_stmt = NULL;
     395          392 :     }
     396              :     
     397          798 :     int64_t count = dbutils_table_settings_count_tables(data);
     398          798 :     if (count == 0) return DBRES_OK;
     399          595 :     else if (count == -1) return cloudsync_set_dberror(data);
     400              :     
     401          595 :     char *sql = cloudsync_dbversion_build_query(data);
     402          595 :     if (!sql) return DBRES_NOMEM;
     403              :     DEBUG_SQL("db_version_stmt: %s", sql);
     404              :     
     405          595 :     int rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
     406              :     DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
     407          595 :     cloudsync_memory_free(sql);
     408          595 :     return rc;
     409          798 : }
     410              : 
     411         1115 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
     412         1115 :     DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
     413         1115 :     if (schema_changed == DBVM_VALUE_ERROR) return -1;
     414              : 
     415         1115 :     if (schema_changed == DBVM_VALUE_CHANGED) {
     416          332 :         int rc = cloudsync_dbversion_rebuild(data);
     417          332 :         if (rc != DBRES_OK) return -1;
     418          332 :     }
     419              : 
     420         1115 :     if (!data->db_version_stmt) {
     421            0 :         data->db_version = CLOUDSYNC_MIN_DB_VERSION;
     422            0 :         return 0;
     423              :     }
     424              : 
     425         1115 :     DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
     426         1115 :     if (rc == DBVM_VALUE_ERROR) return -1;
     427         1115 :     return 0;
     428         1115 : }
     429              : 
     430        29413 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
     431              :     // perform a PRAGMA data_version to check if some other process write any data
     432        29413 :     DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
     433        29413 :     if (rc == DBVM_VALUE_ERROR) return -1;
     434              :     
     435              :     // db_version is already set and there is no need to update it
     436        29413 :     if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
     437              :     
     438         1115 :     return cloudsync_dbversion_rerun(data);
     439        29413 : }
     440              : 
     441        29389 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
     442        29389 :     int rc = cloudsync_dbversion_check_uptodate(data);
     443        29389 :     if (rc != DBRES_OK) return -1;
     444              :     
     445        29389 :     int64_t result = data->db_version + 1;
     446        29389 :     if (result < data->pending_db_version) result = data->pending_db_version;
     447        29389 :     if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
     448        29389 :     data->pending_db_version = result;
     449              :     
     450        29389 :     return result;
     451        29389 : }
     452              : 
     453              : // MARK: - PK Context -
     454              : 
     455            0 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
     456            0 :     *tbl_len = ctx->tbl_len;
     457            0 :     return ctx->tbl;
     458              : }
     459              : 
     460            0 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
     461            0 :     *pk_len = ctx->pk_len;
     462            0 :     return (void *)ctx->pk;
     463              : }
     464              : 
     465            0 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
     466            0 :     *colname_len = ctx->col_name_len;
     467            0 :     return ctx->col_name;
     468              : }
     469              : 
     470            0 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
     471            0 :     return ctx->cl;
     472              : }
     473              : 
     474            0 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
     475            0 :     return ctx->db_version;
     476              : }
     477              : 
     478              : // MARK: - CloudSync Context -
     479              : 
     480        12609 : int cloudsync_insync (cloudsync_context *data) {
     481        12609 :     return data->insync;
     482              : }
     483              : 
     484          764 : void *cloudsync_siteid (cloudsync_context *data) {
     485          764 :     return (void *)data->site_id;
     486              : }
     487              : 
     488            1 : void cloudsync_reset_siteid (cloudsync_context *data) {
     489            1 :     memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
     490            1 : }
     491              : 
     492          205 : int cloudsync_load_siteid (cloudsync_context *data) {
     493              :     // check if site_id was already loaded
     494          205 :     if (data->site_id[0] != 0) return DBRES_OK;
     495              :     
     496              :     // load site_id
     497          203 :     char *buffer = NULL;
     498          203 :     int64_t size = 0;
     499          203 :     int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
     500          203 :     if (rc != DBRES_OK) return rc;
     501          203 :     if (!buffer || size != UUID_LEN) {
     502            0 :         if (buffer) cloudsync_memory_free(buffer);
     503            0 :         return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
     504              :     }
     505              :     
     506          203 :     memcpy(data->site_id, buffer, UUID_LEN);
     507          203 :     cloudsync_memory_free(buffer);
     508              :     
     509          203 :     return DBRES_OK;
     510          205 : }
     511              : 
     512            1 : int64_t cloudsync_dbversion (cloudsync_context *data) {
     513            1 :     return data->db_version;
     514              : }
     515              : 
     516        11977 : int cloudsync_bumpseq (cloudsync_context *data) {
     517        11977 :     int value = data->seq;
     518        11977 :     data->seq += 1;
     519        11977 :     return value;
     520              : }
     521              : 
     522          263 : void cloudsync_update_schema_hash (cloudsync_context *data) {
     523          263 :     database_update_schema_hash(data, &data->schema_hash);
     524          263 : }
     525              : 
     526        59466 : void *cloudsync_db (cloudsync_context *data) {
     527        59466 :     return data->db;
     528              : }
     529              : 
     530          466 : int cloudsync_add_dbvms (cloudsync_context *data) {
     531              :     DEBUG_DBFUNCTION("cloudsync_add_stmts");
     532              :     
     533          466 :     if (data->data_version_stmt == NULL) {
     534          203 :         int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
     535              :         DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
     536          203 :         if (rc != DBRES_OK) return rc;
     537              :         DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
     538          203 :     }
     539              :     
     540          466 :     if (data->schema_version_stmt == NULL) {
     541          203 :         int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
     542              :         DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
     543          203 :         if (rc != DBRES_OK) return rc;
     544              :         DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
     545          203 :     }
     546              :     
     547          466 :     if (data->getset_siteid_stmt == NULL) {
     548              :         // get and set index of the site_id
     549              :         // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
     550              :         // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
     551          203 :         int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
     552              :         DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
     553          203 :         if (rc != DBRES_OK) return rc;
     554              :         DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
     555          203 :     }
     556              :     
     557          466 :     return cloudsync_dbversion_rebuild(data);
     558          466 : }
     559              : 
     560           21 : int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
     561              :     // force err_code to be something different than OK
     562           21 :     if (err_code == DBRES_OK) err_code = database_errcode(data);
     563           21 :     if (err_code == DBRES_OK) err_code = DBRES_ERROR;
     564              :     
     565              :     // compute a meaningful error message
     566           21 :     if (err_user == NULL) {
     567            5 :         snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
     568            5 :     } else {
     569           16 :         const char *db_error = database_errmsg(data);
     570              :         char db_error_copy[sizeof(data->errmsg)];
     571           16 :         int rc = database_errcode(data);
     572           16 :         if (rc == DBRES_OK) {
     573           16 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
     574           16 :         } else {
     575            0 :             if (db_error == data->errmsg) {
     576            0 :                 snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
     577            0 :                 db_error = db_error_copy;
     578            0 :             }
     579            0 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
     580              :         }
     581              :     }
     582              :     
     583           21 :     data->errcode = err_code;
     584           21 :     return err_code;
     585              : }
     586              : 
     587            5 : int cloudsync_set_dberror (cloudsync_context *data) {
     588            5 :     return cloudsync_set_error(data, NULL, DBRES_OK);
     589              : }
     590              : 
     591           12 : const char *cloudsync_errmsg (cloudsync_context *data) {
     592           12 :     return data->errmsg;
     593              : }
     594              : 
     595            3 : int cloudsync_errcode (cloudsync_context *data) {
     596            3 :     return data->errcode;
     597              : }
     598              : 
     599            2 : void cloudsync_reset_error (cloudsync_context *data) {
     600            2 :     data->errmsg[0] = 0;
     601            2 :     data->errcode = DBRES_OK;
     602            2 : }
     603              : 
     604            3 : void *cloudsync_auxdata (cloudsync_context *data) {
     605            3 :     return data->aux_data;
     606              : }
     607              : 
     608            2 : void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
     609            2 :     data->aux_data = xdata;
     610            2 : }
     611              : 
     612            6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
     613            6 :     if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
     614            5 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
     615            5 :     data->current_schema = NULL;
     616            5 :     if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
     617            6 : }
     618              : 
     619         1169 : const char *cloudsync_schema (cloudsync_context *data) {
     620         1169 :     return data->current_schema;
     621              : }
     622              : 
     623            4 : const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
     624            4 :     cloudsync_table_context *table = table_lookup(data, table_name);
     625            4 :     if (!table) return NULL;
     626              : 
     627            1 :     return table->schema;
     628            4 : }
     629              : 
     630              : // MARK: - Table Utils -
     631              : 
     632           69 : void table_pknames_free (char **names, int nrows) {
     633           69 :     if (!names) return;
     634          132 :     for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
     635           46 :     cloudsync_memory_free(names);
     636           69 : }
     637              : 
     638          258 : char *table_build_mergedelete_sql (cloudsync_table_context *table) {
     639              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     640              :     if (table->rowid_only) {
     641              :         char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
     642              :         return sql;
     643              :     }
     644              :     #endif
     645              : 
     646          258 :     return sql_build_delete_by_pk(table->context, table->name, table->schema);
     647              : }
     648              : 
     649         1298 : char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
     650         1298 :     char *sql = NULL;
     651              :     
     652              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     653              :     if (table->rowid_only) {
     654              :         if (colname == NULL) {
     655              :             // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
     656              :             sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
     657              :         } else {
     658              :             // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
     659              :             sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
     660              :         }
     661              :         return sql;
     662              :     }
     663              :     #endif
     664              :     
     665         1298 :     if (colname == NULL) {
     666              :         // is sentinel insert
     667          258 :         sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
     668          258 :     } else {
     669         1040 :         sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
     670              :     }
     671         1298 :     return sql;
     672              : }
     673              : 
     674         1040 : char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
     675              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     676              :     if (table->rowid_only) {
     677              :         char *colnamequote = "\"";
     678              :         char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
     679              :         return sql;
     680              :     }
     681              :     #endif
     682              :         
     683              :     // SELECT age FROM customers WHERE first_name=? AND last_name=?;
     684         1040 :     return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
     685              : }
     686              :     
     687          258 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
     688              :     DEBUG_DBFUNCTION("table_create %s", name);
     689              :     
     690          258 :     cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
     691          258 :     if (!table) return NULL;
     692              :     
     693          258 :     table->context = data;
     694          258 :     table->algo = algo;
     695          258 :     table->name = cloudsync_string_dup_lowercase(name);
     696              : 
     697              :     // Detect schema from metadata table location. If metadata table doesn't
     698              :     // exist yet (during initialization), fall back to cloudsync_schema() which
     699              :     // returns the explicitly set schema or current_schema().
     700          258 :     table->schema = database_table_schema(name);
     701          258 :     if (!table->schema) {
     702          258 :         const char *fallback_schema = cloudsync_schema(data);
     703          258 :         if (fallback_schema) {
     704            0 :             table->schema = cloudsync_string_dup(fallback_schema);
     705            0 :         }
     706          258 :     }
     707              : 
     708          258 :     if (!table->name) {
     709            0 :         cloudsync_memory_free(table);
     710            0 :         return NULL;
     711              :     }
     712          258 :     table->meta_ref = database_build_meta_ref(table->schema, table->name);
     713          258 :     table->base_ref = database_build_base_ref(table->schema, table->name);
     714          258 :     table->enabled = true;
     715              :         
     716          258 :     return table;
     717          258 : }
     718              : 
     719          258 : void table_free (cloudsync_table_context *table) {
     720              :     DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
     721          258 :     if (!table) return;
     722              :     
     723          258 :     if (table->ncols > 0) {
     724          219 :         if (table->col_name) {
     725         1259 :             for (int i=0; i<table->ncols; ++i) {
     726         1040 :                 cloudsync_memory_free(table->col_name[i]);
     727         1040 :             }
     728          219 :             cloudsync_memory_free(table->col_name);
     729          219 :         }
     730          219 :         if (table->col_merge_stmt) {
     731         1259 :             for (int i=0; i<table->ncols; ++i) {
     732         1040 :                 databasevm_finalize(table->col_merge_stmt[i]);
     733         1040 :             }
     734          219 :             cloudsync_memory_free(table->col_merge_stmt);
     735          219 :         }
     736          219 :         if (table->col_value_stmt) {
     737         1259 :             for (int i=0; i<table->ncols; ++i) {
     738         1040 :                 databasevm_finalize(table->col_value_stmt[i]);
     739         1040 :             }
     740          219 :             cloudsync_memory_free(table->col_value_stmt);
     741          219 :         }
     742          219 :         if (table->col_id) {
     743          219 :             cloudsync_memory_free(table->col_id);
     744          219 :         }
     745          219 :         if (table->col_algo) {
     746          219 :             cloudsync_memory_free(table->col_algo);
     747          219 :         }
     748          219 :         if (table->col_delimiter) {
     749         1259 :             for (int i=0; i<table->ncols; ++i) {
     750         1040 :                 if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
     751         1040 :             }
     752          219 :             cloudsync_memory_free(table->col_delimiter);
     753          219 :         }
     754          219 :     }
     755              : 
     756          258 :     if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
     757          258 :     if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
     758          258 :     if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
     759          258 :     if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
     760          258 :     if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);
     761              : 
     762          258 :     if (table->name) cloudsync_memory_free(table->name);
     763          258 :     if (table->schema) cloudsync_memory_free(table->schema);
     764          258 :     if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
     765          258 :     if (table->base_ref) cloudsync_memory_free(table->base_ref);
     766          258 :     if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
     767          258 :     if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
     768          258 :     if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
     769          258 :     if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
     770          258 :     if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
     771          258 :     if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
     772          258 :     if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
     773          258 :     if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
     774          258 :     if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
     775          258 :     if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
     776          258 :     if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
     777          258 :     if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
     778          258 :     if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);
     779              :     
     780          258 :     if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
     781          258 :     if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
     782          258 :     if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);
     783              :     
     784          258 :     cloudsync_memory_free(table);
     785          258 : }
     786              : 
     787          258 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
     788          258 :     int rc = DBRES_OK;
     789          258 :     char *sql = NULL;
     790          258 :     cloudsync_context *data = table->context;
     791              :     
     792              :     // META TABLE statements
     793              :     
     794              :     // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
     795              :     
     796              :     // precompile the pk exists statement
     797              :     // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
     798              :     // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
     799          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
     800          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     801              :     DEBUG_SQL("meta_pkexists_stmt: %s", sql);
     802              : 
     803          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
     804          258 :     cloudsync_memory_free(sql);
     805          258 :     if (rc != DBRES_OK) goto cleanup;
     806              : 
     807              :     // precompile the update local sentinel statement
     808          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     809          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     810              :     DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
     811              :     
     812          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
     813          258 :     cloudsync_memory_free(sql);
     814          258 :     if (rc != DBRES_OK) goto cleanup;
     815              :     
     816              :     // precompile the insert local sentinel statement
     817          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
     818          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     819              :     DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
     820              :     
     821          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
     822          258 :     cloudsync_memory_free(sql);
     823          258 :     if (rc != DBRES_OK) goto cleanup;
     824              : 
     825              :     // precompile the insert/update local row statement
     826          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
     827          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     828              :     DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
     829              :     
     830          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
     831          258 :     cloudsync_memory_free(sql);
     832          258 :     if (rc != DBRES_OK) goto cleanup;
     833              :     
     834              :     // precompile the delete rows from meta
     835          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     836          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     837              :     DEBUG_SQL("meta_row_drop_stmt: %s", sql);
     838              : 
     839          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
     840          258 :     cloudsync_memory_free(sql);
     841          258 :     if (rc != DBRES_OK) goto cleanup;
     842              :     
     843              :     // precompile the update rows from meta when pk changes
     844              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
     845          258 :     sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
     846          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     847              :     DEBUG_SQL("meta_update_move_stmt: %s", sql);
     848              :     
     849          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
     850          258 :     cloudsync_memory_free(sql);
     851          258 :     if (rc != DBRES_OK) goto cleanup;
     852              :     
     853              :     // local cl
     854          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
     855          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     856              :     DEBUG_SQL("meta_local_cl_stmt: %s", sql);
     857              :     
     858          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
     859          258 :     cloudsync_memory_free(sql);
     860          258 :     if (rc != DBRES_OK) goto cleanup;
     861              :     
     862              :     // rowid of the last inserted/updated row in the meta table
     863          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
     864          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     865              :     DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
     866              :     
     867          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
     868          258 :     cloudsync_memory_free(sql);
     869          258 :     if (rc != DBRES_OK) goto cleanup;
     870              :     
     871          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     872          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     873              :     DEBUG_SQL("meta_merge_delete_drop: %s", sql);
     874              : 
     875          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
     876          258 :     cloudsync_memory_free(sql);
     877          258 :     if (rc != DBRES_OK) goto cleanup;
     878              :     
     879              :     // zero clock
     880          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     881          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     882              :     DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
     883              :     
     884          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
     885          258 :     cloudsync_memory_free(sql);
     886          258 :     if (rc != DBRES_OK) goto cleanup;
     887              :     
     888              :     // col_version
     889          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
     890          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     891              :     DEBUG_SQL("meta_col_version_stmt: %s", sql);
     892              :     
     893          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
     894          258 :     cloudsync_memory_free(sql);
     895          258 :     if (rc != DBRES_OK) goto cleanup;
     896              :     
     897              :     // site_id
     898          258 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
     899          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     900              :     DEBUG_SQL("meta_site_id_stmt: %s", sql);
     901              :     
     902          258 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
     903          258 :     cloudsync_memory_free(sql);
     904          258 :     if (rc != DBRES_OK) goto cleanup;
     905              :     
     906              :     // REAL TABLE statements
     907              : 
     908              :     // precompile the get column value statement
     909          258 :     if (ncols > 0) {
     910          219 :         sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
     911          219 :         if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     912              :         DEBUG_SQL("real_col_values_stmt: %s", sql);
     913              :         
     914          219 :         rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
     915          219 :         cloudsync_memory_free(sql);
     916          219 :         if (rc != DBRES_OK) goto cleanup;
     917          219 :     }
     918              :     
     919          258 :     sql = table_build_mergedelete_sql(table);
     920          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     921              :     DEBUG_SQL("real_merge_delete: %s", sql);
     922              :     
     923          258 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
     924          258 :     cloudsync_memory_free(sql);
     925          258 :     if (rc != DBRES_OK) goto cleanup;
     926              :     
     927          258 :     sql = table_build_mergeinsert_sql(table, NULL);
     928          258 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     929              :     DEBUG_SQL("real_merge_sentinel: %s", sql);
     930              :     
     931          258 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
     932          258 :     cloudsync_memory_free(sql);
     933          258 :     if (rc != DBRES_OK) goto cleanup;
     934              :     
     935              : cleanup:
     936          258 :     if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
     937          258 :     return rc;
     938              : }
     939              : 
     940       183643 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
     941              :     DEBUG_DBFUNCTION("table_lookup %s", table_name);
     942              :     
     943       183643 :     if (table_name) {
     944       185247 :         for (int i=0; i<data->tables_count; ++i) {
     945       184717 :             if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
     946         1604 :         }
     947          530 :     }
     948              :     
     949          530 :     return NULL;
     950       183643 : }
     951              : 
     952       169057 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
     953              :     DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
     954              :     
     955       315065 :     for (int i=0; i<table->ncols; ++i) {
     956       315065 :         if (strcasecmp(table->col_name[i], col_name) == 0) {
     957       169057 :             if (index) *index = i;
     958       169057 :             return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
     959              :         }
     960       146008 :     }
     961              :     
     962            0 :     if (index) *index = -1;
     963            0 :     return NULL;
     964       169057 : }
     965              : 
     966          258 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
     967          258 :     const char *table_name = table->name;
     968              :     DEBUG_DBFUNCTION("table_remove %s", table_name);
     969              :     
     970          306 :     for (int i = 0; i < data->tables_count; ++i) {
     971          306 :         cloudsync_table_context *t = data->tables[i];
     972              :         
     973              :         // pointer compare is fastest but fallback to strcasecmp if not same pointer
     974          306 :         if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
     975          258 :             int last = data->tables_count - 1;
     976          258 :             data->tables[i] = data->tables[last];   // move last into the hole (keeps array dense)
     977          258 :             data->tables[last] = NULL;              // NULLify tail (as an extra security measure)
     978          258 :             data->tables_count--;
     979          258 :             return data->tables_count;
     980              :         }
     981           48 :     }
     982              :     
     983            0 :     return -1;
     984          258 : }
     985              : 
     986         1040 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
     987         1040 :     cloudsync_table_context *table = (cloudsync_table_context *)xdata;
     988         1040 :     cloudsync_context *data = table->context;
     989              : 
     990         1040 :     int index = table->ncols;
     991         2080 :     for (int i=0; i<ncols; i+=2) {
     992         1040 :         const char *name = values[i];
     993         1040 :         int cid = (int)strtol(values[i+1], NULL, 0);
     994              : 
     995         1040 :         table->col_id[index] = cid;
     996         1040 :         table->col_name[index] = cloudsync_string_dup_lowercase(name);
     997         1040 :         if (!table->col_name[index]) goto error;
     998              : 
     999         1040 :         char *sql = table_build_mergeinsert_sql(table, name);
    1000         1040 :         if (!sql) goto error;
    1001              :         DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
    1002              : 
    1003         1040 :         int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
    1004         1040 :         cloudsync_memory_free(sql);
    1005         1040 :         if (rc != DBRES_OK) goto error;
    1006         1040 :         if (!table->col_merge_stmt[index]) goto error;
    1007              : 
    1008         1040 :         sql = table_build_value_sql(table, name);
    1009         1040 :         if (!sql) goto error;
    1010              :         DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
    1011              : 
    1012         1040 :         rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
    1013         1040 :         cloudsync_memory_free(sql);
    1014         1040 :         if (rc != DBRES_OK) goto error;
    1015         1040 :         if (!table->col_value_stmt[index]) goto error;
    1016         1040 :     }
    1017         1040 :     table->ncols += 1;
    1018              : 
    1019         1040 :     return 0;
    1020              : 
    1021              : error:
    1022              :     // clean up partially-initialized entry at index
    1023            0 :     if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
    1024            0 :     if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
    1025            0 :     if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
    1026            0 :     return 1;
    1027         1040 : }
    1028              : 
    1029          258 : bool table_ensure_capacity (cloudsync_context *data) {
                            // Guarantees that data->tables has room for at least one more entry.
                            // Returns true when a free slot already exists or the array was grown;
                            // returns false when the reallocation fails, in which case data->tables
                            // and data->tables_cap are left untouched.
    1030          258 :     if (data->tables_count < data->tables_cap) return true;
    1031              :     
                            // grow geometrically: double the capacity, or start from
                            // CLOUDSYNC_INIT_NTABLES while tables_cap is still 0
    1032            0 :     int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
    1033            0 :     size_t bytes = (size_t)new_cap * sizeof(*data->tables);
                            // the result goes through a temporary so data->tables is only
                            // overwritten once the reallocation is known to have succeeded
    1034            0 :     void *p = cloudsync_memory_realloc(data->tables, bytes);
    1035            0 :     if (!p) return false;
    1036              :     
    1037            0 :     data->tables = (cloudsync_table_context **)p;
    1038            0 :     data->tables_cap = new_cap;
    1039            0 :     return true;
    1040          258 : }
    1041              : 
    1042          263 : bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
                            // Registers table_name in this connection's context.
                            // Steps: look the table up (already registered -> true), make room in
                            // data->tables, create the table object with table_create, count the
                            // primary-key and non-primary-key columns, call table_add_stmts, allocate
                            // the per-column arrays, fill them through table_add_to_context_cb
                            // (presumably invoked once per non-pk column) and finally append the
                            // table to data->tables.
                            // Returns true on success or when the table was already registered, false
                            // otherwise. Every failure after table_create releases the table with
                            // table_free.
    1043              :     DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
    1044              : 
    1045              :     // Check if table already initialized in this connection's context.
    1046              :     // Note: This prevents same-connection duplicate initialization.
    1047              :     // SQLite clients cannot distinguish schemas, so having 'public.users'
    1048              :     // and 'auth.users' would cause sync ambiguity. Users should avoid
    1049              :     // initializing tables with the same name in different schemas.
    1050              :     // If two concurrent connections initialize tables with the same name
    1051              :     // in different schemas, the behavior is undefined.
    1052          263 :     cloudsync_table_context *table = table_lookup(data, table_name);
    1053          263 :     if (table) return true;
    1054              :     
    1055              :     // check for space availability
    1056          258 :     if (!table_ensure_capacity(data)) return false;
    1057              :     
    1058              :     // setup a new table
    1059          258 :     table = table_create(data, table_name, algo);
    1060          258 :     if (!table) return false;
    1061              :     
    1062              :     // fill remaining metadata in the table
    1063          258 :     int count = database_count_pk(data, table_name, false, table->schema);
    1064          258 :     if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1065          258 :     table->npks = count;
                            // no declared primary key: either reject the table (when
                            // CLOUDSYNC_DISABLE_ROWIDONLY_TABLES is set) or fall back to the rowid
                            // as the single key
    1066          258 :     if (table->npks == 0) {
    1067              :         #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    1068            0 :         goto abort_add_table;
    1069              :         #else
    1070              :         table->rowid_only = true;
    1071              :         table->npks = 1; // rowid
    1072              :         #endif
    1073              :     }
    1074              :     
    1075          258 :     int ncols = database_count_nonpk(data, table_name, table->schema);
    1076          258 :     if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1077          258 :     int rc = table_add_stmts(table, ncols);
    1078          258 :     if (rc != DBRES_OK) goto abort_add_table;
    1079              :     
    1080              :     // a table with only pk(s) is totally legal
    1081          258 :     if (ncols > 0) {
                                // NOTE(review): the next four arrays come from cloudsync_memory_alloc
                                // while col_algo and col_delimiter below use cloudsync_memory_zeroalloc.
                                // The error path of table_add_to_context_cb tests col_merge_stmt[index]
                                // and col_value_stmt[index] before they are necessarily assigned --
                                // confirm that cloudsync_memory_alloc zero-fills, or switch these to
                                // the zeroing allocator.
    1082          219 :         table->col_name = (char **)cloudsync_memory_alloc((uint64_t)(sizeof(char *) * ncols));
    1083          219 :         if (!table->col_name) goto abort_add_table;
    1084              :         
    1085          219 :         table->col_id = (int *)cloudsync_memory_alloc((uint64_t)(sizeof(int) * ncols));
    1086          219 :         if (!table->col_id) goto abort_add_table;
    1087              :         
    1088          219 :         table->col_merge_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
    1089          219 :         if (!table->col_merge_stmt) goto abort_add_table;
    1090              :         
    1091          219 :         table->col_value_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
    1092          219 :         if (!table->col_value_stmt) goto abort_add_table;
    1093              : 
    1094          219 :         table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
    1095          219 :         if (!table->col_algo) goto abort_add_table;
    1096              : 
    1097          219 :         table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
    1098          219 :         if (!table->col_delimiter) goto abort_add_table;
    1099              : 
    1100              :         // Pass empty string when schema is NULL; SQL will fall back to current_schema()
    1101          219 :         const char *schema = table->schema ? table->schema : "";
    1102          438 :         char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
    1103          219 :                                               table_name, schema, table_name, schema);
    1104          219 :         if (!sql) goto abort_add_table;
                                // the callback receives the table as its context and fills one slot
                                // of the arrays above per call, incrementing table->ncols each time
    1105          219 :         rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
    1106          219 :         cloudsync_memory_free(sql);
                                // NOTE(review): only DBRES_ABORT is treated as a failure here; any
                                // other non-OK result would fall through and the table would still be
                                // appended -- confirm which codes database_exec_callback can return.
    1107          219 :         if (rc == DBRES_ABORT) goto abort_add_table;
    1108          219 :     }
    1109              :     
    1110              :     // append newly created table
                            // (the free slot was secured by table_ensure_capacity above)
    1111          258 :     data->tables[data->tables_count++] = table;
    1112          258 :     return true;
    1113              :     
    1114              : abort_add_table:
    1115            0 :     table_free(table);
    1116            0 :     return false;
    1117          263 : }
    1118              : 
    1119            0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
                            // Returns a statement for reading column values of tbl_name, or NULL when
                            // the table is not registered in the context.
                            // *persistent is set to true when the returned statement is the
                            // precompiled one stored in the table context for its first column, and
                            // to false when a new statement was prepared here (table without non-pk
                            // columns) or when no statement is returned.
                            // Presumably the caller must finalize the statement only when *persistent
                            // is false -- confirm against the callers.
    1120            0 :     dbvm_t *vm = NULL;
    1121            0 :     *persistent = false;
    1122              : 
    1123            0 :     cloudsync_table_context *table = table_lookup(data, tbl_name);
    1124            0 :     if (table) {
    1125            0 :         char *col_name = NULL;
    1126            0 :         if (table->ncols > 0) {
    1127            0 :             col_name = table->col_name[0];
    1128              :             // retrieve col_value precompiled statement
    1129            0 :             vm = table_column_lookup(table, col_name, false, NULL);
    1130            0 :             *persistent = true;
    1131            0 :         } else {
                                    // NOTE(review): sql is passed on without a NULL check and the result
                                    // of databasevm_prepare is discarded; on failure vm presumably stays
                                    // NULL, which the caller cannot tell apart from an unknown table.
    1132            0 :             char *sql = table_build_value_sql(table, "*");
    1133            0 :             databasevm_prepare(data, sql, (void **)&vm, 0);
    1134            0 :             cloudsync_memory_free(sql);
    1135            0 :             *persistent = false;
    1136              :         }
    1137            0 :     }
    1138              :     
    1139            0 :     return vm;
    1140              : }
    1141              : 
    1142         6832 : bool table_enabled (cloudsync_table_context *table) {
                            // returns the current value of the table's enabled flag
    1143         6832 :     return table->enabled;
    1144              : }
    1145              : 
    1146            6 : void table_set_enabled (cloudsync_table_context *table, bool value) {
                            // overwrites the table's enabled flag with value
    1147            6 :     table->enabled = value;
    1148            6 : }
    1149              : 
    1150        19566 : int table_count_cols (cloudsync_table_context *table) {
                            // returns table->ncols, the counter that table_add_to_context_cb
                            // increments once for every column it registers
    1151        19566 :     return table->ncols;
    1152              : }
    1153              : 
    1154         6687 : int table_count_pks (cloudsync_table_context *table) {
                            // returns table->npks; table_add_to_context stores 1 here for rowid-only
                            // tables, so a registered table never reports 0
    1155         6687 :     return table->npks;
    1156              : }
    1157              : 
    1158        32728 : const char *table_colname (cloudsync_table_context *table, int index) {
                            // returns the name stored for column slot index; the pointer is owned by
                            // the table context. index is not range-checked: the caller must pass a
                            // value in [0, table->ncols).
    1159        32728 :     return table->col_name[index];
    1160              : }
    1161              : 
    1162         3530 : bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
                            // value/len is handed to the table's meta_pkexists_stmt as a blob
                            // (DBTYPE_BLOB); the result is true when dbvm_count reports more than zero
    1163              :     // check if a row with the same primary key already exists
    1164              :     // if so, this means the row might have been previously deleted (sentinel)
    1165         3530 :     return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
    1166              : }
    1167              : 
    1168            0 : char **table_pknames (cloudsync_table_context *table) {
                            // returns the table's pk_name array itself (not a copy)
    1169            0 :     return table->pk_name;
    1170              : }
    1171              : 
    1172           23 : void table_set_pknames (cloudsync_table_context *table, char **pknames) {
                            // releases the previously stored array through table_pknames_free (using
                            // the current table->npks as its length) and stores pknames as is: the
                            // pointer is kept, not copied.
                            // NOTE(review): passing the array that is already stored would leave
                            // table->pk_name pointing at released memory -- presumably no caller
                            // does that; confirm.
    1173           23 :     table_pknames_free(table->pk_name, table->npks);
    1174           23 :     table->pk_name = pknames;
    1175           23 : }
    1176              : 
    1177        49192 : bool table_algo_isgos (cloudsync_table_context *table) {
                            // true when the table was created with the table_algo_crdt_gos algorithm
    1178        49192 :     return (table->algo == table_algo_crdt_gos);
    1179              : }
    1180              : 
    1181            0 : const char *table_schema (cloudsync_table_context *table) {
                            // returns table->schema; the value can be NULL (table_add_to_context
                            // substitutes an empty string for a NULL schema when building its SQL)
    1182            0 :     return table->schema;
    1183              : }
    1184              : 
    1185              : // MARK: - Merge Insert -
    1186              : 
    1187        46042 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
                            // Runs the table's meta_local_cl_stmt for the encoded primary key pk/pklen.
                            // Returns column 0 of the first row, 0 when the statement produces no row,
                            // and -1 when a bind or the step fails (the database error is then
                            // recorded on table->context). The statement is reset before returning.
                            // NOTE(review): -1 is an in-band error value; presumably the statement
                            // can never legitimately yield -1 -- confirm.
    1188        46042 :     dbvm_t *vm = table->meta_local_cl_stmt;
    1189        46042 :     int64_t result = -1;
    1190              :     
                            // the same pk blob is bound to both parameters of the statement
    1191        46042 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1192        46042 :     if (rc != DBRES_OK) goto cleanup;
    1193              :     
    1194        46042 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1195        46042 :     if (rc != DBRES_OK) goto cleanup;
    1196              :     
    1197        46042 :     rc = databasevm_step(vm);
    1198        46042 :     if (rc == DBRES_ROW) result = database_column_int(vm, 0);
    1199            0 :     else if (rc == DBRES_DONE) result = 0;
    1200              :     
    1201              : cleanup:
                            // result is still -1 only when none of the success branches above ran
    1202        46042 :     if (result == -1) cloudsync_set_dberror(table->context);
    1203        46042 :     dbvm_reset(vm);
    1204        46042 :     return result;
    1205              : }
    1206              : 
    1207        45005 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
                            // Runs the table's meta_col_version_stmt for (pk, col_name).
                            // Returns DBRES_OK with *version set to column 0 when a row is found,
                            // DBRES_DONE with *version left untouched when there is no row, and the
                            // failing code otherwise (the database error is then recorded on
                            // table->context). The statement is reset before returning.
    1208        45005 :     dbvm_t *vm = table->meta_col_version_stmt;
    1209              :     
    1210        45005 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1211        45005 :     if (rc != DBRES_OK) goto cleanup;
    1212              :     
    1213        45005 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1214        45005 :     if (rc != DBRES_OK) goto cleanup;
    1215              :     
    1216        45005 :     rc = databasevm_step(vm);
    1217        69907 :     if (rc == DBRES_ROW) {
    1218        24902 :         *version = database_column_int(vm, 0);
    1219        24902 :         rc = DBRES_OK;
    1220        24902 :     }
    1221              :     
    1222              : cleanup:
                            // DBRES_DONE (no row) is a regular outcome, not an error
    1223        45005 :     if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
    1224        45005 :     dbvm_reset(vm);
    1225        45005 :     return rc;
    1226              : }
    1227              : 
    1228        25121 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
                            // Records the winning clock for (pk, colname) in two steps:
                            //   1. data->getset_siteid_stmt is run with site_id and must return a row;
                            //      its column 0 is kept as ord.
                            //   2. table->meta_winner_clock_stmt is run with pk, colname (or
                            //      CLOUDSYNC_TOMBSTONE_VALUE when colname is NULL), col_version,
                            //      db_version, seq and ord; when it returns a row, column 0 is stored
                            //      in *rowid.
                            // Returns DBRES_OK on success. Any other outcome is returned as is after
                            // recording the database error on data; *rowid is then left untouched.
                            // Whichever statement vm points to at that moment is reset on exit.
    1229              :     
    1230              :     // get/set site_id
    1231        25121 :     dbvm_t *vm = data->getset_siteid_stmt;
    1232        25121 :     int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
    1233        25121 :     if (rc != DBRES_OK) goto cleanup_merge;
    1234              :     
    1235        25121 :     rc = databasevm_step(vm);
    1236        25121 :     if (rc != DBRES_ROW) goto cleanup_merge;
    1237              :     
    1238        25121 :     int64_t ord = database_column_int(vm, 0);
    1239        25121 :     dbvm_reset(vm);
    1240              :     
                            // from here on vm is the winner-clock statement, so the reset at
                            // cleanup_merge applies to it
    1241        25121 :     vm = table->meta_winner_clock_stmt;
    1242        25121 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
    1243        25121 :     if (rc != DBRES_OK) goto cleanup_merge;
    1244              :     
    1245        25121 :     rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    1246        25121 :     if (rc != DBRES_OK) goto cleanup_merge;
    1247              :     
    1248        25121 :     rc = databasevm_bind_int(vm, 3, col_version);
    1249        25121 :     if (rc != DBRES_OK) goto cleanup_merge;
    1250              :     
    1251        25121 :     rc = databasevm_bind_int(vm, 4, db_version);
    1252        25121 :     if (rc != DBRES_OK) goto cleanup_merge;
    1253              :     
    1254        25121 :     rc = databasevm_bind_int(vm, 5, seq);
    1255        25121 :     if (rc != DBRES_OK) goto cleanup_merge;
    1256              :     
    1257        25121 :     rc = databasevm_bind_int(vm, 6, ord);
    1258        25121 :     if (rc != DBRES_OK) goto cleanup_merge;
    1259              :     
                            // NOTE(review): only DBRES_ROW counts as success here; a DBRES_DONE from
                            // this step reaches cleanup_merge as rc != DBRES_OK and is reported as an
                            // error -- presumably the statement always returns the affected rowid.
    1260        25121 :     rc = databasevm_step(vm);
    1261        50242 :     if (rc == DBRES_ROW) {
    1262        25121 :         *rowid = database_column_int(vm, 0);
    1263        25121 :         rc = DBRES_OK;
    1264        25121 :     }
    1265              :     
    1266              : cleanup_merge:
    1267        25121 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1268        25121 :     dbvm_reset(vm);
    1269        25121 :     return rc;
    1270              : }
    1271              : 
    1272              : // MARK: - Deferred column-batch merge functions -
    1273              : 
    1274        21318 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
                            // Buffers one winning column value in data->pending_batch so that
                            // merge_flush_pending can later write all columns of the row at once.
                            // The first entry of a batch also records the table and a private copy of
                            // pk. col_value is duplicated with database_value_dup (NULL stays NULL),
                            // col_name is replaced by the table context's own pointer, and site_id is
                            // copied into the entry, truncated to sizeof(e->site_id).
                            // Returns DBRES_OK, or the result of cloudsync_set_error when an
                            // allocation fails or col_name is not a column of table.
                            // data->pending_batch is dereferenced unconditionally, so it must be
                            // non-NULL here.
    1275        21318 :     merge_pending_batch *batch = data->pending_batch;
    1276              : 
    1277              :     // Store table and PK on first entry
                            // NOTE(review): batch->table is assigned before the pk allocation is known
                            // to succeed; on the out-of-memory return the batch keeps table set with
                            // pk still NULL, so a later call would skip this branch.
    1278        21318 :     if (batch->table == NULL) {
    1279         7685 :         batch->table = table;
    1280         7685 :         batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1281         7685 :         if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
    1282         7685 :         memcpy(batch->pk, pk, pklen);
    1283         7685 :         batch->pk_len = pklen;
    1284         7685 :     }
    1285              : 
    1286              :     // Ensure capacity
                            // doubles the entries array (8 slots the first time); the old array stays
                            // in place when the reallocation fails
    1287        21318 :     if (batch->count >= batch->capacity) {
    1288          494 :         int new_cap = batch->capacity ? batch->capacity * 2 : 8;
    1289          494 :         merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
    1290          494 :         if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
    1291          494 :         batch->entries = new_entries;
    1292          494 :         batch->capacity = new_cap;
    1293          494 :     }
    1294              : 
    1295              :     // Resolve col_name to a stable pointer from the table context
    1296              :     // (the incoming col_name may point to VM-owned memory that gets freed on reset)
                            // only col_idx is used; the statement returned by table_column_lookup is
                            // ignored here
    1297        21318 :     int col_idx = -1;
    1298        21318 :     table_column_lookup(table, col_name, true, &col_idx);
    1299        21318 :     const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
    1300        21318 :     if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
    1301              : 
    1302        21318 :     merge_pending_entry *e = &batch->entries[batch->count];
    1303        21318 :     e->col_name = stable_col_name;
                            // NOTE(review): the result of database_value_dup is not checked; if it can
                            // return NULL on failure the entry would later be bound as SQL NULL
                            // instead of reporting an error -- confirm.
    1304        21318 :     e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
    1305        21318 :     e->col_version = col_version;
    1306        21318 :     e->db_version = db_version;
                            // site_len is capped from above only; NOTE(review): a negative site_len
                            // would reach memcpy unchanged -- presumably callers never pass one.
    1307        21318 :     e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
    1308        21318 :     memcpy(e->site_id, site_id, e->site_id_len);
    1309        21318 :     e->seq = seq;
    1310              : 
    1311        21318 :     batch->count++;
    1312        21318 :     return DBRES_OK;
    1313        21318 : }
    1314              : 
    1315        18878 : static void merge_pending_free_entries (merge_pending_batch *batch) {
                            // Empties the batch for the next row: releases the duplicated value of
                            // each of the first batch->count entries, frees the pk copy and clears the
                            // per-row fields. The entries array, its capacity and the cached
                            // statement fields are not touched here, so they remain available for
                            // the next batch.
    1316        18878 :     if (batch->entries) {
    1317        35561 :         for (int i = 0; i < batch->count; i++) {
    1318        21318 :             if (batch->entries[i].col_value) {
    1319        21318 :                 database_value_free(batch->entries[i].col_value);
    1320        21318 :                 batch->entries[i].col_value = NULL;
    1321        21318 :             }
    1322        21318 :         }
    1323        14243 :     }
    1324        18878 :     if (batch->pk) {
    1325         7717 :         cloudsync_memory_free(batch->pk);
    1326         7717 :         batch->pk = NULL;
    1327         7717 :     }
    1328        18878 :     batch->table = NULL;
    1329        18878 :     batch->pk_len = 0;
    1330        18878 :     batch->cl = 0;
    1331        18878 :     batch->sentinel_pending = false;
    1332        18878 :     batch->row_exists = false;
    1333        18878 :     batch->count = 0;
    1334        18878 : }
    1335              : 
    1336        18878 : static int merge_flush_pending (cloudsync_context *data) {
                            // Writes the buffered column values of the current row to the real table
                            // with a single multi-column statement, then records one winner clock per
                            // buffered entry. A batch with no entries but with sentinel_pending set
                            // and a table recorded runs the table's real_merge_sentinel_stmt instead
                            // (PK-only row).
                            // Returns DBRES_OK on success, also when there is no batch or nothing to
                            // write; otherwise the code of the failing step.
                            // On every path past the NULL check the batch is emptied through
                            // merge_pending_free_entries. When the "merge_flush" savepoint could be
                            // opened it is committed on DBRES_OK and rolled back on any other result.
    1337        18878 :     merge_pending_batch *batch = data->pending_batch;
    1338        18878 :     if (!batch) return DBRES_OK;
    1339              : 
    1340        18878 :     int rc = DBRES_OK;
    1341        18878 :     bool flush_savepoint = false;
    1342              : 
    1343              :     // Nothing to write — handle sentinel-only case or skip
    1344        18878 :     if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
    1345        11161 :         goto cleanup;
    1346              :     }
    1347              : 
    1348              :     // Wrap database operations in a savepoint so that on failure (e.g. RLS
    1349              :     // denial) the rollback properly releases all executor resources (open
    1350              :     // relations, snapshots, plan cache) acquired during the failed statement.
                            // a failure to open the savepoint is not treated as an error: the flush
                            // simply proceeds without one
    1351         7717 :     flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);
    1352              : 
    1353         7717 :     if (batch->count == 0) {
    1354              :         // Sentinel with no winning columns (PK-only row)
    1355           30 :         dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
                                // a negative result from pk_decode_prikey is returned to the caller
                                // unchanged
    1356           30 :         rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1357           30 :         if (rc < 0) {
    1358            0 :             cloudsync_set_dberror(data);
    1359            0 :             dbvm_reset(vm);
    1360            0 :             goto cleanup;
    1361              :         }
    1362           30 :         SYNCBIT_SET(data);
    1363           30 :         rc = databasevm_step(vm);
    1364           30 :         dbvm_reset(vm);
    1365           30 :         SYNCBIT_RESET(data);
    1366           30 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1367           30 :         if (rc != DBRES_OK) {
    1368            0 :             cloudsync_set_dberror(data);
    1369            0 :             goto cleanup;
    1370              :         }
    1371           30 :         goto cleanup;
    1372              :     }
    1373              : 
    1374              :     // Check if cached prepared statement can be reused
                            // reusable only when the update-vs-upsert flavour, the column count and
                            // every column name match; names are compared by pointer, which relies on
                            // merge_pending_add storing the table context's own col_name pointers
    1375         7687 :     cloudsync_table_context *table = batch->table;
    1376         7687 :     dbvm_t *vm = NULL;
    1377         7687 :     bool cache_hit = false;
    1378              : 
    1379        14576 :     if (batch->cached_vm &&
    1380         7193 :         batch->cached_row_exists == batch->row_exists &&
    1381         6889 :         batch->cached_col_count == batch->count) {
    1382         6867 :         cache_hit = true;
    1383        26557 :         for (int i = 0; i < batch->count; i++) {
    1384        19715 :             if (batch->cached_col_names[i] != batch->entries[i].col_name) {
    1385           25 :                 cache_hit = false;
    1386           25 :                 break;
    1387              :             }
    1388        19690 :         }
    1389         6867 :     }
    1390              : 
    1391         7687 :     if (cache_hit) {
    1392         6842 :         vm = batch->cached_vm;
    1393         6842 :         dbvm_reset(vm);
    1394         6842 :     } else {
    1395              :         // Invalidate old cache
    1396          845 :         if (batch->cached_vm) {
    1397          351 :             databasevm_finalize(batch->cached_vm);
    1398          351 :             batch->cached_vm = NULL;
    1399          351 :         }
    1400              : 
    1401              :         // Build multi-column SQL
                                // colnames is a temporary array that only borrows the entries' name
                                // pointers; it is freed as soon as the SQL text has been built
    1402          845 :         const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
    1403          845 :         if (!colnames) {
    1404            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
    1405            0 :             goto cleanup;
    1406              :         }
    1407         2473 :         for (int i = 0; i < batch->count; i++) {
    1408         1628 :             colnames[i] = batch->entries[i].col_name;
    1409         1628 :         }
    1410              : 
                                // UPDATE when the row is known to exist, upsert otherwise
    1411          845 :         char *sql = batch->row_exists
    1412          417 :             ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
    1413          428 :             : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
    1414          845 :         cloudsync_memory_free(colnames);
    1415              : 
    1416          845 :         if (!sql) {
    1417            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
    1418            0 :             goto cleanup;
    1419              :         }
    1420              : 
    1421          845 :         rc = databasevm_prepare(data, sql, &vm, 0);
    1422          845 :         cloudsync_memory_free(sql);
    1423          845 :         if (rc != DBRES_OK) {
    1424            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
    1425            0 :             goto cleanup;
    1426              :         }
    1427              : 
    1428              :         // Update cache
                                // from here on the batch owns vm through cached_vm
    1429          845 :         batch->cached_vm = vm;
    1430          845 :         batch->cached_row_exists = batch->row_exists;
    1431          845 :         batch->cached_col_count = batch->count;
    1432              :         // Reallocate cached_col_names if needed
                                // NOTE(review): when this realloc fails, cached_vm and cached_col_count
                                // have already been updated while cached_col_names still describes the
                                // previous statement (or is NULL). The reuse check above would then
                                // read cached_col_names[i] for i < cached_col_count from that stale
                                // array. Consider resetting cached_col_count to 0 on failure so the
                                // next flush takes the rebuild path.
    1433          845 :         if (batch->cached_col_count > 0) {
    1434          845 :             const char **new_names = (const char **)cloudsync_memory_realloc(
    1435          845 :                 batch->cached_col_names, batch->count * sizeof(const char *));
    1436          845 :             if (new_names) {
    1437         2473 :                 for (int i = 0; i < batch->count; i++) {
    1438         1628 :                     new_names[i] = batch->entries[i].col_name;
    1439         1628 :                 }
    1440          845 :                 batch->cached_col_names = new_names;
    1441          845 :             }
    1442          845 :         }
    1443              :     }
    1444              : 
    1445              :     // Bind PKs (positions 1..npks)
                            // the non-negative result of pk_decode_prikey is used as the number of
                            // bound key parameters when computing the column positions below
    1446         7687 :     int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1447         7687 :     if (npks < 0) {
    1448            0 :         cloudsync_set_dberror(data);
    1449            0 :         dbvm_reset(vm);
    1450            0 :         rc = DBRES_ERROR;
    1451            0 :         goto cleanup;
    1452              :     }
    1453              : 
    1454              :     // Bind column values (positions npks+1..npks+count)
    1455        29005 :     for (int i = 0; i < batch->count; i++) {
    1456        21318 :         merge_pending_entry *e = &batch->entries[i];
    1457        21318 :         int bind_idx = npks + 1 + i;
    1458        21318 :         if (e->col_value) {
    1459        21318 :             rc = databasevm_bind_value(vm, bind_idx, e->col_value);
    1460        21318 :         } else {
    1461            0 :             rc = databasevm_bind_null(vm, bind_idx);
    1462              :         }
    1463        21318 :         if (rc != DBRES_OK) {
    1464            0 :             cloudsync_set_dberror(data);
    1465            0 :             dbvm_reset(vm);
    1466            0 :             goto cleanup;
    1467              :         }
    1468        21318 :     }
    1469              : 
    1470              :     // Execute with SYNCBIT and GOS handling
                            // for GOS tables table->enabled is cleared around the step and set again
                            // afterwards, the same bracket merge_insert_col uses to bypass the trigger
    1471         7687 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1472         7687 :     SYNCBIT_SET(data);
    1473         7687 :     rc = databasevm_step(vm);
    1474         7687 :     dbvm_reset(vm);
    1475         7687 :     SYNCBIT_RESET(data);
    1476         7687 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1477              : 
    1478         7687 :     if (rc != DBRES_DONE) {
    1479            3 :         cloudsync_set_dberror(data);
    1480            3 :         goto cleanup;
    1481              :     }
    1482         7684 :     rc = DBRES_OK;
    1483              : 
    1484              :     // Call merge_set_winner_clock for each buffered entry
                            // rowid is overwritten by every call and not used afterwards; the first
                            // failing call aborts the loop and its code becomes the return value
    1485         7684 :     int64_t rowid = 0;
    1486        28994 :     for (int i = 0; i < batch->count; i++) {
    1487        21310 :         merge_pending_entry *e = &batch->entries[i];
    1488        42620 :         int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
    1489        21310 :                                                e->col_name, e->col_version, e->db_version,
    1490        21310 :                                                (const char *)e->site_id, e->site_id_len,
    1491        21310 :                                                e->seq, &rowid);
    1492        21310 :         if (clock_rc != DBRES_OK) {
    1493            0 :             rc = clock_rc;
    1494            0 :             goto cleanup;
    1495              :         }
    1496        28994 :     }
    1497              : 
    1498              : cleanup:
                            // the batch is emptied whatever the outcome, so a failed row is not
                            // retried by a later flush
    1499        18878 :     merge_pending_free_entries(batch);
    1500        18878 :     if (flush_savepoint) {
    1501         7717 :         if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
    1502            3 :         else database_rollback_savepoint(data, "merge_flush");
    1503         7717 :     }
    1504        18878 :     return rc;
    1505        18878 : }
    1506              : 
    1507         3167 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
                            // Writes a single column value for the row identified by pk using the
                            // table's precompiled merge statement for col_name, then records the
                            // winner clock through merge_set_winner_clock (which sets *rowid).
                            // Returns the result of merge_set_winner_clock on success. Earlier
                            // failures return: the result of cloudsync_set_error when col_name has no
                            // merge statement, the negative result of pk_decode_prikey, the failing
                            // bind code, or the step result when it is not DBRES_DONE.
                            // NOTE(review): index is filled by table_column_lookup but never read.
    1508              :     int index;
    1509         3167 :     dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
    1510         3167 :     if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
    1511              :     
    1512              :     // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;
    1513              :     
    1514              :     // bind primary key(s)
    1515         3167 :     int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1516         3167 :     if (rc < 0) {
    1517            0 :         cloudsync_set_dberror(data);
    1518            0 :         dbvm_reset(vm);
    1519            0 :         return rc;
    1520              :     }
    1521              :     
    1522              :     // bind value (always bind all expected parameters for correct prepared statement handling)
                            // the value is bound twice: position npks+1 for the VALUES list and
                            // position npks+2 for the ON CONFLICT ... SET clause shown above
    1523         3167 :     if (col_value) {
    1524         3167 :         rc = databasevm_bind_value(vm, table->npks+1, col_value);
    1525         3167 :         if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
    1526         3167 :     } else {
    1527            0 :         rc = databasevm_bind_null(vm, table->npks+1);
    1528            0 :         if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
    1529              :     }
    1530         3167 :     if (rc != DBRES_OK) {
    1531            0 :         cloudsync_set_dberror(data);
    1532            0 :         dbvm_reset(vm);
    1533            0 :         return rc;
    1534              :     }
    1535              : 
    1536              :     // perform real operation and disable triggers
    1537              :     
    1538              :     // in case of GOS we reused the table->col_merge_stmt statement
    1539              :     // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;
    1540              :     // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
    1541              :     // the trick is to disable that trigger before executing the statement
    1542         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1543         3167 :     SYNCBIT_SET(data);
    1544         3167 :     rc = databasevm_step(vm);
    1545              :     DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1546         3167 :     dbvm_reset(vm);
    1547         3167 :     SYNCBIT_RESET(data);
    1548         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1549              :     
                            // a write statement is expected to finish with DBRES_DONE; anything else
                            // is reported as a database error
    1550         3167 :     if (rc != DBRES_DONE) {
    1551            0 :         cloudsync_set_dberror(data);
    1552            0 :         return rc;
    1553              :     }
    1554              :     
    1555         3167 :     return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
    1556         3167 : }
    1557              : 
    1558          170 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1559          170 :     int rc = DBRES_OK;
    1560              :     
    1561              :     // reset return value
    1562          170 :     *rowid = 0;
    1563              :     
    1564              :     // bind pk
    1565          170 :     dbvm_t *vm = table->real_merge_delete_stmt;
    1566          170 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1567          170 :     if (rc < 0) {
    1568            0 :         rc = cloudsync_set_dberror(data);
    1569            0 :         dbvm_reset(vm);
    1570            0 :         return rc;
    1571              :     }
    1572              :     
    1573              :     // perform real operation and disable triggers
    1574          170 :     SYNCBIT_SET(data);
    1575          170 :     rc = databasevm_step(vm);
    1576              :     DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1577          170 :     dbvm_reset(vm);
    1578          170 :     SYNCBIT_RESET(data);
    1579          170 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1580          170 :     if (rc != DBRES_OK) {
    1581            0 :         cloudsync_set_dberror(data);
    1582            0 :         return rc;
    1583              :     }
    1584              :     
    1585          170 :     rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
    1586          170 :     if (rc != DBRES_OK) return rc;
    1587              :     
    1588              :     // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
    1589              :     // this must never come before `set_winner_clock`
    1590          170 :     vm = table->meta_merge_delete_drop;
    1591          170 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1592          170 :     if (rc == DBRES_OK) rc = databasevm_step(vm);
    1593          170 :     dbvm_reset(vm);
    1594              :     
    1595          170 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1596          170 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1597          170 :     return rc;
    1598          170 : }
    1599              : 
    1600           62 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
    1601           62 :     dbvm_t *vm = table->meta_zero_clock_stmt;
    1602              :     
    1603           62 :     int rc = databasevm_bind_int(vm, 1, db_version);
    1604           62 :     if (rc != DBRES_OK) goto cleanup;
    1605              :     
    1606           62 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1607           62 :     if (rc != DBRES_OK) goto cleanup;
    1608              :     
    1609           62 :     rc = databasevm_step(vm);
    1610           62 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1611              :     
    1612              : cleanup:
    1613           62 :     if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
    1614           62 :     dbvm_reset(vm);
    1615           62 :     return rc;
    1616              : }
    1617              : 
    1618              : // executed only if insert_cl == local_cl
    1619        45005 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {
    1620              :     
    1621        45005 :     if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
    1622              :     
    1623              :     int64_t local_version;
    1624        45005 :     int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    1625        45005 :     if (rc == DBRES_DONE) {
    1626              :         // no rows returned, the incoming change wins if there's nothing there locally
    1627        20103 :         *didwin_flag = true;
    1628        20103 :         return DBRES_OK;
    1629              :     }
    1630        24902 :     if (rc != DBRES_OK) return rc;
    1631              :     
    1632              :     // rc == DBRES_OK, means that a row with a version exists
    1633        24902 :     if (local_version != col_version) {
    1634         1984 :         if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
    1635          749 :         if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    1636            0 :     }
    1637              :     
    1638              :     // rc == DBRES_ROW and col_version == local_version, need to compare values
    1639              : 
    1640              :     // retrieve col_value precompiled statement
    1641        22918 :     bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    1642              :     dbvm_t *vm;
    1643        22918 :     if (is_block_col) {
    1644              :         // Block column: read value from blocks table (pk + col_name bindings)
    1645           43 :         vm = table_block_value_read_stmt(table);
    1646           43 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
    1647           43 :         rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1648           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1649           43 :         rc = databasevm_bind_text(vm, 2, col_name, -1);
    1650           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1651           43 :     } else {
    1652        22875 :         vm = table_column_lookup(table, col_name, false, NULL);
    1653        22875 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);
    1654              : 
    1655              :         // bind primary key values
    1656        22875 :         rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
    1657        22875 :         if (rc < 0) {
    1658            0 :             rc = cloudsync_set_dberror(data);
    1659            0 :             dbvm_reset(vm);
    1660            0 :             return rc;
    1661              :         }
    1662              :     }
    1663              :         
    1664              :     // execute vm
    1665              :     dbvalue_t *local_value;
    1666        22918 :     rc = databasevm_step(vm);
    1667        22918 :     if (rc == DBRES_DONE) {
    1668              :         // meta entry exists but the actual value is missing
    1669              :         // we should allow the value_compare function to make a decision
    1670              :         // value_compare has been modified to handle the case where lvalue is NULL
    1671            2 :         local_value = NULL;
    1672            2 :         rc = DBRES_OK;
    1673        22918 :     } else if (rc == DBRES_ROW) {
    1674        22916 :         local_value = database_column_value(vm, 0);
    1675        22916 :         rc = DBRES_OK;
    1676        22916 :     } else {
    1677            0 :         goto cleanup;
    1678              :     }
    1679              :     
    1680              :     // compare values
    1681        22918 :     int ret = dbutils_value_compare(insert_value, local_value);
    1682              :     // reset after compare, otherwise local value would be deallocated
    1683        22918 :     dbvm_reset(vm);
    1684        22918 :     vm = NULL;
    1685              :     
    1686        22918 :     bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    1687        22918 :     if (!compare_site_id) {
    1688        22918 :         *didwin_flag = (ret > 0);
    1689        22918 :         goto cleanup;
    1690              :     }
    1691              :     
    1692              :     // values are the same and merge_equal_values is true
    1693            0 :     vm = table->meta_site_id_stmt;
    1694            0 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1695            0 :     if (rc != DBRES_OK) goto cleanup;
    1696              :     
    1697            0 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1698            0 :     if (rc != DBRES_OK) goto cleanup;
    1699              :     
    1700            0 :     rc = databasevm_step(vm);
    1701            0 :     if (rc == DBRES_ROW) {
    1702            0 :         const void *local_site_id = database_column_blob(vm, 0, NULL);
    1703            0 :         if (!local_site_id) {
    1704            0 :             dbvm_reset(vm);
    1705            0 :             return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
    1706              :         }
    1707            0 :         ret = memcmp(site_id, local_site_id, site_len);
    1708            0 :         *didwin_flag = (ret > 0);
    1709            0 :         dbvm_reset(vm);
    1710            0 :         return DBRES_OK;
    1711              :     }
    1712              :     
    1713              :     // handle error condition here
    1714            0 :     dbvm_reset(vm);
    1715            0 :     return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);
    1716              :     
    1717              : cleanup:
    1718        22918 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1719        22918 :     dbvm_reset(vm);
    1720        22918 :     return rc;
    1721        45005 : }
    1722              : 
    1723           62 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1724              : 
    1725              :     // reset return value
    1726           62 :     *rowid = 0;
    1727              : 
    1728           62 :     if (data->pending_batch == NULL) {
    1729              :         // Immediate mode: execute base table INSERT
    1730            0 :         dbvm_t *vm = table->real_merge_sentinel_stmt;
    1731            0 :         int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1732            0 :         if (rc < 0) {
    1733            0 :             rc = cloudsync_set_dberror(data);
    1734            0 :             dbvm_reset(vm);
    1735            0 :             return rc;
    1736              :         }
    1737              : 
    1738            0 :         SYNCBIT_SET(data);
    1739            0 :         rc = databasevm_step(vm);
    1740            0 :         dbvm_reset(vm);
    1741            0 :         SYNCBIT_RESET(data);
    1742            0 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1743            0 :         if (rc != DBRES_OK) {
    1744            0 :             cloudsync_set_dberror(data);
    1745            0 :             return rc;
    1746              :         }
    1747            0 :     } else {
    1748              :         // Batch mode: skip base table INSERT, the batch flush will create the row
    1749           62 :         merge_pending_batch *batch = data->pending_batch;
    1750           62 :         batch->sentinel_pending = true;
    1751           62 :         if (batch->table == NULL) {
    1752           32 :             batch->table = table;
    1753           32 :             batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1754           32 :             if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
    1755           32 :             memcpy(batch->pk, pk, pklen);
    1756           32 :             batch->pk_len = pklen;
    1757           32 :         }
    1758              :     }
    1759              : 
    1760              :     // Metadata operations always execute regardless of batch mode
    1761           62 :     int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
    1762           62 :     if (rc != DBRES_OK) return rc;
    1763              : 
    1764           62 :     return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
    1765           62 : }
    1766              : 
    1767              : // MARK: - Block-level merge helpers -
    1768              : 
    1769              : // Store a block value in the blocks table
    1770          379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
    1771          379 :     dbvm_t *vm = table->block_value_write_stmt;
    1772          379 :     if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
    1773              : 
    1774          379 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1775          379 :     if (rc != DBRES_OK) goto cleanup;
    1776          379 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1777          379 :     if (rc != DBRES_OK) goto cleanup;
    1778          379 :     if (col_value) {
    1779          379 :         rc = databasevm_bind_value(vm, 3, col_value);
    1780          379 :     } else {
    1781            0 :         rc = databasevm_bind_null(vm, 3);
    1782              :     }
    1783          379 :     if (rc != DBRES_OK) goto cleanup;
    1784              : 
    1785          379 :     rc = databasevm_step(vm);
    1786          379 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1787              : 
    1788              : cleanup:
    1789          379 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1790          379 :     databasevm_reset(vm);
    1791          379 :     return rc;
    1792          379 : }
    1793              : 
    1794              : // Delete a block value from the blocks table
    1795           72 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
    1796           72 :     dbvm_t *vm = table->block_value_delete_stmt;
    1797           72 :     if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
    1798              : 
    1799           72 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1800           72 :     if (rc != DBRES_OK) goto cleanup;
    1801           72 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1802           72 :     if (rc != DBRES_OK) goto cleanup;
    1803              : 
    1804           72 :     rc = databasevm_step(vm);
    1805           72 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1806              : 
    1807              : cleanup:
    1808           72 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1809           72 :     databasevm_reset(vm);
    1810           72 :     return rc;
    1811           72 : }
    1812              : 
    1813              : // Materialize all alive blocks for a base column into the base table
    1814          462 : int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    1815          462 :     if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);
    1816              : 
    1817              :     // Find column index and delimiter
    1818          462 :     int col_idx = -1;
    1819          468 :     for (int i = 0; i < table->ncols; i++) {
    1820          468 :         if (strcasecmp(table->col_name[i], base_col_name) == 0) {
    1821          462 :             col_idx = i;
    1822          462 :             break;
    1823              :         }
    1824            6 :     }
    1825          462 :     if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    1826          462 :     const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
    1827              : 
    1828              :     // Build the LIKE pattern for block col_names: "base_col\x1F%"
    1829          462 :     char *like_pattern = block_build_colname(base_col_name, "%");
    1830          462 :     if (!like_pattern) return DBRES_NOMEM;
    1831              : 
    1832              :     // Query alive blocks from blocks table joined with metadata
    1833              :     // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    1834              :     //   ON b.pk = m.pk AND b.col_name = m.col_name
    1835              :     //   WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    1836              :     //   ORDER BY b.col_name
    1837          462 :     dbvm_t *vm = table->block_list_stmt;
    1838          462 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1839          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1840          462 :     rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    1841          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1842              :     // Bind pk again for the join condition (parameter 3)
    1843          462 :     rc = databasevm_bind_blob(vm, 3, pk, pklen);
    1844          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1845          462 :     rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    1846          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1847              : 
    1848              :     // Collect block values
    1849          462 :     const char **block_values = NULL;
    1850          462 :     int block_count = 0;
    1851          462 :     int block_cap = 0;
    1852              : 
    1853        22695 :     while ((rc = databasevm_step(vm)) == DBRES_ROW) {
    1854        22233 :         const char *value = database_column_text(vm, 0);
    1855        22233 :         if (block_count >= block_cap) {
    1856         1076 :             int new_cap = block_cap ? block_cap * 2 : 16;
    1857         1076 :             const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
    1858         1076 :             if (!new_arr) { rc = DBRES_NOMEM; break; }
    1859         1076 :             block_values = new_arr;
    1860         1076 :             block_cap = new_cap;
    1861         1076 :         }
    1862        22233 :         block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
    1863        22233 :         block_count++;
    1864              :     }
    1865          462 :     databasevm_reset(vm);
    1866          462 :     cloudsync_memory_free(like_pattern);
    1867              : 
    1868          462 :     if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
    1869              :         // Free collected values
    1870            0 :         for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1871            0 :         if (block_values) cloudsync_memory_free((void *)block_values);
    1872            0 :         return cloudsync_set_dberror(data);
    1873              :     }
    1874              : 
    1875              :     // Materialize text (NULL when no alive blocks)
    1876          462 :     char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    1877        22695 :     for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1878          462 :     if (block_values) cloudsync_memory_free((void *)block_values);
    1879          462 :     if (block_count > 0 && !text) return DBRES_NOMEM;
    1880              : 
    1881              :     // Update the base table column via the col_merge_stmt (with triggers disabled)
    1882          462 :     dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    1883          462 :     if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }
    1884              : 
    1885              :     // Bind PKs
    1886          462 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    1887          462 :     if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }
    1888              : 
    1889              :     // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    1890          462 :     int npks = table->npks;
    1891          462 :     if (text) {
    1892          458 :         rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
    1893          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1894          458 :         rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
    1895          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1896          458 :     } else {
    1897            4 :         rc = databasevm_bind_null(merge_vm, npks + 1);
    1898            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1899            4 :         rc = databasevm_bind_null(merge_vm, npks + 2);
    1900            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1901              :     }
    1902              : 
    1903              :     // Execute with triggers disabled
    1904          462 :     table->enabled = 0;
    1905          462 :     SYNCBIT_SET(data);
    1906          462 :     rc = databasevm_step(merge_vm);
    1907          462 :     databasevm_reset(merge_vm);
    1908          462 :     SYNCBIT_RESET(data);
    1909          462 :     table->enabled = 1;
    1910              : 
    1911          462 :     cloudsync_memory_free(text);
    1912              : 
    1913          462 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1914          462 :     if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    1915          462 :     return DBRES_OK;
    1916          462 : }
    1917              : 
    1918              : // Accessor for has_block_cols flag
    1919         1024 : bool table_has_block_cols (cloudsync_table_context *table) {
    1920         1024 :     return table && table->has_block_cols;
    1921              : }
    1922              : 
    1923              : // Get block column algo for a given column index
    1924        11420 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
    1925        11420 :     if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
    1926        11420 :     return table->col_algo[index];
    1927        11420 : }
    1928              : 
    1929              : // Get block delimiter for a given column index
    1930          133 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
    1931          133 :     if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
    1932          133 :     return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
    1933          133 : }
    1934              : 
    1935              : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
    1936         1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
    1937          496 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
    1938           91 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
    1939           91 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
    1940              : 
    1941            2 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
    1942            2 :     if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
    1943            2 :     if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
    1944            2 :     table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
    1945            2 : }
    1946              : 
    1947              : // Find column index by name
    1948          121 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
    1949          121 :     if (!table || !col_name) return -1;
    1950          125 :     for (int i = 0; i < table->ncols; i++) {
    1951          125 :         if (strcasecmp(table->col_name[i], col_name) == 0) return i;
    1952            4 :     }
    1953            0 :     return -1;
    1954          121 : }
    1955              : 
    1956        46042 : int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    1957              :     // Handle DWS and AWS algorithms here
    1958              :     // Delete-Wins Set (DWS): table_algo_crdt_dws
    1959              :     // Add-Wins Set (AWS): table_algo_crdt_aws
    1960              :     
    1961              :     // Causal-Length Set (CLS) Algorithm (default)
    1962              :     
    1963              :     // compute the local causal length for the row based on the primary key
    1964              :     // the causal length is used to determine the order of operations and resolve conflicts.
    1965        46042 :     int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    1966        46042 :     if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);
    1967              :     
    1968              :     // if the incoming causal length is older than the local causal length, we can safely ignore it
    1969              :     // because the local changes are more recent
    1970        46042 :     if (insert_cl < local_cl) return DBRES_OK;
    1971              :     
    1972              :     // check if the operation is a delete by examining the causal length
    1973              :     // even causal lengths typically signify delete operations
    1974        45814 :     bool is_delete = (insert_cl % 2 == 0);
    1975        45814 :     if (is_delete) {
    1976              :         // if it's a delete, check if the local state is at the same causal length
    1977              :         // if it is, no further action is needed
    1978          621 :         if (local_cl == insert_cl) return DBRES_OK;
    1979              :         
    1980              :         // perform a delete merge if the causal length is newer than the local one
    1981          340 :         int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
    1982          170 :                               insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    1983          170 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
    1984          170 :         return rc;
    1985              :     }
    1986              :     
    1987              :     // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    1988        45193 :     bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    1989        45193 :     if (is_sentinel_only) {
    1990          188 :         if (local_cl == insert_cl) return DBRES_OK;
    1991              :         
    1992              :         // perform a sentinel-only insert to track the existence of the row
    1993          124 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
    1994           62 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    1995           62 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    1996           62 :         return rc;
    1997              :     }
    1998              :     
    1999              :     // from this point I can be sure that insert_name is not sentinel
    2000              :     
    2001              :     // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    2002              :     // odd causal lengths can "resurrect" rows
    2003        45005 :     bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    2004        45005 :     bool row_exists_locally = local_cl != 0;
    2005              :    
    2006              :     // if a resurrection is needed, insert a sentinel to mark the row as alive
    2007              :     // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    2008        45005 :     if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
    2009            0 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
    2010            0 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2011            0 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    2012            0 :     }
    2013              :     
    2014              :     // at this point, we determine whether the incoming change wins based on causal length
    2015              :     // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    2016        45005 :     bool flag = false;
    2017        45005 :     int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    2018        45005 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);
    2019              :     
    2020              :     // check if the incoming change wins and should be applied
    2021        45005 :     bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    2022        45005 :     if (!does_cid_win) return DBRES_OK;
    2023              : 
    2024              :     // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    2025              :     // bypass the normal base-table write and instead store the value in the blocks table.
    2026              :     // The base table column will be materialized from all alive blocks.
    2027        21747 :     if (block_is_block_colname(insert_name) && table->has_block_cols) {
    2028              :         // Store or delete block value in blocks table depending on tombstone status
    2029          412 :         if (insert_col_version % 2 == 0) {
    2030              :             // Tombstone: remove from blocks table
    2031           33 :             rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
    2032           33 :         } else {
    2033          379 :             rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
    2034              :         }
    2035          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);
    2036              : 
    2037              :         // Set winner clock in metadata
    2038          824 :         rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
    2039          412 :                                     insert_col_version, insert_db_version,
    2040          412 :                                     insert_site_id, insert_site_id_len, insert_seq, rowid);
    2041          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);
    2042              : 
    2043              :         // Materialize the full column from blocks into the base table
    2044          412 :         char *base_col = block_extract_base_colname(insert_name);
    2045          412 :         if (base_col) {
    2046          412 :             rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
    2047          412 :             cloudsync_memory_free(base_col);
    2048          412 :             if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
    2049          412 :         }
    2050              : 
    2051          412 :         return DBRES_OK;
    2052              :     }
    2053              : 
    2054              :     // perform the final column insert or update if the incoming change wins
    2055        21335 :     if (data->pending_batch) {
    2056              :         // Propagate row_exists_locally to the batch on the first winning column.
    2057              :         // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
    2058              :         // which matters when RLS policies reference columns not in the payload.
    2059        21318 :         if (data->pending_batch->table == NULL) {
    2060         7685 :             data->pending_batch->row_exists = row_exists_locally;
    2061         7685 :         }
    2062        21318 :         rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
    2063        21318 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    2064        21318 :     } else {
    2065           17 :         rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2066           17 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    2067              :     }
    2068              : 
    2069        21335 :     return rc;
    2070        46042 : }
    2071              : 
    2072              : // MARK: - Block column setup -
    2073              : 
    2074           69 : int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter) {
    2075           69 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2076           69 :     if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);
    2077              : 
    2078              :     // Find column index
    2079           69 :     int col_idx = table_col_index(table, col_name);
    2080           69 :     if (col_idx < 0) {
    2081              :         char buf[1024];
    2082            0 :         snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
    2083            0 :         return cloudsync_set_error(data, buf, DBRES_ERROR);
    2084              :     }
    2085              : 
    2086              :     // Set column algo
    2087           69 :     table->col_algo[col_idx] = col_algo_block;
    2088           69 :     table->has_block_cols = true;
    2089              : 
    2090              :     // Set delimiter (can be NULL for default)
    2091           69 :     if (table->col_delimiter[col_idx]) {
    2092            0 :         cloudsync_memory_free(table->col_delimiter[col_idx]);
    2093            0 :         table->col_delimiter[col_idx] = NULL;
    2094            0 :     }
    2095           69 :     if (delimiter) {
    2096            0 :         table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
    2097            0 :     }
    2098              : 
    2099              :     // Create blocks table if not already done
    2100           69 :     if (!table->blocks_ref) {
    2101           67 :         table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
    2102           67 :         if (!table->blocks_ref) return DBRES_NOMEM;
    2103              : 
    2104              :         // CREATE TABLE IF NOT EXISTS
    2105           67 :         char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
    2106           67 :         if (!sql) return DBRES_NOMEM;
    2107              : 
    2108           67 :         int rc = database_exec(data, sql);
    2109           67 :         cloudsync_memory_free(sql);
    2110           67 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);
    2111              : 
    2112              :         // Prepare block statements
    2113              :         // Write: upsert into blocks (pk, col_name, col_value)
    2114           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
    2115           67 :         if (!sql) return DBRES_NOMEM;
    2116           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
    2117           67 :         cloudsync_memory_free(sql);
    2118           67 :         if (rc != DBRES_OK) return rc;
    2119              : 
    2120              :         // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
    2121           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
    2122           67 :         if (!sql) return DBRES_NOMEM;
    2123           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
    2124           67 :         cloudsync_memory_free(sql);
    2125           67 :         if (rc != DBRES_OK) return rc;
    2126              : 
    2127              :         // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
    2128           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
    2129           67 :         if (!sql) return DBRES_NOMEM;
    2130           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
    2131           67 :         cloudsync_memory_free(sql);
    2132           67 :         if (rc != DBRES_OK) return rc;
    2133              : 
    2134              :         // List alive blocks for materialization
    2135           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
    2136           67 :         if (!sql) return DBRES_NOMEM;
    2137           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
    2138           67 :         cloudsync_memory_free(sql);
    2139           67 :         if (rc != DBRES_OK) return rc;
    2140           67 :     }
    2141              : 
    2142              :     // Persist settings
    2143           69 :     int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
    2144           69 :     if (rc != DBRES_OK) return rc;
    2145              : 
    2146           69 :     if (delimiter) {
    2147            0 :         rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
    2148            0 :         if (rc != DBRES_OK) return rc;
    2149            0 :     }
    2150              : 
    2151           69 :     return DBRES_OK;
    2152           69 : }
    2153              : 
    2154              : // MARK: - Private -
    2155              : 
    2156          210 : bool cloudsync_config_exists (cloudsync_context *data) {
    2157          210 :     return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
    2158              : }
    2159              : 
    2160          224 : cloudsync_context *cloudsync_context_create (void *db) {
    2161          224 :     cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
    2162          224 :     if (!data) return NULL;
    2163              :     DEBUG_SETTINGS("cloudsync_context_create %p", data);
    2164              :     
    2165          224 :     data->libversion = CLOUDSYNC_VERSION;
    2166          224 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2167              :     #if CLOUDSYNC_DEBUG
    2168              :     data->debug = 1;
    2169              :     #endif
    2170              :     
    2171              :     // allocate space for 64 tables (it can grow if needed)
    2172          224 :     uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
    2173          224 :     data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
    2174          224 :     if (!data->tables) {cloudsync_memory_free(data); return NULL;}
    2175              :     
    2176          224 :     data->tables_cap = CLOUDSYNC_INIT_NTABLES;
    2177          224 :     data->tables_count = 0;
    2178          224 :     data->db = db;
    2179              :     
    2180              :     // SQLite exposes col_value as ANY, but other databases require a concrete type.
    2181              :     // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
    2182              :     // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
    2183              :     // It is decoded to the target column type just before applying changes to the base table.
    2184          224 :     data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
    2185              : 
    2186          224 :     return data;
    2187          224 : }
    2188              : 
    2189          224 : void cloudsync_context_free (void *ctx) {
    2190          224 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2191              :     DEBUG_SETTINGS("cloudsync_context_free %p", data);
    2192          224 :     if (!data) return;
    2193              : 
    2194              :     // free all table contexts and prepared statements
    2195          224 :     cloudsync_terminate(data);
    2196              : 
    2197          224 :     cloudsync_memory_free(data->tables);
    2198          224 :     cloudsync_memory_free(data);
    2199          224 : }
    2200              : 
    2201          316 : const char *cloudsync_context_init (cloudsync_context *data) {
    2202          316 :     if (!data) return NULL;
    2203              :     
    2204              :     // perform init just the first time, if the site_id field is not set.
    2205              :     // The data->site_id value could exists while settings tables don't exists if the
    2206              :     // cloudsync_context_init was previously called in init transaction that was rolled back
    2207              :     // because of an error during the init process.
    2208          316 :     if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
    2209          205 :         if (dbutils_settings_init(data) != DBRES_OK) return NULL;
    2210          205 :         if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
    2211          205 :         if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
    2212          205 :         data->schema_hash = database_schema_hash(data);
    2213          205 :     }
    2214              :     
    2215          316 :     return (const char *)data->site_id;
    2216          316 : }
    2217              : 
    2218         1303 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
    2219              :     DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
    2220              :     
    2221              :     // sync data
    2222         1303 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
    2223          205 :         data->schema_version = (int)strtol(value, NULL, 0);
    2224          205 :         return;
    2225              :     }
    2226              :     
    2227         1098 :     if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
    2228            0 :         data->debug = 0;
    2229            0 :         if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
    2230            0 :         return;
    2231              :     }
    2232              : 
    2233         1098 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
    2234            0 :         cloudsync_set_schema(data, value);
    2235            0 :         return;
    2236              :     }
    2237         1303 : }
    2238              : 
#if 0
// Compiled out via #if 0: placeholder for mirroring a per-table/per-column setting.
// As written it only emits a debug trace and returns without touching `data`.
void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
    DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
    // Unused in this version
    return;
}
#endif
    2246              : 
    2247         7847 : int cloudsync_commit_hook (void *ctx) {
    2248         7847 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2249              :     
    2250         7847 :     data->db_version = data->pending_db_version;
    2251         7847 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2252         7847 :     data->seq = 0;
    2253              :     
    2254         7847 :     return DBRES_OK;
    2255              : }
    2256              : 
    2257            3 : void cloudsync_rollback_hook (void *ctx) {
    2258            3 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2259              :     
    2260            3 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2261            3 :     data->seq = 0;
    2262            3 : }
    2263              : 
    2264           24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
    2265              :     // init cloudsync_settings
    2266           24 :     if (cloudsync_context_init(data) == NULL) {
    2267            0 :         return DBRES_MISUSE;
    2268              :     }
    2269              : 
    2270              :     // lookup table
    2271           24 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2272           24 :     if (!table) {
    2273              :         char buffer[1024];
    2274            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2275            1 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2276              :     }
    2277              : 
    2278              :     // idempotent: if already altering, return OK
    2279           23 :     if (table->is_altering) return DBRES_OK;
    2280              : 
    2281              :     // retrieve primary key(s)
    2282           23 :     char **names = NULL;
    2283           23 :     int nrows = 0;
    2284           23 :     int rc = database_pk_names(data, table_name, &names, &nrows);
    2285           23 :     if (rc != DBRES_OK) {
    2286              :         char buffer[1024];
    2287            0 :         snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
    2288            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2289            0 :         goto rollback_begin_alter;
    2290              :     }
    2291              : 
    2292              :     // sanity check the number of primary keys
    2293           23 :     if (nrows != table_count_pks(table)) {
    2294              :         char buffer[1024];
    2295            0 :         snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
    2296            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2297            0 :         goto rollback_begin_alter;
    2298              :     }
    2299              : 
    2300              :     // drop original triggers
    2301           23 :     rc = database_delete_triggers(data, table_name);
    2302           23 :     if (rc != DBRES_OK) {
    2303              :         char buffer[1024];
    2304            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
    2305            0 :         cloudsync_set_error(data, buffer, DBRES_ERROR);
    2306            0 :         goto rollback_begin_alter;
    2307              :     }
    2308              : 
    2309           23 :     table_set_pknames(table, names);
    2310           23 :     table->is_altering = true;
    2311           23 :     return DBRES_OK;
    2312              : 
    2313              : rollback_begin_alter:
    2314            0 :     if (names) table_pknames_free(names, nrows);
    2315            0 :     return rc;
    2316           24 : }
    2317              : 
    2318           23 : int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    2319              :     // check if dbversion needed to be updated
    2320           23 :     cloudsync_dbversion_check_uptodate(data);
    2321              : 
    2322              :     // if primary-key columns change, all row identities change.
    2323              :     // In that case, the clock table must be dropped, recreated,
    2324              :     // and backfilled. We detect this by comparing the unique index
    2325              :     // in the lookaside table with the source table's PKs.
    2326              :     
    2327              :     // retrieve primary keys (to check is they changed)
    2328           23 :     char **result = NULL;
    2329           23 :     int nrows = 0;
    2330           23 :     int rc = database_pk_names (data, table->name, &result, &nrows);
    2331           23 :     if (rc != DBRES_OK || nrows == 0) {
    2332            0 :         if (nrows == 0) rc = DBRES_MISUSE;
    2333            0 :         goto finalize;
    2334              :     }
    2335              :     
    2336              :     // check if there are differences
    2337           23 :     bool pk_diff = (nrows != table->npks);
    2338           23 :     if (!pk_diff) {
    2339           45 :         for (int i = 0; i < nrows; ++i) {
    2340           34 :             if (strcmp(table->pk_name[i], result[i]) != 0) {
    2341            6 :                 pk_diff = true;
    2342            6 :                 break;
    2343              :             }
    2344           28 :         }
    2345           17 :     }
    2346              :     
    2347           23 :     if (pk_diff) {
    2348              :         // drop meta-table, it will be recreated
    2349           12 :         char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
    2350           12 :         rc = database_exec(data, sql);
    2351           12 :         cloudsync_memory_free(sql);
    2352           12 :         if (rc != DBRES_OK) {
    2353            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2354            0 :             goto finalize;
    2355              :         }
    2356           12 :     } else {
    2357              :         // compact meta-table
    2358              :         // delete entries for removed columns
    2359           11 :         const char *schema = table->schema ? table->schema : "";
    2360           11 :         char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    2361           11 :         rc = database_exec(data, sql);
    2362           11 :         cloudsync_memory_free(sql);
    2363           11 :         if (rc != DBRES_OK) {
    2364            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2365            0 :             goto finalize;
    2366              :         }
    2367              :         
    2368           11 :         sql = sql_build_pk_qualified_collist_query(schema, table->name);
    2369           11 :         if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2370              :         
    2371           11 :         char *pkclause = NULL;
    2372           11 :         rc = database_select_text(data, sql, &pkclause);
    2373           11 :         cloudsync_memory_free(sql);
    2374           11 :         if (rc != DBRES_OK) goto finalize;
    2375           11 :         char *pkvalues = (pkclause) ? pkclause : "rowid";
    2376              :         
    2377              :         // delete entries related to rows that no longer exist in the original table, but preserve tombstone
    2378           11 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
    2379           11 :         rc = database_exec(data, sql);
    2380           11 :         if (pkclause) cloudsync_memory_free(pkclause);
    2381           11 :         cloudsync_memory_free(sql);
    2382           11 :         if (rc != DBRES_OK) {
    2383            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2384            0 :             goto finalize;
    2385              :         }
    2386              : 
    2387              :     }
    2388              :     
    2389              :     // update key to be later used in cloudsync_dbversion_rebuild
    2390              :     char buf[256];
    2391           23 :     snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    2392           23 :     dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);
    2393              :     
    2394              : finalize:
    2395           23 :     table_pknames_free(result, nrows);
    2396           23 :     return rc;
    2397              : }
    2398              : 
    2399           24 : int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    2400           24 :     int rc = DBRES_MISUSE;
    2401           24 :     cloudsync_table_context *table = NULL;
    2402              : 
    2403              :     // init cloudsync_settings
    2404           24 :     if (cloudsync_context_init(data) == NULL) {
    2405            0 :         cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    2406            0 :         goto rollback_finalize_alter;
    2407              :     }
    2408              : 
    2409              :     // lookup table
    2410           24 :     table = table_lookup(data, table_name);
    2411           24 :     if (!table) {
    2412              :         char buffer[1024];
    2413            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2414            1 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2415            1 :         goto rollback_finalize_alter;
    2416              :     }
    2417              : 
    2418              :     // idempotent: if not altering, return OK
    2419           23 :     if (!table->is_altering) return DBRES_OK;
    2420              : 
    2421           23 :     rc = cloudsync_finalize_alter(data, table);
    2422           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2423              : 
    2424              :     // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    2425              :     // is_altering is reset implicitly because table_free + cloudsync_init_table
    2426              :     // will reallocate the table context with zero-initialized memory
    2427           23 :     table_remove(data, table);
    2428           23 :     table_free(table);
    2429           23 :     table = NULL;
    2430              : 
    2431              :     // init again cloudsync for the table
    2432           23 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    2433           23 :     if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    2434           23 :     rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), true);
    2435           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2436              : 
    2437           23 :     return DBRES_OK;
    2438              : 
    2439              : rollback_finalize_alter:
    2440            1 :     if (table) {
    2441            0 :         table_set_pknames(table, NULL);
    2442            0 :         table->is_altering = false;
    2443            0 :     }
    2444            1 :     return rc;
    2445           24 : }
    2446              : 
    2447              : // MARK: - Filter Rewrite -
    2448              : 
    2449              : // Replace bare column names in a filter expression with prefix-qualified names.
    2450              : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
    2451              : // Columns must be sorted by length descending by the caller to avoid partial matches.
    2452              : // Skips content inside single-quoted string literals.
    2453              : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
    2454              : // Helper: check if an identifier token matches a column name.
    2455            8 : static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    2456           28 :     for (int i = 0; i < ncols; ++i) {
    2457           24 :         if (strlen(columns[i]) == token_len && strncmp(token, columns[i], token_len) == 0)
    2458            4 :             return true;
    2459           20 :     }
    2460            4 :     return false;
    2461            8 : }
    2462              : 
    2463              : // Helper: check if character is part of a SQL identifier.
    2464           56 : static bool filter_is_ident_char (char c) {
    2465           92 :     return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
    2466           36 :            (c >= '0' && c <= '9') || c == '_';
    2467              : }
    2468              : 
    2469            4 : char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    2470            4 :     if (!filter || !prefix || !columns || ncols <= 0) return NULL;
    2471              : 
    2472            4 :     size_t filter_len = strlen(filter);
    2473            4 :     size_t prefix_len = strlen(prefix);
    2474              : 
    2475              :     // Each identifier match grows by at most (prefix_len + 3) bytes.
    2476              :     // Worst case: the entire filter is one repeated column reference separated by
    2477              :     // single characters, so up to (filter_len / 2) matches.  Use a safe upper bound.
    2478            4 :     size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    2479            4 :     size_t cap = filter_len + max_growth + 64;
    2480            4 :     char *result = (char *)cloudsync_memory_alloc(cap);
    2481            4 :     if (!result) return NULL;
    2482            4 :     size_t out = 0;
    2483              : 
    2484              :     // Single pass: tokenize into identifiers, quoted strings, and everything else.
    2485            4 :     size_t i = 0;
    2486           24 :     while (i < filter_len) {
    2487              :         // Skip single-quoted string literals verbatim (handle '' escape)
    2488           20 :         if (filter[i] == '\'') {
    2489            0 :             result[out++] = filter[i++];
    2490            0 :             while (i < filter_len) {
    2491            0 :                 if (filter[i] == '\'') {
    2492            0 :                     result[out++] = filter[i++];
    2493              :                     // '' is an escaped quote — keep going
    2494            0 :                     if (i < filter_len && filter[i] == '\'') {
    2495            0 :                         result[out++] = filter[i++];
    2496            0 :                         continue;
    2497              :                     }
    2498            0 :                     break; // single ' ends the literal
    2499              :                 }
    2500            0 :                 result[out++] = filter[i++];
    2501              :             }
    2502            0 :             continue;
    2503              :         }
    2504              : 
    2505              :         // Extract identifier token
    2506           20 :         if (filter_is_ident_char(filter[i])) {
    2507            8 :             size_t start = i;
    2508           40 :             while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
    2509            8 :             size_t token_len = i - start;
    2510              : 
    2511            8 :             if (filter_is_column(&filter[start], token_len, columns, ncols)) {
    2512              :                 // Emit PREFIX."column_name"
    2513            4 :                 memcpy(&result[out], prefix, prefix_len); out += prefix_len;
    2514            4 :                 result[out++] = '.';
    2515            4 :                 result[out++] = '"';
    2516            4 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2517            4 :                 result[out++] = '"';
    2518            4 :             } else {
    2519              :                 // Not a column — copy as-is
    2520            4 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2521              :             }
    2522            8 :             continue;
    2523              :         }
    2524              : 
    2525              :         // Any other character — copy as-is
    2526           12 :         result[out++] = filter[i++];
    2527              :     }
    2528              : 
    2529            4 :     result[out] = '\0';
    2530            4 :     return result;
    2531            4 : }
    2532              : 
    2533          261 : int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
    2534          261 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2535          261 :     if (!table) return DBRES_ERROR;
    2536              : 
    2537          261 :     dbvm_t *vm = NULL;
    2538          261 :     int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
    2539              : 
    2540              :     // Read row-level filter from settings (if any)
    2541              :     char filter_buf[2048];
    2542          261 :     int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
    2543          261 :     const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;
    2544              : 
    2545          261 :     const char *schema = table->schema ? table->schema : "";
    2546          261 :     char *sql = sql_build_pk_collist_query(schema, table_name);
    2547          261 :     char *pkclause_identifiers = NULL;
    2548          261 :     int rc = database_select_text(data, sql, &pkclause_identifiers);
    2549          261 :     cloudsync_memory_free(sql);
    2550          261 :     if (rc != DBRES_OK) goto finalize;
    2551          261 :     char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
    2552              : 
    2553              :     // Use database-specific query builder to handle type differences in composite PKs
    2554          261 :     sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
    2555          261 :     if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2556          261 :     rc = database_exec(data, sql);
    2557          261 :     cloudsync_memory_free(sql);
    2558          261 :     if (rc != DBRES_OK) goto finalize;
    2559              : 
    2560              :     // fill missing colums
    2561              :     // for each non-pk column:
    2562              :     // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
    2563              :     // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
    2564              : 
    2565          261 :     if (filter) {
    2566            0 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
    2567            0 :     } else {
    2568          261 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
    2569              :     }
    2570          261 :     rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
    2571          261 :     cloudsync_memory_free(sql);
    2572          261 :     if (rc != DBRES_OK) goto finalize;
    2573              :      
    2574         1310 :     for (int i=0; i<table->ncols; ++i) {
    2575         1049 :         char *col_name = table->col_name[i];
    2576              : 
    2577         1049 :         rc = databasevm_bind_text(vm, 1, col_name, -1);
    2578         1049 :         if (rc != DBRES_OK) goto finalize;
    2579              : 
    2580         1087 :         while (1) {
    2581         1087 :             rc = databasevm_step(vm);
    2582         1087 :             if (rc == DBRES_ROW) {
    2583           38 :                 size_t pklen = 0;
    2584           38 :                 const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
    2585           38 :                 if (!pk) { rc = DBRES_ERROR; break; }
    2586           38 :                 rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
    2587         1087 :             } else if (rc == DBRES_DONE) {
    2588         1049 :                 rc = DBRES_OK;
    2589         1049 :                 break;
    2590              :             } else {
    2591            0 :                 break;
    2592              :             }
    2593              :         }
    2594         1049 :         if (rc != DBRES_OK) goto finalize;
    2595              : 
    2596         1049 :         databasevm_reset(vm);
    2597         1310 :     }
    2598              :     
    2599              : finalize:
    2600          261 :     if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
    2601          261 :     if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
    2602          261 :     if (vm) databasevm_finalize(vm);
    2603          261 :     return rc;
    2604          261 : }
    2605              : 
    2606              : // MARK: - Local -
    2607              : 
    2608            4 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2609            4 :     dbvm_t *vm = table->meta_sentinel_update_stmt;
    2610            4 :     if (!vm) return -1;
    2611              :     
    2612            4 :     int rc = databasevm_bind_int(vm, 1, db_version);
    2613            4 :     if (rc != DBRES_OK) goto cleanup;
    2614              :     
    2615            4 :     rc = databasevm_bind_int(vm, 2, seq);
    2616            4 :     if (rc != DBRES_OK) goto cleanup;
    2617              :     
    2618            4 :     rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
    2619            4 :     if (rc != DBRES_OK) goto cleanup;
    2620              :     
    2621            4 :     rc = databasevm_step(vm);
    2622            4 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2623              :     
    2624              : cleanup:
    2625            4 :     DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
    2626            4 :     databasevm_reset(vm);
    2627            4 :     return rc;
    2628            4 : }
    2629              : 
    2630          128 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2631          128 :     dbvm_t *vm = table->meta_sentinel_insert_stmt;
    2632          128 :     if (!vm) return -1;
    2633              :     
    2634          128 :     int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
    2635          128 :     if (rc != DBRES_OK) goto cleanup;
    2636              :     
    2637          128 :     rc = databasevm_bind_int(vm, 2, db_version);
    2638          128 :     if (rc != DBRES_OK) goto cleanup;
    2639              :     
    2640          128 :     rc = databasevm_bind_int(vm, 3, seq);
    2641          128 :     if (rc != DBRES_OK) goto cleanup;
    2642              :     
    2643          128 :     rc = databasevm_bind_int(vm, 4, db_version);
    2644          128 :     if (rc != DBRES_OK) goto cleanup;
    2645              :     
    2646          128 :     rc = databasevm_bind_int(vm, 5, seq);
    2647          128 :     if (rc != DBRES_OK) goto cleanup;
    2648              :     
    2649          128 :     rc = databasevm_step(vm);
    2650          128 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2651              :     
    2652              : cleanup:
    2653          128 :     DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
    2654          128 :     databasevm_reset(vm);
    2655          128 :     return rc;
    2656          128 : }
    2657              : 
    2658        11781 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
    2659              :     
    2660        11781 :     dbvm_t *vm = table->meta_row_insert_update_stmt;
    2661        11781 :     if (!vm) return -1;
    2662              :     
    2663        11781 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2664        11781 :     if (rc != DBRES_OK) goto cleanup;
    2665              :     
    2666        11781 :     rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    2667        11781 :     if (rc != DBRES_OK) goto cleanup;
    2668              :     
    2669        11781 :     rc = databasevm_bind_int(vm, 3, col_version);
    2670        11781 :     if (rc != DBRES_OK) goto cleanup;
    2671              :     
    2672        11781 :     rc = databasevm_bind_int(vm, 4, db_version);
    2673        11781 :     if (rc != DBRES_OK) goto cleanup;
    2674              :     
    2675        11781 :     rc = databasevm_bind_int(vm, 5, seq);
    2676        11781 :     if (rc != DBRES_OK) goto cleanup;
    2677              :     
    2678        11781 :     rc = databasevm_bind_int(vm, 6, db_version);
    2679        11781 :     if (rc != DBRES_OK) goto cleanup;
    2680              :     
    2681        11781 :     rc = databasevm_bind_int(vm, 7, seq);
    2682        11781 :     if (rc != DBRES_OK) goto cleanup;
    2683              :     
    2684        11781 :     rc = databasevm_step(vm);
    2685        11781 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2686              :     
    2687              : cleanup:
    2688        11781 :     DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
    2689        11781 :     databasevm_reset(vm);
    2690        11781 :     return rc;
    2691        11781 : }
    2692              : 
    2693        11676 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
    2694        11676 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
    2695              : }
    2696              : 
    2697           39 : int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
    2698              :     // Mark a block as deleted by setting col_version = 2 (even = deleted)
    2699           39 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
    2700              : }
    2701              : 
    2702           39 : int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
    2703           39 :     return block_delete_value(data, table, pk, (int)pklen, block_colname);
    2704              : }
    2705              : 
    2706           66 : int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2707           66 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
    2708              : }
    2709              : 
    2710           35 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
    2711           35 :     dbvm_t *vm = table->meta_row_drop_stmt;
    2712           35 :     if (!vm) return -1;
    2713              :     
    2714           35 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2715           35 :     if (rc != DBRES_OK) goto cleanup;
    2716              :     
    2717           35 :     rc = databasevm_step(vm);
    2718           35 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2719              :     
    2720              : cleanup:
    2721           35 :     DEBUG_DBERROR(rc, "local_drop_meta", table->context);
    2722           35 :     databasevm_reset(vm);
    2723           35 :     return rc;
    2724           35 : }
    2725              : 
    2726           31 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
    2727              :     /*
    2728              :       * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
    2729              :       * to a new primary key (NEW.pk) when a primary key change occurs.
    2730              :       *
    2731              :       * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
    2732              :       * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
    2733              :       *
    2734              :       * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
    2735              :       * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
    2736              :       * may be applied incorrectly, leading to data inconsistency.
    2737              :       *
    2738              :       * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
    2739              :       * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
    2740              :       * that generates a unique sequence for each row. The update query should ensure that each row moved
    2741              :       * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
    2742              :      */
    2743              :     
    2744              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
    2745              :     // pk2 is the old pk
    2746              :     
    2747           31 :     dbvm_t *vm = table->meta_update_move_stmt;
    2748           31 :     if (!vm) return -1;
    2749              :     
    2750              :     // new primary key
    2751           31 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2752           31 :     if (rc != DBRES_OK) goto cleanup;
    2753              :     
    2754              :     // new db_version
    2755           31 :     rc = databasevm_bind_int(vm, 2, db_version);
    2756           31 :     if (rc != DBRES_OK) goto cleanup;
    2757              :     
    2758              :     // old primary key
    2759           31 :     rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
    2760           31 :     if (rc != DBRES_OK) goto cleanup;
    2761              :     
    2762           31 :     rc = databasevm_step(vm);
    2763           31 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2764              :     
    2765              : cleanup:
    2766           31 :     DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
    2767           31 :     databasevm_reset(vm);
    2768           31 :     return rc;
    2769           31 : }
    2770              : 
    2771              : // MARK: - Payload Encode / Decode -
    2772              : 
    2773          677 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
    2774          677 :     uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
    2775          677 :     header->checksum[0] = (uint8_t)(h >> 40);
    2776          677 :     header->checksum[1] = (uint8_t)(h >> 32);
    2777          677 :     header->checksum[2] = (uint8_t)(h >> 24);
    2778          677 :     header->checksum[3] = (uint8_t)(h >> 16);
    2779          677 :     header->checksum[4] = (uint8_t)(h >>  8);
    2780          677 :     header->checksum[5] = (uint8_t)(h >>  0);
    2781          677 : }
    2782              : 
    2783          672 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
    2784         2016 :     return ((uint64_t)header->checksum[0] << 40) |
    2785         1344 :            ((uint64_t)header->checksum[1] << 32) |
    2786         1344 :            ((uint64_t)header->checksum[2] << 24) |
    2787         1344 :            ((uint64_t)header->checksum[3] << 16) |
    2788         1344 :            ((uint64_t)header->checksum[4] <<  8) |
    2789          672 :            ((uint64_t)header->checksum[5] <<  0);
    2790              : }
    2791              : 
    2792          672 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
    2793          672 :     uint64_t checksum1 = cloudsync_payload_checksum_load(header);
    2794          672 :     uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
    2795          672 :     return (checksum1 == checksum2);
    2796              : }
    2797              : 
    2798        49136 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
    2799        49136 :     if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
    2800              :     
    2801              :     // alloc/resize buffer
    2802        49136 :     if (payload->bused + needed > payload->balloc) {
    2803          686 :         if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
    2804          686 :         size_t balloc = payload->balloc + needed;
    2805              :         
    2806          686 :         char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
    2807          686 :         if (!buffer) {
    2808            0 :             if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2809            0 :             memset(payload, 0, sizeof(cloudsync_payload_context));
    2810            0 :             return false;
    2811              :         }
    2812              :         
    2813          686 :         payload->buffer = buffer;
    2814          686 :         payload->balloc = balloc;
    2815          686 :         if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
    2816          686 :     }
    2817              :     
    2818        49136 :     return true;
    2819        49136 : }
    2820              : 
    2821        50498 : size_t cloudsync_payload_context_size (size_t *header_size) {
    2822        50498 :     if (header_size) *header_size = sizeof(cloudsync_payload_header);
    2823        50498 :     return sizeof(cloudsync_payload_context);
    2824              : }
    2825              : 
    2826          677 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
    2827          677 :     memset(header, 0, sizeof(cloudsync_payload_header));
    2828              :     assert(sizeof(cloudsync_payload_header)==32);
    2829              :     
    2830              :     int major, minor, patch;
    2831          677 :     sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
    2832              :     
    2833          677 :     header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
    2834          677 :     header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
    2835          677 :     header->libversion[0] = (uint8_t)major;
    2836          677 :     header->libversion[1] = (uint8_t)minor;
    2837          677 :     header->libversion[2] = (uint8_t)patch;
    2838          677 :     header->expanded_size = htonl(expanded_size);
    2839          677 :     header->ncols = htons(ncols);
    2840          677 :     header->nrows = htonl(nrows);
    2841          677 :     header->schema_hash = htonll(hash);
    2842          677 : }
    2843              : 
    2844        49136 : int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
    2845              :     DEBUG_FUNCTION("cloudsync_payload_encode_step");
    2846              :     // debug_values(argc, argv);
    2847              :     
    2848              :     // check if the step function is called for the first time
    2849        49136 :     if (payload->nrows == 0) payload->ncols = (uint16_t)argc;
    2850              :     
    2851        49136 :     size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
    2852        49136 :     if (cloudsync_payload_encode_check(payload, breq) == false) {
    2853            0 :         return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
    2854              :     }
    2855              :     
    2856        49136 :     char *buffer = payload->buffer + payload->bused;
    2857        49136 :     size_t bsize = payload->balloc - payload->bused;
    2858        49136 :     char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
    2859        49136 :     if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);
    2860              :     
    2861              :     // update buffer
    2862        49136 :     payload->bused += breq;
    2863              :     
    2864              :     // increment row counter
    2865        49136 :     ++payload->nrows;
    2866              :     
    2867        49136 :     return DBRES_OK;
    2868        49136 : }
    2869              : 
    2870          685 : int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    2871              :     DEBUG_FUNCTION("cloudsync_payload_encode_final");
    2872              :     
    2873          685 :     if (payload->nrows == 0) {
    2874            8 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2875            8 :         payload->buffer = NULL;
    2876            8 :         payload->bsize = 0;
    2877            8 :         return DBRES_OK;
    2878              :     }
    2879              :     
    2880          677 :     if (payload->nrows > UINT32_MAX) {
    2881            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2882            0 :         payload->buffer = NULL;
    2883            0 :         payload->bsize = 0;
    2884            0 :         cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
    2885            0 :         return DBRES_ERROR;
    2886              :     }
    2887              :     
    2888              :     // sanity check about buffer size
    2889          677 :     int header_size = (int)sizeof(cloudsync_payload_header);
    2890          677 :     int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    2891          677 :     if (buffer_size < 0) {
    2892            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2893            0 :         payload->buffer = NULL;
    2894            0 :         payload->bsize = 0;
    2895            0 :         cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
    2896            0 :         return DBRES_ERROR;
    2897              :     }
    2898          677 :     if (buffer_size > INT_MAX) {
    2899            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2900            0 :         payload->buffer = NULL;
    2901            0 :         payload->bsize = 0;
    2902            0 :         cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
    2903            0 :         return DBRES_ERROR;
    2904              :     }
    2905              :     // try to allocate buffer used for compressed data
    2906          677 :     int real_buffer_size = (int)buffer_size;
    2907          677 :     int zbound = LZ4_compressBound(real_buffer_size);
    2908          677 :     char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression
    2909              :     
    2910              :     // skip the reserved header from the buffer to compress
    2911          677 :     char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    2912          677 :     int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    2913          677 :     bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    2914          677 :     CHECK_FORCE_UNCOMPRESSED_BUFFER();
    2915              :     
    2916              :     // setup payload header
    2917          677 :     cloudsync_payload_header header = {0};
    2918          677 :     uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    2919          677 :     cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);
    2920              :     
    2921              :     // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    2922          677 :     if (use_uncompressed_buffer) {
    2923           15 :         if (zbuffer) cloudsync_memory_free(zbuffer);
    2924           15 :         zbuffer = payload->buffer;
    2925           15 :         zused = real_buffer_size;
    2926           15 :     }
    2927              :     
    2928              :     // compute checksum of the buffer
    2929          677 :     uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    2930          677 :     cloudsync_payload_checksum_store(&header, checksum);
    2931              :     
    2932              :     // copy header and data to SQLite BLOB
    2933          677 :     memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    2934          677 :     int blob_size = zused + sizeof(cloudsync_payload_header);
    2935          677 :     payload->bsize = blob_size;
    2936              :     
    2937              :     // cleanup memory
    2938          677 :     if (zbuffer != payload->buffer) {
    2939          662 :         cloudsync_memory_free (payload->buffer);
    2940          662 :         payload->buffer = zbuffer;
    2941          662 :     }
    2942              :     
    2943          677 :     return DBRES_OK;
    2944          685 : }
    2945              : 
    2946          685 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
    2947              :     DEBUG_FUNCTION("cloudsync_payload_blob");
    2948              :     
    2949          685 :     if (blob_size) *blob_size = (int64_t)payload->bsize;
    2950          685 :     if (nrows) *nrows = (int64_t)payload->nrows;
    2951          685 :     return payload->buffer;
    2952              : }
    2953              : 
    2954       441504 : static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
    2955       441504 :     cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
    2956       441504 :     int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
    2957              :     
    2958       441504 :     if (rc == DBRES_OK) {
    2959              :         // the dbversion index is smaller than seq index, so it is processed first
    2960              :         // when processing the dbversion column: save the value to the tmp_dbversion field
    2961              :         // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
    2962       441504 :         switch (index) {
    2963              :             case CLOUDSYNC_PK_INDEX_TBL:
    2964        49056 :                 if (type == DBTYPE_TEXT) {
    2965        49056 :                     decode_context->tbl = pval;
    2966        49056 :                     decode_context->tbl_len = ival;
    2967        49056 :                 }
    2968        49056 :                 break;
    2969              :             case CLOUDSYNC_PK_INDEX_PK:
    2970        49056 :                 if (type == DBTYPE_BLOB) {
    2971        49056 :                     decode_context->pk = pval;
    2972        49056 :                     decode_context->pk_len = ival;
    2973        49056 :                 }
    2974        49056 :                 break;
    2975              :             case CLOUDSYNC_PK_INDEX_COLNAME:
    2976        49056 :                 if (type == DBTYPE_TEXT) {
    2977        49056 :                     decode_context->col_name = pval;
    2978        49056 :                     decode_context->col_name_len = ival;
    2979        49056 :                 }
    2980        49056 :                 break;
    2981              :             case CLOUDSYNC_PK_INDEX_COLVERSION:
    2982        49056 :                 if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
    2983        49056 :                 break;
    2984              :             case CLOUDSYNC_PK_INDEX_DBVERSION:
    2985        49056 :                 if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
    2986        49056 :                 break;
    2987              :             case CLOUDSYNC_PK_INDEX_SITEID:
    2988        49056 :                 if (type == DBTYPE_BLOB) {
    2989        49056 :                     decode_context->site_id = pval;
    2990        49056 :                     decode_context->site_id_len = ival;
    2991        49056 :                 }
    2992        49056 :                 break;
    2993              :             case CLOUDSYNC_PK_INDEX_CL:
    2994        49056 :                 if (type == DBTYPE_INTEGER) decode_context->cl = ival;
    2995        49056 :                 break;
    2996              :             case CLOUDSYNC_PK_INDEX_SEQ:
    2997        49056 :                 if (type == DBTYPE_INTEGER) decode_context->seq = ival;
    2998        49056 :                 break;
    2999              :         }
    3000       441504 :     }
    3001              :         
    3002       441504 :     return rc;
    3003              : }
    3004              : 
    3005              : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
    3006              : 
    3007          674 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
    3008              :     // sanity check
    3009          674 :     if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
    3010              :     
    3011              :     // decode header
    3012              :     cloudsync_payload_header header;
    3013          674 :     memcpy(&header, payload, sizeof(cloudsync_payload_header));
    3014              :     
    3015          674 :     header.signature = ntohl(header.signature);
    3016          674 :     header.expanded_size = ntohl(header.expanded_size);
    3017          674 :     header.ncols = ntohs(header.ncols);
    3018          674 :     header.nrows = ntohl(header.nrows);
    3019          674 :     header.schema_hash = ntohll(header.schema_hash);
    3020              :     
    3021              :     // compare schema_hash only if not disabled and if the received payload was created with the current header version
    3022              :     // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
    3023          674 :     if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
    3024          674 :         if (header.schema_hash != data->schema_hash) {
    3025            4 :             if (!database_check_schema_hash(data, header.schema_hash)) {
    3026              :                 char buffer[1024];
    3027            2 :                 snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
    3028            2 :                 return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3029              :             }
    3030            2 :         }
    3031          672 :     }
    3032              :     
    3033              :     // sanity check header
    3034          672 :     if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
    3035            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
    3036              :     }
    3037              :     
    3038          672 :     const char *buffer = payload + sizeof(cloudsync_payload_header);
    3039          672 :     size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
    3040              : 
    3041              :     // sanity check checksum (only if version is >= 2)
    3042          672 :     if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
    3043          672 :         uint64_t checksum = pk_checksum(buffer, buf_len);
    3044          672 :         if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
    3045            1 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
    3046              :         }
    3047          671 :     }
    3048              : 
    3049              :     // check if payload is compressed
    3050          671 :     char *clone = NULL;
    3051          671 :     if (header.expanded_size != 0) {
    3052          656 :         clone = (char *)cloudsync_memory_alloc(header.expanded_size);
    3053          656 :         if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
    3054              : 
    3055          656 :         int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
    3056          656 :         if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
    3057            0 :             if (clone) cloudsync_memory_free(clone);
    3058            0 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
    3059              :         }
    3060              : 
    3061          656 :         buffer = (const char *)clone;
    3062          656 :         buf_len = (size_t)header.expanded_size;
    3063          656 :     }
    3064              :     
    3065              :     // precompile the insert statement
    3066          671 :     dbvm_t *vm = NULL;
    3067          671 :     int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
    3068          671 :     if (rc != DBRES_OK) {
    3069            0 :         if (clone) cloudsync_memory_free(clone);
    3070            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
    3071              :     }
    3072              :     
    3073              :     // process buffer, one row at a time
    3074          671 :     uint16_t ncols = header.ncols;
    3075          671 :     uint32_t nrows = header.nrows;
    3076          671 :     int64_t last_payload_db_version = -1;
    3077          671 :     int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
    3078          671 :     int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
    3079          671 :     cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
    3080              : 
    3081              :     // Initialize deferred column-batch merge
    3082          671 :     merge_pending_batch batch = {0};
    3083          671 :     data->pending_batch = &batch;
    3084          671 :     bool in_savepoint = false;
    3085          671 :     const void *last_pk = NULL;
    3086          671 :     int64_t last_pk_len = 0;
    3087          671 :     const char *last_tbl = NULL;
    3088          671 :     int64_t last_tbl_len = 0;
    3089              : 
    3090        49727 :     for (uint32_t i=0; i<nrows; ++i) {
    3091        49056 :         size_t seek = 0;
    3092        49056 :         int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
    3093        49056 :         if (res == -1) {
    3094            0 :             merge_flush_pending(data);
    3095            0 :             data->pending_batch = NULL;
    3096            0 :             if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3097            0 :             if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3098            0 :             if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
    3099            0 :             if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
    3100            0 :             rc = DBRES_ERROR;
    3101            0 :             goto cleanup;
    3102              :         }
    3103              : 
    3104              :         // Detect PK/table/db_version boundary to flush pending batch
    3105        97441 :         bool pk_changed = (last_pk != NULL &&
    3106        48385 :                            (last_pk_len != decoded_context.pk_len ||
    3107        46469 :                             memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
    3108        97441 :         bool tbl_changed = (last_tbl != NULL &&
    3109        48385 :                             (last_tbl_len != decoded_context.tbl_len ||
    3110        48315 :                              memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
    3111        49056 :         bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
    3112              : 
    3113              :         // Flush pending batch before any boundary change
    3114        49056 :         if (pk_changed || tbl_changed || db_version_changed) {
    3115        18207 :             int flush_rc = merge_flush_pending(data);
    3116        18207 :             if (flush_rc != DBRES_OK) {
    3117            1 :                 rc = flush_rc;
    3118              :                 // continue processing remaining rows
    3119            1 :             }
    3120        18207 :         }
    3121              : 
    3122              :         // Per-db_version savepoints group rows with the same source db_version
    3123              :         // into one transaction. In SQLite autocommit mode, the RELEASE triggers
    3124              :         // the commit hook which bumps data->db_version and resets seq, ensuring
    3125              :         // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
    3126              :         // database_in_transaction() is always true so this block is inactive —
    3127              :         // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
    3128        49056 :         if (in_savepoint && db_version_changed) {
    3129         4280 :             rc = database_commit_savepoint(data, "cloudsync_payload_apply");
    3130         4280 :             if (rc != DBRES_OK) {
    3131            0 :                 merge_pending_free_entries(&batch);
    3132            0 :                 data->pending_batch = NULL;
    3133            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
    3134            0 :                 goto cleanup;
    3135              :             }
    3136         4280 :             in_savepoint = false;
    3137         4280 :         }
    3138              : 
    3139        49056 :         if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
    3140         4951 :             rc = database_begin_savepoint(data, "cloudsync_payload_apply");
    3141         4951 :             if (rc != DBRES_OK) {
    3142            0 :                 merge_pending_free_entries(&batch);
    3143            0 :                 data->pending_batch = NULL;
    3144            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
    3145            0 :                 goto cleanup;
    3146              :             }
    3147         4951 :             in_savepoint = true;
    3148         4951 :         }
    3149              : 
    3150              :         // Track db_version for batch-flush boundary detection
    3151        49056 :         if (db_version_changed) {
    3152         4951 :             last_payload_db_version = decoded_context.db_version;
    3153         4951 :         }
    3154              : 
    3155              :         // Update PK/table tracking
    3156        49056 :         last_pk = decoded_context.pk;
    3157        49056 :         last_pk_len = decoded_context.pk_len;
    3158        49056 :         last_tbl = decoded_context.tbl;
    3159        49056 :         last_tbl_len = decoded_context.tbl_len;
    3160              : 
    3161        49056 :         rc = databasevm_step(vm);
    3162        49056 :         if (rc != DBRES_DONE) {
    3163              :             // don't "break;", the error can be due to a RLS policy.
    3164              :             // in case of error we try to apply the following changes
    3165            2 :         }
    3166              : 
    3167        49056 :         buffer += seek;
    3168        49056 :         buf_len -= seek;
    3169        49056 :         dbvm_reset(vm);
    3170        49056 :     }
    3171              : 
    3172              :     // Final flush after loop
    3173              :     {
    3174          671 :         int flush_rc = merge_flush_pending(data);
    3175          671 :         if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
    3176              :     }
    3177          671 :     data->pending_batch = NULL;
    3178              : 
    3179          671 :     if (in_savepoint) {
    3180          671 :         int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
    3181          671 :         if (rc1 != DBRES_OK) rc = rc1;
    3182          671 :     }
    3183              : 
    3184              :     // save last error (unused if function returns OK)
    3185          671 :     if (rc != DBRES_OK && rc != DBRES_DONE) {
    3186            1 :         cloudsync_set_dberror(data);
    3187            1 :     }
    3188              : 
    3189          671 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    3190         1341 :     if (rc == DBRES_OK) {
    3191              :         char buf[256];
    3192          670 :         if (decoded_context.db_version >= dbversion) {
    3193          536 :             snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
    3194          536 :             dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
    3195              :             
    3196          536 :             if (decoded_context.seq != seq) {
    3197          332 :                 snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
    3198          332 :                 dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
    3199          332 :             }
    3200          536 :         }
    3201          670 :     }
    3202              : 
    3203              : cleanup:
    3204              :     // cleanup merge_pending_batch
    3205          671 :     if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3206          671 :     if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3207          671 :     if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }  
    3208              : 
    3209              :     // cleanup vm
    3210          671 :     if (vm) databasevm_finalize(vm);
    3211              : 
    3212              :     // cleanup memory
    3213          671 :     if (clone) cloudsync_memory_free(clone);
    3214              : 
    3215              :     // error already saved in (save last error)
    3216          671 :     if (rc != DBRES_OK) return rc;
    3217              : 
    3218              :     // return the number of processed rows
    3219          670 :     if (pnrows) *pnrows = nrows;
    3220          670 :     return DBRES_OK;
    3221          674 : }
    3222              : 
    3223              : // MARK: - Payload load/store -
    3224              : 
    3225            0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
    3226              :     // retrieve current db_version and seq
    3227            0 :     *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
    3228            0 :     if (*db_version < 0) return DBRES_ERROR;
    3229              :     
    3230              :     // retrieve BLOB
    3231              :     char sql[1024];
    3232            0 :     snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
    3233              :                                "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
    3234              :     
    3235            0 :     int64_t len = 0;
    3236            0 :     int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
    3237            0 :     *blob_size = (int)len;
    3238            0 :     if (rc != DBRES_OK) return rc;
    3239              :     
    3240              :     // exit if there is no data to send
    3241            0 :     if (*blob == NULL || *blob_size == 0) return DBRES_OK;
    3242            0 :     return rc;
    3243            0 : }
    3244              : 
    3245              : #ifdef CLOUDSYNC_DESKTOP_OS
    3246            0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
    3247              :     DEBUG_FUNCTION("cloudsync_payload_save");
    3248              :     
    3249              :     // silently delete any other payload with the same name
    3250            0 :     cloudsync_file_delete(payload_path);
    3251              :     
    3252              :     // retrieve payload
    3253            0 :     char *blob = NULL;
    3254            0 :     int blob_size = 0, db_version = 0;
    3255            0 :     int64_t new_db_version = 0;
    3256            0 :     int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
    3257            0 :     if (rc != DBRES_OK) {
    3258            0 :         if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
    3259            0 :         return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
    3260              :     }
    3261              :     
    3262              :     // exit if there is no data to save
    3263            0 :     if (blob == NULL || blob_size == 0) {
    3264            0 :         if (size) *size = 0;
    3265            0 :         return DBRES_OK;
    3266              :     }
    3267              :     
    3268              :     // write payload to file
    3269            0 :     bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
    3270            0 :     cloudsync_memory_free(blob);
    3271            0 :     if (res == false) {
    3272            0 :         return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
    3273              :     }
    3274              :     
    3275              :     // returns blob size
    3276            0 :     if (size) *size = blob_size;
    3277            0 :     return DBRES_OK;
    3278            0 : }
    3279              : #endif
    3280              : 
    3281              : // MARK: - Core -
    3282              : 
    3283          271 : int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, bool skip_int_pk_check) {
    3284              :     DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    3285              :     char buffer[2048];
    3286              :     
    3287              :     // sanity check table name
    3288          271 :     if (name == NULL) {
    3289            1 :         return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    3290              :     }
    3291              :     
    3292              :     // avoid allocating heap memory for SQL statements by setting a maximum length of 512 characters
    3293              :     // for table names. This limit is reasonable and helps prevent memory management issues.
    3294          270 :     const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    3295          270 :     if (strlen(name) > maxlen) {
    3296            1 :         snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
    3297            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3298              :     }
    3299              :     
    3300              :     // check if already initialized
    3301          269 :     cloudsync_table_context *table = table_lookup(data, name);
    3302          269 :     if (table) return DBRES_OK;
    3303              :     
    3304              :     // check if table exists
    3305          265 :     if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
    3306            2 :         snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
    3307            2 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3308              :     }
    3309              :     
    3310              :     // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
    3311          263 :     int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    3312          263 :     if (npri_keys < 0) return cloudsync_set_dberror(data);
    3313          263 :     if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
    3314              :     
    3315              :     #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    3316              :     // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    3317          263 :     if (npri_keys == 0) {
    3318            1 :         snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
    3319            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3320              :     }
    3321              :     #endif
    3322              :         
    3323          262 :     if (!skip_int_pk_check) {
    3324          199 :         if (npri_keys == 1) {
    3325              :             // the affinity of a column is determined by the declared type of the column,
    3326              :             // according to the following rules in the order shown:
    3327              :             // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
    3328          115 :             int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
    3329          115 :             if (npri_keys_int < 0) return cloudsync_set_dberror(data);
    3330          115 :             if (npri_keys == npri_keys_int) {
    3331            1 :                 snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
    3332            1 :                 return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3333              :             }
    3334              :             
    3335          114 :         }
    3336          198 :     }
    3337              :         
    3338              :     // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
    3339              :     #if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    3340              :     if (npri_keys > 0) {
    3341              :         int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
    3342              :         if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
    3343              :         if (npri_keys != npri_keys_notnull) {
    3344              :             snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
    3345              :             return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3346              :         }
    3347              :     }
    3348              :     #endif
    3349              :     
    3350              :     // check for columns declared as NOT NULL without a DEFAULT value.
    3351              :     // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    3352          261 :     int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
    3353          261 :     if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
    3354          261 :     if (n_notnull_nodefault > 0) {
    3355            0 :         snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
    3356            0 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3357              :     }
    3358              :     
    3359          261 :     return DBRES_OK;
    3360          271 : }
    3361              : 
    3362            3 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
    3363            3 :     if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
    3364              :     
    3365              :     // drop meta-table
    3366            3 :     const char *table_name = table->name;
    3367            3 :     char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
    3368            3 :     int rc = database_exec(data, sql);
    3369            3 :     cloudsync_memory_free(sql);
    3370            3 :     if (rc != DBRES_OK) {
    3371              :         char buffer[1024];
    3372            0 :         snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
    3373            0 :         return cloudsync_set_error(data, buffer, rc);
    3374              :     }
    3375              :     
    3376              :     // drop original triggers
    3377            3 :     rc = database_delete_triggers(data, table_name);
    3378            3 :     if (rc != DBRES_OK) {
    3379              :         char buffer[1024];
    3380            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
    3381            0 :         return cloudsync_set_error(data, buffer, rc);
    3382              :     }
    3383              :     
    3384              :     // remove all table related settings
    3385            3 :     dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
    3386            3 :     return DBRES_OK;
    3387            3 : }
    3388              : 
    3389            3 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
    3390            3 :     cloudsync_table_context *table = table_lookup(data, table_name);
    3391            3 :     if (!table) return DBRES_OK;
    3392              :     
    3393              :     // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
    3394              :     
    3395            3 :     int rc = cloudsync_cleanup_internal(data, table);
    3396            3 :     if (rc != DBRES_OK) return rc;
    3397              :     
    3398            3 :     int counter = table_remove(data, table);
    3399            3 :     table_free(table);
    3400              :     
    3401            3 :     if (counter == 0) {
    3402              :         // cleanup database on last table
    3403            1 :         cloudsync_reset_siteid(data);
    3404            1 :         dbutils_settings_cleanup(data);
    3405            1 :     } else {
    3406            2 :         if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
    3407            2 :             cloudsync_update_schema_hash(data);
    3408            2 :         }
    3409              :     }
    3410              :     
    3411            3 :     return DBRES_OK;
    3412            3 : }
    3413              : 
    3414            0 : int cloudsync_cleanup_all (cloudsync_context *data) {
    3415            0 :     return database_cleanup(data);
    3416              : }
    3417              : 
    3418          436 : int cloudsync_terminate (cloudsync_context *data) {
    3419              :     // can't use for/loop here because data->tables_count is changed by table_remove
    3420          668 :     while (data->tables_count > 0) {
    3421          232 :         cloudsync_table_context *t = data->tables[data->tables_count - 1];
    3422          232 :         table_remove(data, t);
    3423          232 :         table_free(t);
    3424              :     }
    3425              :     
    3426          436 :     if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
    3427          436 :     if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
    3428          436 :     if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
    3429          436 :     if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
    3430          436 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
    3431              :     
    3432          436 :     data->schema_version_stmt = NULL;
    3433          436 :     data->data_version_stmt = NULL;
    3434          436 :     data->db_version_stmt = NULL;
    3435          436 :     data->getset_siteid_stmt = NULL;
    3436          436 :     data->current_schema = NULL;
    3437              :     
    3438              :     // reset the site_id so the cloudsync_context_init will be executed again
    3439              :     // if any other cloudsync function is called after terminate
    3440          436 :     data->site_id[0] = 0;
    3441              :     
    3442          436 :     return 1;
    3443              : }
    3444              : 
    3445          266 : int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, bool skip_int_pk_check) {
    3446              :     // sanity check table and its primary key(s)
    3447          266 :     int rc = cloudsync_table_sanity_check(data, table_name, skip_int_pk_check);
    3448          266 :     if (rc != DBRES_OK) return rc;
    3449              :     
    3450              :     // init cloudsync_settings
    3451          264 :     if (cloudsync_context_init(data) == NULL) {
    3452            0 :         return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    3453              :     }
    3454              :     
    3455              :     // sanity check algo name (if exists)
    3456          264 :     table_algo algo_new = table_algo_none;
    3457          264 :     if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
    3458              :     
    3459          264 :     algo_new = cloudsync_algo_from_name(algo_name);
    3460          264 :     if (algo_new == table_algo_none) {
    3461              :         char buffer[1024];
    3462            1 :         snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
    3463            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3464              :     }
    3465              : 
    3466              :     // DWS and AWS algorithms are not yet implemented in the merge logic
    3467          263 :     if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
    3468              :         char buffer[1024];
    3469            2 :         snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
    3470            2 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3471              :     }
    3472              :     
    3473              :     // check if table name was already augmented
    3474          261 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    3475              :     
    3476              :     // sanity check algorithm
    3477          261 :     if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
    3478              :         // if table algorithms and the same and not none, do nothing
    3479          261 :     } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
    3480              :         // nothing is written into settings because the default table_algo_crdt_cls will be used
    3481            0 :         algo_new = algo_current = table_algo_crdt_cls;
    3482          233 :     } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
    3483              :         // algo is already written into settins so just use it
    3484            0 :         algo_new = algo_current;
    3485          233 :     } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
    3486              :         // write table algo name in settings
    3487          233 :         dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
    3488          233 :     } else {
    3489              :         // error condition
    3490            0 :         return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
    3491              :     }
    3492              :     
    3493              :     // Run the following function even if table was already augmented.
    3494              :     // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
    3495              :     // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
    3496              :     
    3497              :     // sync algo with table (unused in this version)
    3498              :     // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
    3499              :     
    3500              :     // read row-level filter from settings (if any)
    3501              :     char init_filter_buf[2048];
    3502          261 :     int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
    3503          261 :     const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
    3504              : 
    3505              :     // check triggers
    3506          261 :     rc = database_create_triggers(data, table_name, algo_new, init_filter);
    3507          261 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
    3508              :     
    3509              :     // check meta-table
    3510          261 :     rc = database_create_metatable(data, table_name);
    3511          261 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
    3512              :     
    3513              :     // add prepared statements
    3514          261 :     if (cloudsync_add_dbvms(data) != DBRES_OK) {
    3515            0 :         return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
    3516              :     }
    3517              :     
    3518              :     // add table to in-memory data context
    3519          261 :     if (table_add_to_context(data, algo_new, table_name) == false) {
    3520              :         char buffer[1024];
    3521            0 :         snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
    3522            0 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3523              :     }
    3524              :     
    3525          261 :     if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
    3526            0 :         return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
    3527              :     }
    3528              :         
    3529          261 :     return DBRES_OK;
    3530          266 : }
        

Generated by: LCOV version 2.4-0