]> arthur.barton.de Git - netatalk.git/blob - etc/afpd/fce_api.c
Merge remote branch 'netafp/master' into branch-allea
[netatalk.git] / etc / afpd / fce_api.c
1 /*
2  * Copyright (c) 2010 Mark Williams
3  *
4  * File change event API for netatalk
5  *
6  * for every detected filesystem change a UDP packet is sent to an arbitrary list
7  * of listeners. Each packet contains unix path of modified filesystem element,
8  * event reason, and a consecutive event id (32 bit). Technically we are UDP client and are sending
9  * out packets synchronuosly as they are created by the afp functions. This should not affect
10  * performance measurably. The only delaying calls occur during initialization, if we have to
11  * resolve non-IP hostnames to IP. All numeric data inside the packet is network byte order, so use
12  * ntohs / ntohl to resolve length and event id. Ideally a listener receives every packet with
13  * no gaps in event ids, starting with event id 1 and mode FCE_CONN_START followed by
14  * data events from id 2 up to 0xFFFFFFFF, followed by 0 to 0xFFFFFFFF and so on.
15  *
16  * A gap or not starting with 1 mode FCE_CONN_START or receiving mode FCE_CONN_BROKEN means that
17  * the listener has lost at least one filesystem event
18  * 
19  * All Rights Reserved.  See COPYRIGHT.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif /* HAVE_CONFIG_H */
25
26 #include <stdio.h>
27
28 #include <string.h>
29 #include <stdlib.h>
30 #include <errno.h>
31 #include <time.h>
32
33
34 #include <sys/param.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
37 #include <arpa/inet.h>
38 #include <netdb.h>
39
40 #include <atalk/adouble.h>
41 #include <atalk/vfs.h>
42 #include <atalk/logger.h>
43 #include <atalk/afp.h>
44 #include <atalk/util.h>
45 #include <atalk/cnid.h>
46 #include <atalk/unix.h>
47 #include <atalk/fce_api.h>
48 #include <atalk/globals.h>
49
50 #include "fork.h"
51 #include "file.h"
52 #include "directory.h"
53 #include "desktop.h"
54 #include "volume.h"
55
56 // ONLY USED IN THIS FILE
57 #include "fce_api_internal.h"
58
59 #define FCE_TRUE 1
60 #define FCE_FALSE 0
61
62 /* We store our connection data here */
63 static struct udp_entry udp_socket_list[FCE_MAX_UDP_SOCKS];
64 static int udp_sockets = 0;
65 static int udp_initialized = FCE_FALSE;
66 static unsigned long fce_ev_enabled =
67     (1 << FCE_FILE_MODIFY) |
68     (1 << FCE_FILE_DELETE) |
69     (1 << FCE_DIR_DELETE) |
70     (1 << FCE_FILE_CREATE) |
71     (1 << FCE_DIR_CREATE);
72
73 static uint64_t tm_used;          /* used for passing to event handler */
74 #define MAXIOBUF 1024
75 static char iobuf[MAXIOBUF];
76 static const char *skip_files[] = 
77 {
78         ".DS_Store",
79         NULL
80 };
81 static struct fce_close_event last_close_event;
82
83 /*
84  *
85  * Initialize network structs for any listeners
86  * We dont give return code because all errors are handled internally (I hope..)
87  *
88  * */
89 void fce_init_udp()
90 {
91     int rv;
92     struct addrinfo hints, *servinfo, *p;
93
94     if (udp_initialized == FCE_TRUE)
95         return;
96
97     memset(&hints, 0, sizeof hints);
98     hints.ai_family = AF_UNSPEC;
99     hints.ai_socktype = SOCK_DGRAM;
100
101     for (int i = 0; i < udp_sockets; i++) {
102         struct udp_entry *udp_entry = udp_socket_list + i;
103
104         /* Close any pending sockets */
105         if (udp_entry->sock != -1)
106             close(udp_entry->sock);
107
108         if ((rv = getaddrinfo(udp_entry->addr, udp_entry->port, &hints, &servinfo)) != 0) {
109             LOG(log_error, logtype_afpd, "fce_init_udp: getaddrinfo(%s:%s): %s",
110                 udp_entry->addr, udp_entry->port, gai_strerror(rv));
111             continue;
112         }
113
114         /* loop through all the results and make a socket */
115         for (p = servinfo; p != NULL; p = p->ai_next) {
116             if ((udp_entry->sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
117                 LOG(log_error, logtype_afpd, "fce_init_udp: socket(%s:%s): %s",
118                     udp_entry->addr, udp_entry->port, strerror(errno));
119                 continue;
120             }
121             break;
122         }
123
124         if (p == NULL) {
125             LOG(log_error, logtype_afpd, "fce_init_udp: no socket for %s:%s",
126                 udp_entry->addr, udp_entry->port);
127         }
128         udp_entry->addrinfo = *p;
129         memcpy(&udp_entry->addrinfo, p, sizeof(struct addrinfo));
130         memcpy(&udp_entry->sockaddr, p->ai_addr, sizeof(struct sockaddr_storage));
131         freeaddrinfo(servinfo);
132     }
133
134     udp_initialized = FCE_TRUE;
135 }
136
137 void fce_cleanup()
138 {
139     if (udp_initialized == FCE_FALSE )
140         return;
141
142     for (int i = 0; i < udp_sockets; i++)
143     {
144         struct udp_entry *udp_entry = udp_socket_list + i;
145
146         /* Close any pending sockets */
147         if (udp_entry->sock != -1)
148         {
149             close( udp_entry->sock );
150             udp_entry->sock = -1;
151         }
152     }
153     udp_initialized = FCE_FALSE;
154 }
155
156 /*
157  * Construct a UDP packet for our listeners and return packet size
158  * */
159 static ssize_t build_fce_packet( struct fce_packet *packet, char *path, int mode, uint32_t event_id )
160 {
161     size_t pathlen = 0;
162     ssize_t data_len = 0;
163     uint64_t *t;
164
165     /* Set content of packet */
166     memcpy(packet->magic, FCE_PACKET_MAGIC, sizeof(packet->magic) );
167     packet->version = FCE_PACKET_VERSION;
168     packet->mode = mode;
169    
170     packet->event_id = event_id; 
171
172     pathlen = strlen(path); /* exclude string terminator */
173     
174     /* This should never happen, but before we bust this server, we send nonsense, fce listener has to cope */
175     if (pathlen >= MAXPATHLEN)
176         pathlen = MAXPATHLEN - 1;
177
178     packet->datalen = pathlen;
179
180     /* This is the payload len. Means: the packet has len bytes more until packet is finished */
181     data_len = FCE_PACKET_HEADER_SIZE + pathlen;
182
183     switch (mode) {
184     case FCE_TM_SIZE:
185         t = (uint64_t *)packet->data;
186         *t = hton64(tm_used);
187         memcpy(packet->data + sizeof(tm_used), path, pathlen);
188         
189         packet->datalen = pathlen + sizeof(tm_used);
190         data_len += sizeof(tm_used);
191         break;
192     default:
193         memcpy(packet->data, path, pathlen);
194         break;
195     }
196
197     /* return the packet len */
198     return data_len;
199 }
200
201 /*
202  * Handle Endianess and write into buffer w/o padding
203  **/ 
204 static void pack_fce_packet(struct fce_packet *packet, unsigned char *buf, int maxlen)
205 {
206     unsigned char *p = buf;
207
208     memcpy(p, &packet->magic[0], sizeof(packet->magic));
209     p += sizeof(packet->magic);
210
211     *p = packet->version;
212     p++;
213     
214     *p = packet->mode;
215     p++;
216     
217     uint32_t *id = (uint32_t*)p;
218     *id = htonl(packet->event_id);
219     p += sizeof(packet->event_id);
220
221     uint16_t *l = ( uint16_t *)p;
222     *l = htons(packet->datalen);
223     p += sizeof(packet->datalen);
224
225     if (((p - buf) +  packet->datalen) < maxlen) {
226         memcpy(p, &packet->data[0], packet->datalen);
227     }
228 }
229
230 /*
231  * Send the fce information to all (connected) listeners
232  * We dont give return code because all errors are handled internally (I hope..)
233  * */
234 static void send_fce_event( char *path, int mode )
235 {    
236     static int first_event = FCE_TRUE;
237
238     struct fce_packet packet;
239     void *data = &packet;
240     static uint32_t event_id = 0; /* the unique packet couter to detect packet/data loss. Going from 0xFFFFFFFF to 0x0 is a valid increment */
241     time_t now = time(NULL);
242
243     LOG(log_debug, logtype_afpd, "send_fce_event: start");
244
245     /* initialized ? */
246     if (first_event == FCE_TRUE) {
247         first_event = FCE_FALSE;
248         fce_init_udp();
249         /* Notify listeners the we start from the beginning */
250         send_fce_event( "", FCE_CONN_START );
251     }
252
253     /* build our data packet */
254     ssize_t data_len = build_fce_packet( &packet, path, mode, ++event_id );
255     pack_fce_packet(&packet, iobuf, MAXIOBUF);
256
257     for (int i = 0; i < udp_sockets; i++)
258     {
259         int sent_data = 0;
260         struct udp_entry *udp_entry = udp_socket_list + i;
261
262         /* we had a problem earlier ? */
263         if (udp_entry->sock == -1)
264         {
265             /* We still have to wait ?*/
266             if (now < udp_entry->next_try_on_error)
267                 continue;
268
269             /* Reopen socket */
270             udp_entry->sock = socket(udp_entry->addrinfo.ai_family,
271                                      udp_entry->addrinfo.ai_socktype,
272                                      udp_entry->addrinfo.ai_protocol);
273             
274             if (udp_entry->sock == -1) {
275                 /* failed again, so go to rest again */
276                 LOG(log_error, logtype_afpd, "Cannot recreate socket for fce UDP connection: errno %d", errno  );
277
278                 udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
279                 continue;
280             }
281
282             udp_entry->next_try_on_error = 0;
283
284             /* Okay, we have a running socket again, send server that we had a problem on our side*/
285             data_len = build_fce_packet( &packet, "", FCE_CONN_BROKEN, 0 );
286             pack_fce_packet(&packet, iobuf, MAXIOBUF);
287
288             sendto(udp_entry->sock,
289                    iobuf,
290                    data_len,
291                    0,
292                    (struct sockaddr *)&udp_entry->sockaddr,
293                    udp_entry->addrinfo.ai_addrlen);
294
295             /* Rebuild our original data packet */
296             data_len = build_fce_packet( &packet, path, mode, event_id );
297             pack_fce_packet(&packet, iobuf, MAXIOBUF);
298         }
299
300         sent_data = sendto(udp_entry->sock,
301                            iobuf,
302                            data_len,
303                            0,
304                            (struct sockaddr *)&udp_entry->sockaddr,
305                            udp_entry->addrinfo.ai_addrlen);
306
307         /* Problems ? */
308         if (sent_data != data_len) {
309             /* Argh, socket broke, we close and retry later */
310             LOG(log_error, logtype_afpd, "send_fce_event: error sending packet to %s:%s, transfered %d of %d: %s",
311                 udp_entry->addr, udp_entry->port, sent_data, data_len, strerror(errno));
312
313             close( udp_entry->sock );
314             udp_entry->sock = -1;
315             udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
316         }
317     }
318 }
319
320 static int add_udp_socket(const char *target_ip, const char *target_port )
321 {
322     if (target_port == NULL)
323         target_port = FCE_DEFAULT_PORT_STRING;
324
325     if (udp_sockets >= FCE_MAX_UDP_SOCKS) {
326         LOG(log_error, logtype_afpd, "Too many file change api UDP connections (max %d allowed)", FCE_MAX_UDP_SOCKS );
327         return AFPERR_PARAM;
328     }
329
330     udp_socket_list[udp_sockets].addr = strdup(target_ip);
331     udp_socket_list[udp_sockets].port = strdup(target_port);
332     udp_socket_list[udp_sockets].sock = -1;
333     memset(&udp_socket_list[udp_sockets].addrinfo, 0, sizeof(struct addrinfo));
334     memset(&udp_socket_list[udp_sockets].sockaddr, 0, sizeof(struct sockaddr_storage));
335     udp_socket_list[udp_sockets].next_try_on_error = 0;
336
337     udp_sockets++;
338
339     return AFP_OK;
340 }
341
342 static void save_close_event(const char *path)
343 {
344     time_t now = time(NULL);
345
346     /* Check if it's a close for the same event as the last one */
347     if (last_close_event.time   /* is there any saved event ? */
348         && (strcmp(path, last_close_event.path) != 0)) {
349         /* no, so send the saved event out now */
350         send_fce_event(last_close_event.path, FCE_FILE_MODIFY);
351     }
352
353     LOG(log_debug, logtype_afpd, "save_close_event: %s", path);
354
355     last_close_event.time = now;
356     strncpy(last_close_event.path, path, MAXPATHLEN);
357 }
358
359 /*
360  *
361  * Dispatcher for all incoming file change events
362  *
363  * */
364 static int register_fce(const char *u_name, int is_dir, int mode)
365 {
366     static int first_event = FCE_TRUE;
367
368     if (udp_sockets == 0)
369         /* No listeners configured */
370         return AFP_OK;
371
372     if (u_name == NULL)
373         return AFPERR_PARAM;
374
375         /* do some initialization on the fly the first time */
376         if (first_event) {
377                 fce_initialize_history();
378         first_event = FCE_FALSE;
379         }
380
381         /* handle files which should not cause events (.DS_Store atc. ) */
382         for (int i = 0; skip_files[i] != NULL; i++)
383         {
384                 if (!strcmp( u_name, skip_files[i]))
385                         return AFP_OK;
386         }
387
388
389         char full_path_buffer[MAXPATHLEN + 1] = {""};
390         const char *cwd = getcwdpath();
391
392     if (mode == FCE_TM_SIZE) {
393         strlcpy(full_path_buffer, u_name, MAXPATHLEN);
394     } else if (!is_dir || mode == FCE_DIR_DELETE) {
395                 if (strlen( cwd ) + strlen( u_name) + 1 >= MAXPATHLEN) {
396                         LOG(log_error, logtype_afpd, "FCE file name too long: %s/%s", cwd, u_name );
397                         return AFPERR_PARAM;
398                 }
399                 sprintf( full_path_buffer, "%s/%s", cwd, u_name );
400         } else {
401                 if (strlen( cwd ) >= MAXPATHLEN) {
402                         LOG(log_error, logtype_afpd, "FCE directory name too long: %s", cwd);
403                         return AFPERR_PARAM;
404                 }
405                 strcpy( full_path_buffer, cwd);
406         }
407
408         /* Can we ignore this event based on type or history? */
409         if (!(mode & FCE_TM_SIZE) && fce_handle_coalescation( full_path_buffer, is_dir, mode ))
410         {
411                 LOG(log_debug9, logtype_afpd, "Coalesced fc event <%d> for <%s>", mode, full_path_buffer );
412                 return AFP_OK;
413         }
414
415         LOG(log_debug9, logtype_afpd, "Detected fc event <%d> for <%s>", mode, full_path_buffer );
416
417     if (mode & FCE_FILE_MODIFY) {
418         save_close_event(full_path_buffer);
419         return AFP_OK;
420     }
421
422     send_fce_event( full_path_buffer, mode );
423
424     return AFP_OK;
425 }
426
427 static void check_saved_close_events(int fmodwait)
428 {
429     time_t now = time(NULL);
430
431     /* check if configured holdclose time has passed */
432     if (last_close_event.time && ((last_close_event.time + fmodwait) < now)) {
433         LOG(log_debug, logtype_afpd, "check_saved_close_events: sending event: %s", last_close_event.path);
434         /* yes, send event */
435         send_fce_event(&last_close_event.path[0], FCE_FILE_MODIFY);
436         last_close_event.path[0] = 0;
437         last_close_event.time = 0;
438     }
439 }
440
441 /******************** External calls start here **************************/
442
443 /*
444  * API-Calls for file change api, called form outside (file.c directory.c ofork.c filedir.c)
445  * */
446 #ifndef FCE_TEST_MAIN
447
448 void fce_pending_events(AFPObj *obj)
449 {
450     vol_fce_tm_event();
451     check_saved_close_events(obj->options.fce_fmodwait);
452 }
453
454 int fce_register_delete_file( struct path *path )
455 {
456     int ret = AFP_OK;
457
458     if (path == NULL)
459         return AFPERR_PARAM;
460
461     if (!(fce_ev_enabled & (1 << FCE_FILE_DELETE)))
462         return ret;
463         
464     ret = register_fce( path->u_name, false, FCE_FILE_DELETE );
465
466     return ret;
467 }
468 int fce_register_delete_dir( char *name )
469 {
470     int ret = AFP_OK;
471
472     if (name == NULL)
473         return AFPERR_PARAM;
474
475     if (!(fce_ev_enabled & (1 << FCE_DIR_DELETE)))
476         return ret;
477         
478     ret = register_fce( name, true, FCE_DIR_DELETE);
479
480     return ret;
481 }
482
483 int fce_register_new_dir( struct path *path )
484 {
485     int ret = AFP_OK;
486
487     if (path == NULL)
488         return AFPERR_PARAM;
489
490     if (!(fce_ev_enabled & (1 << FCE_DIR_CREATE)))
491         return ret;
492
493     ret = register_fce( path->u_name, true, FCE_DIR_CREATE );
494
495     return ret;
496 }
497
498
499 int fce_register_new_file( struct path *path )
500 {
501     int ret = AFP_OK;
502
503     if (path == NULL)
504         return AFPERR_PARAM;
505
506     if (!(fce_ev_enabled & (1 << FCE_FILE_CREATE)))
507         return ret;
508
509     ret = register_fce( path->u_name, false, FCE_FILE_CREATE );
510
511     return ret;
512 }
513
514 int fce_register_file_modification( struct ofork *ofork )
515 {
516     char *u_name = NULL;
517     struct vol *vol;
518     int ret = AFP_OK;
519
520     if (ofork == NULL || ofork->of_vol == NULL)
521         return AFPERR_PARAM;
522
523     if (!(fce_ev_enabled & (1 << FCE_FILE_MODIFY)))
524         return ret;
525
526     vol = ofork->of_vol;
527
528     if (NULL == (u_name = mtoupath(vol, of_name(ofork), ofork->of_did, utf8_encoding(vol->v_obj)))) 
529     {
530         return AFPERR_MISC;
531     }
532     
533     ret = register_fce( u_name, false, FCE_FILE_MODIFY );
534     
535     return ret;    
536 }
537
538 int fce_register_tm_size(const char *vol, size_t used)
539 {
540     int ret = AFP_OK;
541
542     if (vol == NULL)
543         return AFPERR_PARAM;
544
545     if (!(fce_ev_enabled & (1 << FCE_TM_SIZE)))
546         return ret;
547
548     tm_used = used;             /* oh what a hack */
549     ret = register_fce(vol, false, FCE_TM_SIZE);
550
551     return ret;
552 }
553 #endif
554
555 /*
556  *
557  * Extern connect to afpd parameter, can be called multiple times for multiple listeners (up to MAX_UDP_SOCKS times)
558  *
559  * */
560 int fce_add_udp_socket(const char *target)
561 {
562         const char *port = FCE_DEFAULT_PORT_STRING;
563         char target_ip[256] = {""};
564
565         strncpy(target_ip, target, sizeof(target_ip) -1);
566
567         char *port_delim = strchr( target_ip, ':' );
568         if (port_delim) {
569                 *port_delim = 0;
570                 port = port_delim + 1;
571         }
572         return add_udp_socket(target_ip, port);
573 }
574
575 int fce_set_events(const char *events)
576 {
577     char *e;
578     char *p;
579     
580     if (events == NULL)
581         return AFPERR_PARAM;
582
583     e = strdup(events);
584
585     fce_ev_enabled = 0;
586
587     for (p = strtok(e, ","); p; p = strtok(NULL, ",")) {
588         if (strcmp(p, "fmod") == 0) {
589             fce_ev_enabled |= (1 << FCE_FILE_MODIFY);
590         } else if (strcmp(p, "fdel") == 0) {
591             fce_ev_enabled |= (1 << FCE_FILE_DELETE);
592         } else if (strcmp(p, "ddel") == 0) {
593             fce_ev_enabled |= (1 << FCE_DIR_DELETE);
594         } else if (strcmp(p, "fcre") == 0) {
595             fce_ev_enabled |= (1 << FCE_FILE_CREATE);
596         } else if (strcmp(p, "dcre") == 0) {
597             fce_ev_enabled |= (1 << FCE_DIR_CREATE);
598         } else if (strcmp(p, "tmsz") == 0) {
599             fce_ev_enabled |= (1 << FCE_TM_SIZE);
600         }
601     }
602
603     free(e);
604
605     return AFP_OK;
606 }
607
608 #ifdef FCE_TEST_MAIN
609
610
611 void shortsleep( unsigned int us )
612 {    
613     usleep( us );
614 }
615 int main( int argc, char*argv[] )
616 {
617     int c,ret;
618
619     char *port = FCE_DEFAULT_PORT_STRING;
620     char *host = "localhost";
621     int delay_between_events = 1000;
622     int event_code = FCE_FILE_MODIFY;
623     char pathbuff[1024];
624     int duration_in_seconds = 0; // TILL ETERNITY
625     char target[256];
626     char *path = getcwd( pathbuff, sizeof(pathbuff) );
627
628     // FULLSPEED TEST IS "-s 1001" -> delay is 0 -> send packets without pause
629
630     while ((c = getopt(argc, argv, "d:e:h:p:P:s:")) != -1) {
631         switch(c) {
632         case '?':
633             fprintf(stdout, "%s: [ -p Port -h Listener1 [ -h Listener2 ...] -P path -s Delay_between_events_in_us -e event_code -d Duration ]\n", argv[0]);
634             exit(1);
635             break;
636         case 'd':
637             duration_in_seconds = atoi(optarg);
638             break;
639         case 'e':
640             event_code = atoi(optarg);
641             break;
642         case 'h':
643             host = strdup(optarg);
644             break;
645         case 'p':
646             port = strdup(optarg);
647             break;
648         case 'P':
649             path = strdup(optarg);
650             break;
651         case 's':
652             delay_between_events = atoi(optarg);
653             break;
654         }
655     }
656
657     sprintf(target, "%s:%s", host, port);
658     if (fce_add_udp_socket(target) != 0)
659         return 1;
660
661     int ev_cnt = 0;
662     time_t start_time = time(NULL);
663     time_t end_time = 0;
664
665     if (duration_in_seconds)
666         end_time = start_time + duration_in_seconds;
667
668     while (1)
669     {
670         time_t now = time(NULL);
671         if (now > start_time)
672         {
673             start_time = now;
674             fprintf( stdout, "%d events/s\n", ev_cnt );
675             ev_cnt = 0;
676         }
677         if (end_time && now >= end_time)
678             break;
679
680         register_fce( path, 0, event_code );
681         ev_cnt++;
682
683         
684         shortsleep( delay_between_events );
685     }
686 }
687 #endif /* TESTMAIN*/