]> arthur.barton.de Git - netatalk.git/blobdiff - etc/afpd/fce_api.c
fce: fix event id increment
[netatalk.git] / etc / afpd / fce_api.c
index b2dead294445229bb3984c6018fd22a62744879d..c2afc82f0dc2f5512b3b8d8604370e6630a4648c 100644 (file)
-/*\r
- * Copyright (c) 2010 Mark Williams\r
- *\r
- * File change event API for netatalk\r
- *\r
- * for every detected filesystem change a UDP packet is sent to an arbitrary list\r
- * of listeners. Each packet contains unix path of modified filesystem element,\r
- * event reason, and a consecutive event id (32 bit). Technically we are UDP client and are sending\r
- * out packets synchronuosly as they are created by the afp functions. This should not affect\r
- * performance measurably. The only delaying calls occur during initialization, if we have to\r
- * resolve non-IP hostnames to IP. All numeric data inside the packet is network byte order, so use\r
- * ntohs / ntohl to resolve length and event id. Ideally a listener receives every packet with\r
- * no gaps in event ids, starting with event id 1 and mode FCE_CONN_START followed by\r
- * data events from id 2 up to 0xFFFFFFFF, followed by 0 to 0xFFFFFFFF and so on.\r
- *\r
- * A gap or not starting with 1 mode FCE_CONN_START or receiving mode FCE_CONN_BROKEN means that\r
- * the listener has lost at least one filesystem event\r
- * \r
- * All Rights Reserved.  See COPYRIGHT.\r
- */\r
-\r
-#ifdef HAVE_CONFIG_H\r
-#include "config.h"\r
-#endif /* HAVE_CONFIG_H */\r
-\r
-#include <stdio.h>\r
-\r
-#include <string.h>\r
-#include <stdlib.h>\r
-#include <errno.h>\r
-#include <time.h>\r
-\r
-\r
-#include <sys/param.h>\r
-#include <sys/socket.h>\r
-#include <netinet/in.h>\r
-#include <arpa/inet.h>\r
-#include <netdb.h>\r
-\r
-#include <netatalk/at.h>\r
-\r
-#include <atalk/adouble.h>\r
-#include <atalk/vfs.h>\r
-#include <atalk/logger.h>\r
-#include <atalk/afp.h>\r
-#include <atalk/util.h>\r
-#include <atalk/cnid.h>\r
-#include <atalk/unix.h>\r
-#include <atalk/fce_api.h>\r
-#include <atalk/globals.h>\r
-\r
-#include "fork.h"\r
-#include "file.h"\r
-#include "directory.h"\r
-#include "desktop.h"\r
-#include "volume.h"\r
-\r
-// ONLY USED IN THIS FILE\r
-#include "fce_api_internal.h"\r
-\r
-#define FCE_TRUE 1\r
-#define FCE_FALSE 0\r
-\r
-/* We store our connection data here */\r
-static struct udp_entry udp_socket_list[FCE_MAX_UDP_SOCKS];\r
-static int udp_sockets = 0;\r
-static int udp_initialized = FCE_FALSE;\r
-static unsigned long fce_ev_enabled =\r
-    (1 << FCE_FILE_MODIFY) |\r
-    (1 << FCE_FILE_DELETE) |\r
-    (1 << FCE_DIR_DELETE) |\r
-    (1 << FCE_FILE_CREATE) |\r
-    (1 << FCE_DIR_CREATE);\r
-\r
-static uint64_t tm_used;          /* used for passing to event handler */\r
-#define MAXIOBUF 1024\r
-static char iobuf[MAXIOBUF];\r
-static const char *skip_files[] = \r
-{\r
-       ".DS_Store",\r
-       NULL\r
-};\r
-\r
-/*\r
- *\r
- * Initialize network structs for any listeners\r
- * We dont give return code because all errors are handled internally (I hope..)\r
- *\r
- * */\r
-void fce_init_udp()\r
-{\r
-    int rv;\r
-    struct addrinfo hints, *servinfo, *p;\r
-\r
-    if (udp_initialized == FCE_TRUE)\r
-        return;\r
-\r
-    memset(&hints, 0, sizeof hints);\r
-    hints.ai_family = AF_UNSPEC;\r
-    hints.ai_socktype = SOCK_DGRAM;\r
-\r
-    for (int i = 0; i < udp_sockets; i++) {\r
-        struct udp_entry *udp_entry = udp_socket_list + i;\r
-\r
-        /* Close any pending sockets */\r
-        if (udp_entry->sock != -1)\r
-            close(udp_entry->sock);\r
-\r
-        if ((rv = getaddrinfo(udp_entry->addr, udp_entry->port, &hints, &servinfo)) != 0) {\r
-            LOG(log_error, logtype_afpd, "fce_init_udp: getaddrinfo(%s:%s): %s",\r
-                udp_entry->addr, udp_entry->port, gai_strerror(rv));\r
-            continue;\r
-        }\r
-\r
-        /* loop through all the results and make a socket */\r
-        for (p = servinfo; p != NULL; p = p->ai_next) {\r
-            if ((udp_entry->sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {\r
-                LOG(log_error, logtype_afpd, "fce_init_udp: socket(%s:%s): %s",\r
-                    udp_entry->addr, udp_entry->port, strerror(errno));\r
-                continue;\r
-            }\r
-            break;\r
-        }\r
-\r
-        if (p == NULL) {\r
-            LOG(log_error, logtype_afpd, "fce_init_udp: no socket for %s:%s",\r
-                udp_entry->addr, udp_entry->port);\r
-        }\r
-        udp_entry->addrinfo = *p;\r
-        memcpy(&udp_entry->addrinfo, p, sizeof(struct addrinfo));\r
-        memcpy(&udp_entry->sockaddr, p->ai_addr, sizeof(struct sockaddr_storage));\r
-        freeaddrinfo(servinfo);\r
-    }\r
-\r
-    udp_initialized = FCE_TRUE;\r
-}\r
-\r
-void fce_cleanup()\r
-{\r
-    if (udp_initialized == FCE_FALSE )\r
-        return;\r
-\r
-    for (int i = 0; i < udp_sockets; i++)\r
-    {\r
-        struct udp_entry *udp_entry = udp_socket_list + i;\r
-\r
-        /* Close any pending sockets */\r
-        if (udp_entry->sock != -1)\r
-        {\r
-            close( udp_entry->sock );\r
-            udp_entry->sock = -1;\r
-        }\r
-    }\r
-    udp_initialized = FCE_FALSE;\r
-}\r
-\r
-\r
-/*\r
- * Construct a UDP packet for our listeners and return packet size\r
- * */\r
-static ssize_t build_fce_packet( struct fce_packet *packet, char *path, int mode, uint32_t event_id )\r
-{\r
-    size_t pathlen;\r
-    ssize_t data_len = 0;\r
-\r
-    strncpy(packet->magic, FCE_PACKET_MAGIC, sizeof(packet->magic) );\r
-    packet->version = FCE_PACKET_VERSION;\r
-    packet->mode = mode;\r
-    packet->event_id = event_id;\r
-\r
-    pathlen = strlen(path) + 1; /* include string terminator */\r
-\r
-    /* This should never happen, but before we bust this server, we send nonsense, fce listener has to cope */\r
-    if (pathlen >= MAXPATHLEN)\r
-        pathlen = MAXPATHLEN - 1;\r
-\r
-    /* This is the payload len. Means: the stream has len bytes more until packet is finished */\r
-    /* A server should read the first 16 byte, decode them and then fetch the rest */\r
-    data_len = FCE_PACKET_HEADER_SIZE + pathlen;\r
-    packet->datalen = pathlen;\r
-\r
-    switch (mode) {\r
-    case FCE_TM_SIZE:\r
-        tm_used = hton64(tm_used);\r
-        memcpy(packet->data, &tm_used, sizeof(tm_used));\r
-        strncpy(packet->data + sizeof(tm_used), path, pathlen);\r
-\r
-        packet->datalen += sizeof(tm_used);\r
-        data_len += sizeof(tm_used);\r
-        break;\r
-    default:\r
-        strncpy(packet->data, path, pathlen);\r
-        break;\r
-    }\r
-\r
-    /* return the packet len */\r
-    return data_len;\r
-}\r
-\r
-static int pack_fce_packet(struct fce_packet *packet, unsigned char *buf)\r
-{\r
-    unsigned char *p = buf;\r
-\r
-    memcpy(p, &packet->magic[0], sizeof(packet->magic));\r
-    p += sizeof(packet->magic);\r
-\r
-    *p = packet->version;\r
-    p++;\r
-    \r
-    *p = packet->mode;\r
-    p++;\r
-    \r
-    uint32_t id = htonl(packet->event_id);\r
-    memcpy(p, &id, sizeof(id));\r
-    p += sizeof(packet->event_id);\r
-\r
-    uint16_t l = htons(packet->datalen);\r
-    memcpy(p, &l, sizeof(l));\r
-    p += sizeof(l);\r
-\r
-    memcpy(p, &packet->data[0], packet->datalen);\r
-    p += packet->datalen;\r
-\r
-    return 0;\r
-}\r
-\r
-/*\r
- * Send the fce information to all (connected) listeners\r
- * We dont give return code because all errors are handled internally (I hope..)\r
- * */\r
-static void send_fce_event( char *path, int mode )\r
-{    \r
-    struct fce_packet packet;\r
-    void *data = &packet;\r
-    static uint32_t event_id = 0; /* the unique packet couter to detect packet/data loss. Going from 0xFFFFFFFF to 0x0 is a valid increment */\r
-\r
-    LOG(log_debug, logtype_afpd, "send_fce_event: start");\r
-\r
-    time_t now = time(NULL);\r
-\r
-    /* build our data packet */\r
-    ssize_t data_len = build_fce_packet( &packet, path, mode, ++event_id );\r
-    pack_fce_packet(&packet, iobuf);\r
-\r
-    for (int i = 0; i < udp_sockets; i++)\r
-    {\r
-        int sent_data = 0;\r
-        struct udp_entry *udp_entry = udp_socket_list + i;\r
-\r
-        /* we had a problem earlier ? */\r
-        if (udp_entry->sock == -1)\r
-        {\r
-            /* We still have to wait ?*/\r
-            if (now < udp_entry->next_try_on_error)\r
-                continue;\r
-\r
-            /* Reopen socket */\r
-            udp_entry->sock = socket(udp_entry->addrinfo.ai_family,\r
-                                     udp_entry->addrinfo.ai_socktype,\r
-                                     udp_entry->addrinfo.ai_protocol);\r
-            \r
-            if (udp_entry->sock == -1) {\r
-                /* failed again, so go to rest again */\r
-                LOG(log_error, logtype_afpd, "Cannot recreate socket for fce UDP connection: errno %d", errno  );\r
-\r
-                udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;\r
-                continue;\r
-            }\r
-\r
-            udp_entry->next_try_on_error = 0;\r
-\r
-            /* Okay, we have a running socket again, send server that we had a problem on our side*/\r
-            data_len = build_fce_packet( &packet, "", FCE_CONN_BROKEN, 0 );\r
-            pack_fce_packet(&packet, iobuf);\r
-\r
-            sendto(udp_entry->sock,\r
-                   iobuf,\r
-                   data_len,\r
-                   0,\r
-                   (struct sockaddr *)&udp_entry->sockaddr,\r
-                   udp_entry->addrinfo.ai_addrlen);\r
-\r
-            /* Rebuild our original data packet */\r
-            data_len = build_fce_packet( &packet, path, mode, event_id );\r
-            pack_fce_packet(&packet, iobuf);\r
-        }\r
-\r
-        sent_data = sendto(udp_entry->sock,\r
-                           iobuf,\r
-                           data_len,\r
-                           0,\r
-                           (struct sockaddr *)&udp_entry->sockaddr,\r
-                           udp_entry->addrinfo.ai_addrlen);\r
-\r
-        /* Problems ? */\r
-        if (sent_data != data_len) {\r
-            /* Argh, socket broke, we close and retry later */\r
-            LOG(log_error, logtype_afpd, "send_fce_event: error sending packet to %s:%s, transfered %d of %d: %s",\r
-                udp_entry->addr, udp_entry->port, sent_data, data_len, strerror(errno));\r
-\r
-            close( udp_entry->sock );\r
-            udp_entry->sock = -1;\r
-            udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;\r
-        }\r
-    }\r
-}\r
-\r
-static int add_udp_socket(const char *target_ip, const char *target_port )\r
-{\r
-    if (target_port == NULL)\r
-        target_port = FCE_DEFAULT_PORT_STRING;\r
-\r
-    if (udp_sockets >= FCE_MAX_UDP_SOCKS) {\r
-        LOG(log_error, logtype_afpd, "Too many file change api UDP connections (max %d allowed)", FCE_MAX_UDP_SOCKS );\r
-        return AFPERR_PARAM;\r
-    }\r
-\r
-    udp_socket_list[udp_sockets].addr = strdup(target_ip);\r
-    udp_socket_list[udp_sockets].port = strdup(target_port);\r
-    udp_socket_list[udp_sockets].sock = -1;\r
-    memset(&udp_socket_list[udp_sockets].addrinfo, 0, sizeof(struct addrinfo));\r
-    memset(&udp_socket_list[udp_sockets].sockaddr, 0, sizeof(struct sockaddr_storage));\r
-    udp_socket_list[udp_sockets].next_try_on_error = 0;\r
-\r
-    udp_sockets++;\r
-\r
-    return AFP_OK;\r
-}\r
-\r
-/*\r
- *\r
- * Dispatcher for all incoming file change events\r
- *\r
- * */\r
-static int register_fce(const char *u_name, int is_dir, int mode)\r
-{\r
-    if (udp_sockets == 0)\r
-        /* No listeners configured */\r
-        return AFP_OK;\r
-\r
-    if (u_name == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    static int first_event = FCE_TRUE;\r
-\r
-       /* do some initialization on the fly the first time */\r
-       if (first_event) {\r
-               fce_initialize_history();\r
-       }\r
-\r
-       /* handle files which should not cause events (.DS_Store atc. ) */\r
-       for (int i = 0; skip_files[i] != NULL; i++)\r
-       {\r
-               if (!strcmp( u_name, skip_files[i]))\r
-                       return AFP_OK;\r
-       }\r
-\r
-\r
-       char full_path_buffer[MAXPATHLEN + 1] = {""};\r
-       const char *cwd = getcwdpath();\r
-\r
-    if (mode == FCE_TM_SIZE) {\r
-        strlcpy(full_path_buffer, u_name, MAXPATHLEN);\r
-    } else if (!is_dir || mode == FCE_DIR_DELETE) {\r
-               if (strlen( cwd ) + strlen( u_name) + 1 >= MAXPATHLEN) {\r
-                       LOG(log_error, logtype_afpd, "FCE file name too long: %s/%s", cwd, u_name );\r
-                       return AFPERR_PARAM;\r
-               }\r
-               sprintf( full_path_buffer, "%s/%s", cwd, u_name );\r
-       } else {\r
-               if (strlen( cwd ) >= MAXPATHLEN) {\r
-                       LOG(log_error, logtype_afpd, "FCE directory name too long: %s", cwd);\r
-                       return AFPERR_PARAM;\r
-               }\r
-               strcpy( full_path_buffer, cwd);\r
-       }\r
-\r
-       /* Can we ignore this event based on type or history? */\r
-       if (!(mode & FCE_TM_SIZE) && fce_handle_coalescation( full_path_buffer, is_dir, mode ))\r
-       {\r
-               LOG(log_debug9, logtype_afpd, "Coalesced fc event <%d> for <%s>", mode, full_path_buffer );\r
-               return AFP_OK;\r
-       }\r
-\r
-       LOG(log_debug9, logtype_afpd, "Detected fc event <%d> for <%s>", mode, full_path_buffer );\r
-\r
-\r
-    /* we do initilization on the fly, no blocking calls in here \r
-     * (except when using FQDN in broken DNS environment)\r
-     */\r
-    if (first_event == FCE_TRUE)\r
-    {\r
-        fce_init_udp();\r
-        \r
-        /* Notify listeners the we start from the beginning */\r
-        send_fce_event( "", FCE_CONN_START );\r
-        \r
-        first_event = FCE_FALSE;\r
-    }\r
-\r
-       /* Handle UDP transport */\r
-    send_fce_event( full_path_buffer, mode );\r
-\r
-    return AFP_OK;\r
-}\r
-\r
-\r
-/******************** External calls start here **************************/\r
-\r
-/*\r
- * API-Calls for file change api, called form outside (file.c directory.c ofork.c filedir.c)\r
- * */\r
-#ifndef FCE_TEST_MAIN\r
-\r
-int fce_register_delete_file( struct path *path )\r
-{\r
-    int ret = AFP_OK;\r
-\r
-    if (path == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    if (!(fce_ev_enabled & (1 << FCE_FILE_DELETE)))\r
-        return ret;\r
-       \r
-    ret = register_fce( path->u_name, FALSE, FCE_FILE_DELETE );\r
-\r
-    return ret;\r
-}\r
-int fce_register_delete_dir( char *name )\r
-{\r
-    int ret = AFP_OK;\r
-\r
-    if (name == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    if (!(fce_ev_enabled & (1 << FCE_DIR_DELETE)))\r
-        return ret;\r
-       \r
-    ret = register_fce( name, TRUE, FCE_DIR_DELETE);\r
-\r
-    return ret;\r
-}\r
-\r
-int fce_register_new_dir( struct path *path )\r
-{\r
-    int ret = AFP_OK;\r
-\r
-    if (path == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    if (!(fce_ev_enabled & (1 << FCE_DIR_CREATE)))\r
-        return ret;\r
-\r
-    ret = register_fce( path->u_name, TRUE, FCE_DIR_CREATE );\r
-\r
-    return ret;\r
-}\r
-\r
-\r
-int fce_register_new_file( struct path *path )\r
-{\r
-    int ret = AFP_OK;\r
-\r
-    if (path == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    if (!(fce_ev_enabled & (1 << FCE_FILE_CREATE)))\r
-        return ret;\r
-\r
-    ret = register_fce( path->u_name, FALSE, FCE_FILE_CREATE );\r
-\r
-    return ret;\r
-}\r
-\r
-int fce_register_file_modification( struct ofork *ofork )\r
-{\r
-    char *u_name = NULL;\r
-    struct vol *vol;\r
-    int ret = AFP_OK;\r
-\r
-    if (ofork == NULL || ofork->of_vol == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    if (!(fce_ev_enabled & (1 << FCE_FILE_MODIFY)))\r
-        return ret;\r
-\r
-    vol = ofork->of_vol;\r
-\r
-    if (NULL == (u_name = mtoupath(vol, of_name(ofork), ofork->of_did, utf8_encoding()))) \r
-    {\r
-        return AFPERR_MISC;\r
-    }\r
-    \r
-    ret = register_fce( u_name, FALSE, FCE_FILE_MODIFY );\r
-    \r
-    return ret;    \r
-}\r
-\r
-int fce_register_tm_size(const char *vol, size_t used)\r
-{\r
-    int ret = AFP_OK;\r
-\r
-    if (vol == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    if (!(fce_ev_enabled & (1 << FCE_TM_SIZE)))\r
-        return ret;\r
-\r
-    tm_used = used;             /* oh what a hack */\r
-    ret = register_fce(vol, FALSE, FCE_TM_SIZE);\r
-\r
-    return ret;\r
-}\r
-#endif\r
-\r
-/*\r
- *\r
- * Extern connect to afpd parameter, can be called multiple times for multiple listeners (up to MAX_UDP_SOCKS times)\r
- *\r
- * */\r
-int fce_add_udp_socket(const char *target)\r
-{\r
-       const char *port = FCE_DEFAULT_PORT_STRING;\r
-       char target_ip[256] = {""};\r
-\r
-       strncpy(target_ip, target, sizeof(target_ip) -1);\r
-\r
-       char *port_delim = strchr( target_ip, ':' );\r
-       if (port_delim) {\r
-               *port_delim = 0;\r
-               port = port_delim + 1;\r
-       }\r
-       return add_udp_socket(target_ip, port);\r
-}\r
-\r
-int fce_set_events(const char *events)\r
-{\r
-    char *e;\r
-    char *p;\r
-    \r
-    if (events == NULL)\r
-        return AFPERR_PARAM;\r
-\r
-    e = strdup(events);\r
-\r
-    fce_ev_enabled = 0;\r
-\r
-    for (p = strtok(e, ","); p; p = strtok(NULL, ",")) {\r
-        if (strcmp(p, "fmod") == 0) {\r
-            fce_ev_enabled |= (1 << FCE_FILE_MODIFY);\r
-        } else if (strcmp(p, "fdel") == 0) {\r
-            fce_ev_enabled |= (1 << FCE_FILE_DELETE);\r
-        } else if (strcmp(p, "ddel") == 0) {\r
-            fce_ev_enabled |= (1 << FCE_DIR_DELETE);\r
-        } else if (strcmp(p, "fcre") == 0) {\r
-            fce_ev_enabled |= (1 << FCE_FILE_CREATE);\r
-        } else if (strcmp(p, "dcre") == 0) {\r
-            fce_ev_enabled |= (1 << FCE_DIR_CREATE);\r
-        } else if (strcmp(p, "tmsz") == 0) {\r
-            fce_ev_enabled |= (1 << FCE_TM_SIZE);\r
-        }\r
-    }\r
-\r
-    free(e);\r
-}\r
-\r
-#ifdef FCE_TEST_MAIN\r
-\r
-\r
-void shortsleep( unsigned int us )\r
-{    \r
-    usleep( us );\r
-}\r
-int main( int argc, char*argv[] )\r
-{\r
-    int c,ret;\r
-\r
-    char *port = FCE_DEFAULT_PORT_STRING;\r
-    char *host = "localhost";\r
-    int delay_between_events = 1000;\r
-    int event_code = FCE_FILE_MODIFY;\r
-    char pathbuff[1024];\r
-    int duration_in_seconds = 0; // TILL ETERNITY\r
-    char target[256];\r
-    char *path = getcwd( pathbuff, sizeof(pathbuff) );\r
-\r
-    // FULLSPEED TEST IS "-s 1001" -> delay is 0 -> send packets without pause\r
-\r
-    while ((c = getopt(argc, argv, "d:e:h:p:P:s:")) != -1) {\r
-        switch(c) {\r
-        case '?':\r
-            fprintf(stdout, "%s: [ -p Port -h Listener1 [ -h Listener2 ...] -P path -s Delay_between_events_in_us -e event_code -d Duration ]\n", argv[0]);\r
-            exit(1);\r
-            break;\r
-        case 'd':\r
-            duration_in_seconds = atoi(optarg);\r
-            break;\r
-        case 'e':\r
-            event_code = atoi(optarg);\r
-            break;\r
-        case 'h':\r
-            host = strdup(optarg);\r
-            break;\r
-        case 'p':\r
-            port = strdup(optarg);\r
-            break;\r
-        case 'P':\r
-            path = strdup(optarg);\r
-            break;\r
-        case 's':\r
-            delay_between_events = atoi(optarg);\r
-            break;\r
-        }\r
-    }\r
-\r
-    sprintf(target, "%s:%s", host, port);\r
-    if (fce_add_udp_socket(target) != 0)\r
-        return 1;\r
-\r
-    int ev_cnt = 0;\r
-    time_t start_time = time(NULL);\r
-    time_t end_time = 0;\r
-\r
-    if (duration_in_seconds)\r
-        end_time = start_time + duration_in_seconds;\r
-\r
-    while (1)\r
-    {\r
-        time_t now = time(NULL);\r
-        if (now > start_time)\r
-        {\r
-            start_time = now;\r
-            fprintf( stdout, "%d events/s\n", ev_cnt );\r
-            ev_cnt = 0;\r
-        }\r
-        if (end_time && now >= end_time)\r
-            break;\r
-\r
-        register_fce( path, 0, event_code );\r
-        ev_cnt++;\r
-\r
-        \r
-        shortsleep( delay_between_events );\r
-    }\r
-}\r
-#endif /* TESTMAIN*/\r
+/*
+ * Copyright (c) 2010 Mark Williams
+ * Copyright (c) 2012 Frank Lahm <franklahm@gmail.com>
+ *
+ * File change event API for netatalk
+ *
+ * for every detected filesystem change a UDP packet is sent to an arbitrary list
+ * of listeners. Each packet contains unix path of modified filesystem element,
+ * event reason, and a consecutive event id (32 bit). Technically we are UDP client and are sending
+ * out packets synchronuosly as they are created by the afp functions. This should not affect
+ * performance measurably. The only delaying calls occur during initialization, if we have to
+ * resolve non-IP hostnames to IP. All numeric data inside the packet is network byte order, so use
+ * ntohs / ntohl to resolve length and event id. Ideally a listener receives every packet with
+ * no gaps in event ids, starting with event id 1 and mode FCE_CONN_START followed by
+ * data events from id 2 up to 0xFFFFFFFF, followed by 0 to 0xFFFFFFFF and so on.
+ *
+ * A gap or not starting with 1 mode FCE_CONN_START or receiving mode FCE_CONN_BROKEN means that
+ * the listener has lost at least one filesystem event
+ * 
+ * All Rights Reserved.  See COPYRIGHT.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif /* HAVE_CONFIG_H */
+
+#include <stdio.h>
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/param.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <stdbool.h>
+
+#include <atalk/adouble.h>
+#include <atalk/vfs.h>
+#include <atalk/logger.h>
+#include <atalk/afp.h>
+#include <atalk/util.h>
+#include <atalk/cnid.h>
+#include <atalk/unix.h>
+#include <atalk/fce_api.h>
+#include <atalk/globals.h>
+
+#include "fork.h"
+#include "file.h"
+#include "directory.h"
+#include "desktop.h"
+#include "volume.h"
+
+// ONLY USED IN THIS FILE
+#include "fce_api_internal.h"
+
+extern int afprun_bg(int root, char *cmd);
+
+/* We store our connection data here */
+static struct udp_entry udp_socket_list[FCE_MAX_UDP_SOCKS];
+static int udp_sockets = 0;
+static bool udp_initialized = false;
+static unsigned long fce_ev_enabled =
+    (1 << FCE_FILE_MODIFY) |
+    (1 << FCE_FILE_DELETE) |
+    (1 << FCE_DIR_DELETE) |
+    (1 << FCE_FILE_CREATE) |
+    (1 << FCE_DIR_CREATE) |
+    (1 << FCE_FILE_MOVE) |
+    (1 << FCE_DIR_MOVE) |
+    (1 << FCE_LOGIN) |
+    (1 << FCE_LOGOUT);
+
+static uint8_t fce_ev_info;    /* flags of additional info to send in events */
+
+#define MAXIOBUF 4096
+static unsigned char iobuf[MAXIOBUF];
+static const char **skip_files;
+static struct fce_close_event last_close_event;
+
+static char *fce_event_names[] = {
+    "",
+    "FCE_FILE_MODIFY",
+    "FCE_FILE_DELETE",
+    "FCE_DIR_DELETE",
+    "FCE_FILE_CREATE",
+    "FCE_DIR_CREATE",
+    "FCE_FILE_MOVE",
+    "FCE_DIR_MOVE",
+    "FCE_LOGIN",
+    "FCE_LOGOUT"
+};
+
+/*
+ *
+ * Initialize network structs for any listeners
+ * We dont give return code because all errors are handled internally (I hope..)
+ *
+ * */
+void fce_init_udp()
+{
+    int rv;
+    struct addrinfo hints, *servinfo, *p;
+
+    if (udp_initialized == true)
+        return;
+
+    memset(&hints, 0, sizeof hints);
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_DGRAM;
+
+    for (int i = 0; i < udp_sockets; i++) {
+        struct udp_entry *udp_entry = udp_socket_list + i;
+
+        /* Close any pending sockets */
+        if (udp_entry->sock != -1)
+            close(udp_entry->sock);
+
+        if ((rv = getaddrinfo(udp_entry->addr, udp_entry->port, &hints, &servinfo)) != 0) {
+            LOG(log_error, logtype_fce, "fce_init_udp: getaddrinfo(%s:%s): %s",
+                udp_entry->addr, udp_entry->port, gai_strerror(rv));
+            continue;
+        }
+
+        /* loop through all the results and make a socket */
+        for (p = servinfo; p != NULL; p = p->ai_next) {
+            if ((udp_entry->sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
+                LOG(log_error, logtype_fce, "fce_init_udp: socket(%s:%s): %s",
+                    udp_entry->addr, udp_entry->port, strerror(errno));
+                continue;
+            }
+            break;
+        }
+
+        if (p == NULL) {
+            LOG(log_error, logtype_fce, "fce_init_udp: no socket for %s:%s",
+                udp_entry->addr, udp_entry->port);
+        }
+        udp_entry->addrinfo = *p;
+        memcpy(&udp_entry->addrinfo, p, sizeof(struct addrinfo));
+        memcpy(&udp_entry->sockaddr, p->ai_addr, sizeof(struct sockaddr_storage));
+        freeaddrinfo(servinfo);
+    }
+
+    udp_initialized = true;
+}
+
+void fce_cleanup()
+{
+    if (udp_initialized == false )
+        return;
+
+    for (int i = 0; i < udp_sockets; i++)
+    {
+        struct udp_entry *udp_entry = udp_socket_list + i;
+
+        /* Close any pending sockets */
+        if (udp_entry->sock != -1)
+        {
+            close( udp_entry->sock );
+            udp_entry->sock = -1;
+        }
+    }
+    udp_initialized = false;
+}
+
+/*
+ * Construct a UDP packet for our listeners and return packet size
+ * */
+static ssize_t build_fce_packet(const AFPObj *obj,
+                                char *iobuf,
+                                fce_ev_t event,
+                                const char *path,
+                                const char *oldpath,
+                                pid_t pid,
+                                const char *user,
+                                uint32_t event_id)
+{
+    char *p = iobuf;
+    size_t pathlen;
+    ssize_t datalen = 0;
+    uint16_t uint16;
+    uint32_t uint32;
+    uint64_t uint64;
+    uint8_t packet_info = fce_ev_info;
+
+    /* FCE magic */
+    memcpy(p, FCE_PACKET_MAGIC, 8);
+    p += 8;
+    datalen += 8;
+
+    /* version */
+    *p = FCE_PACKET_VERSION;
+    p += 1;
+    datalen += 1;
+
+    /* optional: options */
+    if (FCE_PACKET_VERSION > 1) {
+        if (oldpath)
+            packet_info |= FCE_EV_INFO_SRCPATH;
+        *p = packet_info;
+        p += 1;
+        datalen += 1;
+    }
+
+    /* event */
+    *p = event;
+    p += 1;
+    datalen += 1;
+
+    /* optional: padding */
+    if (FCE_PACKET_VERSION > 1) {
+        p += 1;
+        datalen += 1;
+    }
+
+    /* optional: reserved */
+    if (FCE_PACKET_VERSION > 1) {
+        p += 8;
+        datalen += 8;
+    }
+
+    /* event ID */
+    uint32 = htonl(event_id);
+    memcpy(p, &uint32, sizeof(uint32));
+    p += sizeof(uint32);
+    datalen += sizeof(uint32);
+
+    /* optional: pid */
+    if (packet_info & FCE_EV_INFO_PID) {
+        uint64 = pid;
+        uint64 = hton64(uint64);
+        memcpy(p, &uint64, sizeof(uint64));
+        p += sizeof(uint64);
+        datalen += sizeof(uint64);
+    }
+
+    /* optional: username */
+    if (packet_info & FCE_EV_INFO_USER) {
+        uint16 = strlen(user);
+        uint16 = htons(uint16);
+        memcpy(p, &uint16, sizeof(uint16));
+        p += sizeof(uint16);
+        datalen += sizeof(uint16);
+        memcpy(p, user, strlen(user));
+        p += strlen(user);
+        datalen += strlen(user);
+    }
+
+    /* path */
+    if ((pathlen = strlen(path)) >= MAXPATHLEN)
+        pathlen = MAXPATHLEN - 1;
+    uint16 = pathlen;
+    uint16 = htons(uint16);
+    memcpy(p, &uint16, sizeof(uint16));
+    p += sizeof(uint16);
+    datalen += sizeof(uint16);
+    memcpy(p, path, pathlen);
+    p += pathlen;
+    datalen += pathlen;
+
+    /* optional: source path */
+    if (packet_info & FCE_EV_INFO_SRCPATH) {
+        if ((pathlen = strlen(oldpath)) >= MAXPATHLEN)
+            pathlen = MAXPATHLEN - 1;
+        uint16 = pathlen;
+        uint16 = htons(uint16);
+        memcpy(p, &uint16, sizeof(uint16));
+        p += sizeof(uint16);
+        datalen += sizeof(uint16);
+        memcpy(p, oldpath, pathlen);
+        p += pathlen;
+        datalen += pathlen;
+    }
+
+    /* return the packet len */
+    return datalen;
+}
+
+/*
+ * Send the fce information to all (connected) listeners
+ * We dont give return code because all errors are handled internally (I hope..)
+ * */
+static void send_fce_event(const AFPObj *obj, int event, const char *path, const char *oldpath)
+{    
+    static bool first_event = true;
+    static uint32_t event_id = 0; /* the unique packet couter to detect packet/data loss. Going from 0xFFFFFFFF to 0x0 is a valid increment */
+    static char *user;
+    time_t now = time(NULL);
+    ssize_t data_len;
+
+    /* initialized ? */
+    if (first_event == true) {
+        first_event = false;
+
+        struct passwd *pwd = getpwuid(obj->uid);
+        user = strdup(pwd->pw_name);
+
+        switch (obj->fce_version) {
+        case 1:
+            /* fce_ev_info unused */
+            break;
+        case 2:
+            fce_ev_info = FCE_EV_INFO_PID | FCE_EV_INFO_USER;
+            break;
+        default:
+            fce_ev_info = 0;
+            LOG(log_error, logtype_fce, "Unsupported FCE protocol version %d", obj->fce_version);
+            break;
+        }
+
+        fce_init_udp();
+        /* Notify listeners the we start from the beginning */
+        send_fce_event(obj, FCE_CONN_START, "", NULL);
+    }
+
+    /* run script */
+    if (obj->fce_notify_script) {
+        static bstring quote = NULL;
+        static bstring quoterep = NULL;
+        static bstring slash = NULL;
+        static bstring slashrep = NULL;
+
+        if (!quote) {
+            quote = bfromcstr("'");
+            quoterep = bfromcstr("'\\''");
+            slash = bfromcstr("\\");
+            slashrep = bfromcstr("\\\\");
+        }
+
+        bstring cmd = bformat("%s -v %d -e %s -i %" PRIu32 "",
+                              obj->fce_notify_script,
+                              FCE_PACKET_VERSION,
+                              fce_event_names[event],
+                              event_id);
+
+        if (path[0]) {
+            bstring bpath = bfromcstr(path);
+            bfindreplace(bpath, slash, slashrep, 0);
+            bfindreplace(bpath, quote, quoterep, 0);
+            bformata(cmd, " -P '%s'", bdata(bpath));
+            bdestroy(bpath);
+        }
+        if (fce_ev_info | FCE_EV_INFO_PID)
+            bformata(cmd, " -p %" PRIu64 "", (uint64_t)getpid());
+        if (fce_ev_info | FCE_EV_INFO_USER)
+            bformata(cmd, " -u %s", user);
+        if (oldpath) {
+            bstring boldpath = bfromcstr(oldpath);
+            bfindreplace(boldpath, slash, slashrep, 0);
+            bfindreplace(boldpath, quote, quoterep, 0);
+            bformata(cmd, " -S '%s'", bdata(boldpath));
+            bdestroy(boldpath);
+        }
+        (void)afprun_bg(1, bdata(cmd));
+        bdestroy(cmd);
+    }
+
+    for (int i = 0; i < udp_sockets; i++) {
+        int sent_data = 0;
+        struct udp_entry *udp_entry = udp_socket_list + i;
+
+        /* we had a problem earlier ? */
+        if (udp_entry->sock == -1) {
+            /* We still have to wait ?*/
+            if (now < udp_entry->next_try_on_error)
+                continue;
+
+            /* Reopen socket */
+            udp_entry->sock = socket(udp_entry->addrinfo.ai_family,
+                                     udp_entry->addrinfo.ai_socktype,
+                                     udp_entry->addrinfo.ai_protocol);
+            
+            if (udp_entry->sock == -1) {
+                /* failed again, so go to rest again */
+                LOG(log_error, logtype_fce, "Cannot recreate socket for fce UDP connection: errno %d", errno  );
+
+                udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
+                continue;
+            }
+
+            udp_entry->next_try_on_error = 0;
+
+            /* Okay, we have a running socket again, send server that we had a problem on our side*/
+            data_len = build_fce_packet(obj, iobuf, FCE_CONN_BROKEN, "", NULL, getpid(), user, 0);
+
+            sendto(udp_entry->sock,
+                   iobuf,
+                   data_len,
+                   0,
+                   (struct sockaddr *)&udp_entry->sockaddr,
+                   udp_entry->addrinfo.ai_addrlen);
+        }
+
+        /* build our data packet */
+        data_len = build_fce_packet(obj, iobuf, event, path, oldpath, getpid(), user, event_id);
+
+        sent_data = sendto(udp_entry->sock,
+                           iobuf,
+                           data_len,
+                           0,
+                           (struct sockaddr *)&udp_entry->sockaddr,
+                           udp_entry->addrinfo.ai_addrlen);
+
+        /* Problems ? */
+        if (sent_data != data_len) {
+            /* Argh, socket broke, we close and retry later */
+            LOG(log_error, logtype_fce, "send_fce_event: error sending packet to %s:%s, transfered %d of %d: %s",
+                udp_entry->addr, udp_entry->port, sent_data, data_len, strerror(errno));
+
+            close( udp_entry->sock );
+            udp_entry->sock = -1;
+            udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
+        }
+    }
+
+    event_id++;
+}
+
+static int add_udp_socket(const char *target_ip, const char *target_port )
+{
+    if (target_port == NULL)
+        target_port = FCE_DEFAULT_PORT_STRING;
+
+    if (udp_sockets >= FCE_MAX_UDP_SOCKS) {
+        LOG(log_error, logtype_fce, "Too many file change api UDP connections (max %d allowed)", FCE_MAX_UDP_SOCKS );
+        return AFPERR_PARAM;
+    }
+
+    udp_socket_list[udp_sockets].addr = strdup(target_ip);
+    udp_socket_list[udp_sockets].port = strdup(target_port);
+    udp_socket_list[udp_sockets].sock = -1;
+    memset(&udp_socket_list[udp_sockets].addrinfo, 0, sizeof(struct addrinfo));
+    memset(&udp_socket_list[udp_sockets].sockaddr, 0, sizeof(struct sockaddr_storage));
+    udp_socket_list[udp_sockets].next_try_on_error = 0;
+
+    udp_sockets++;
+
+    return AFP_OK;
+}
+
+static void save_close_event(const AFPObj *obj, const char *path)
+{
+    time_t now = time(NULL);
+
+    /* Check if it's a close for the same event as the last one */
+    if (last_close_event.time   /* is there any saved event ? */
+        && (strcmp(path, last_close_event.path) != 0)) {
+        /* no, so send the saved event out now */
+        send_fce_event(obj, FCE_FILE_MODIFY,last_close_event.path, NULL);
+    }
+
+    LOG(log_debug, logtype_fce, "save_close_event: %s", path);
+
+    last_close_event.time = now;
+    strncpy(last_close_event.path, path, MAXPATHLEN);
+}
+
+static void fce_init_ign_names(const char *ignores)
+{
+    int count = 0;
+    char *names = strdup(ignores);
+    char *p;
+    int i = 0;
+
+    while (names[i]) {
+        count++;
+        for (; names[i] && names[i] != '/'; i++)
+            ;
+        if (!names[i])
+            break;
+        i++;
+    }
+
+    skip_files = calloc(count + 1, sizeof(char *));
+
+    for (i = 0, p = strtok(names, "/"); p ; p = strtok(NULL, "/"))
+        skip_files[i++] = strdup(p);
+
+    free(names);
+}
+
+/*
+ *
+ * Dispatcher for all incoming file change events
+ *
+ * */
+int fce_register(const AFPObj *obj, fce_ev_t event, const char *path, const char *oldpath)
+{
+    static bool first_event = true;
+    const char *bname;
+
+    if (!(fce_ev_enabled & (1 << event)))
+        return AFP_OK;
+
+    AFP_ASSERT(event >= FCE_FIRST_EVENT && event <= FCE_LAST_EVENT);
+    AFP_ASSERT(path);
+
+    LOG(log_debug, logtype_fce, "register_fce(path: %s, event: %s)",
+        path, fce_event_names[event]);
+
+    bname = basename_safe(path);
+
+    if ((udp_sockets == 0) && (obj->fce_notify_script == NULL)) {
+        /* No listeners configured */
+        return AFP_OK;
+    }
+
+       /* do some initialization on the fly the first time */
+       if (first_event) {
+               fce_initialize_history();
+        fce_init_ign_names(obj->fce_ign_names);
+        first_event = false;
+       }
+
+       /* handle files which should not cause events (.DS_Store atc. ) */
+    for (int i = 0; skip_files[i] != NULL; i++) {
+        if (strcmp(bname, skip_files[i]) == 0)
+                       return AFP_OK;
+       }
+
+       /* Can we ignore this event based on type or history? */
+       if (fce_handle_coalescation(event, path)) {
+               LOG(log_debug9, logtype_fce, "Coalesced fc event <%d> for <%s>", event, path);
+               return AFP_OK;
+       }
+
+    switch (event) {
+    case FCE_FILE_MODIFY:
+        save_close_event(obj, path);
+        break;
+    default:
+        send_fce_event(obj, event, path, oldpath);
+        break;
+    }
+
+    return AFP_OK;
+}
+
+static void check_saved_close_events(const AFPObj *obj)
+{
+    time_t now = time(NULL);
+
+    /* check if configured holdclose time has passed */
+    if (last_close_event.time && ((last_close_event.time + obj->options.fce_fmodwait) < now)) {
+        LOG(log_debug, logtype_fce, "check_saved_close_events: sending event: %s", last_close_event.path);
+        /* yes, send event */
+        send_fce_event(obj, FCE_FILE_MODIFY, &last_close_event.path[0], NULL);
+        last_close_event.path[0] = 0;
+        last_close_event.time = 0;
+    }
+}
+
+/******************** External calls start here **************************/
+
+/*
+ * API-Calls for file change api, called form outside (file.c directory.c ofork.c filedir.c)
+ * */
+void fce_pending_events(const AFPObj *obj)
+{
+    if (!udp_sockets)
+        return;
+    check_saved_close_events(obj);
+}
+
+/*
+ *
+ * Extern connect to afpd parameter, can be called multiple times for multiple listeners (up to MAX_UDP_SOCKS times)
+ *
+ * */
+int fce_add_udp_socket(const char *target)
+{
+       const char *port = FCE_DEFAULT_PORT_STRING;
+       char target_ip[256] = {""};
+
+       strncpy(target_ip, target, sizeof(target_ip) -1);
+
+       char *port_delim = strchr( target_ip, ':' );
+       if (port_delim) {
+               *port_delim = 0;
+               port = port_delim + 1;
+       }
+       return add_udp_socket(target_ip, port);
+}
+
+int fce_set_events(const char *events)
+{
+    char *e;
+    char *p;
+    
+    if (events == NULL)
+        return AFPERR_PARAM;
+
+    e = strdup(events);
+
+    fce_ev_enabled = 0;
+
+    for (p = strtok(e, ", "); p; p = strtok(NULL, ", ")) {
+        if (strcmp(p, "fmod") == 0) {
+            fce_ev_enabled |= (1 << FCE_FILE_MODIFY);
+        } else if (strcmp(p, "fdel") == 0) {
+            fce_ev_enabled |= (1 << FCE_FILE_DELETE);
+        } else if (strcmp(p, "ddel") == 0) {
+            fce_ev_enabled |= (1 << FCE_DIR_DELETE);
+        } else if (strcmp(p, "fcre") == 0) {
+            fce_ev_enabled |= (1 << FCE_FILE_CREATE);
+        } else if (strcmp(p, "dcre") == 0) {
+            fce_ev_enabled |= (1 << FCE_DIR_CREATE);
+        } else if (strcmp(p, "fmov") == 0) {
+            fce_ev_enabled |= (1 << FCE_FILE_MOVE);
+        } else if (strcmp(p, "dmov") == 0) {
+            fce_ev_enabled |= (1 << FCE_DIR_MOVE);
+        } else if (strcmp(p, "login") == 0) {
+            fce_ev_enabled |= (1 << FCE_LOGIN);
+        } else if (strcmp(p, "logout") == 0) {
+            fce_ev_enabled |= (1 << FCE_LOGOUT);
+        }
+    }
+
+    free(e);
+
+    return AFP_OK;
+}