]> arthur.barton.de Git - netatalk.git/blob - etc/afpd/fce_api.c
Merge remote branch 'origin/product-2-2' into develop
[netatalk.git] / etc / afpd / fce_api.c
1 /*
2  * Copyright (c) 2010 Mark Williams
3  *
4  * File change event API for netatalk
5  *
6  * for every detected filesystem change a UDP packet is sent to an arbitrary list
7  * of listeners. Each packet contains unix path of modified filesystem element,
8  * event reason, and a consecutive event id (32 bit). Technically we are UDP client and are sending
9  * out packets synchronuosly as they are created by the afp functions. This should not affect
10  * performance measurably. The only delaying calls occur during initialization, if we have to
11  * resolve non-IP hostnames to IP. All numeric data inside the packet is network byte order, so use
12  * ntohs / ntohl to resolve length and event id. Ideally a listener receives every packet with
13  * no gaps in event ids, starting with event id 1 and mode FCE_CONN_START followed by
14  * data events from id 2 up to 0xFFFFFFFF, followed by 0 to 0xFFFFFFFF and so on.
15  *
16  * A gap or not starting with 1 mode FCE_CONN_START or receiving mode FCE_CONN_BROKEN means that
17  * the listener has lost at least one filesystem event
18  * 
19  * All Rights Reserved.  See COPYRIGHT.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif /* HAVE_CONFIG_H */
25
26 #include <stdio.h>
27
28 #include <string.h>
29 #include <stdlib.h>
30 #include <errno.h>
31 #include <time.h>
32
33
34 #include <sys/param.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
37 #include <arpa/inet.h>
38 #include <netdb.h>
39
40 #include <atalk/adouble.h>
41 #include <atalk/vfs.h>
42 #include <atalk/logger.h>
43 #include <atalk/afp.h>
44 #include <atalk/util.h>
45 #include <atalk/cnid.h>
46 #include <atalk/unix.h>
47 #include <atalk/fce_api.h>
48 #include <atalk/globals.h>
49
50 #include "fork.h"
51 #include "file.h"
52 #include "directory.h"
53 #include "desktop.h"
54 #include "volume.h"
55
56 // ONLY USED IN THIS FILE
57 #include "fce_api_internal.h"
58
59 #define FCE_TRUE 1
60 #define FCE_FALSE 0
61
62 /* We store our connection data here */
63 static struct udp_entry udp_socket_list[FCE_MAX_UDP_SOCKS];
64 static int udp_sockets = 0;
65 static int udp_initialized = FCE_FALSE;
66 static unsigned long fce_ev_enabled =
67     (1 << FCE_FILE_MODIFY) |
68     (1 << FCE_FILE_DELETE) |
69     (1 << FCE_DIR_DELETE) |
70     (1 << FCE_FILE_CREATE) |
71     (1 << FCE_DIR_CREATE);
72
73 static uint64_t tm_used;          /* used for passing to event handler */
74 #define MAXIOBUF 1024
75 static char iobuf[MAXIOBUF];
76 static const char *skip_files[] = 
77 {
78         ".DS_Store",
79         NULL
80 };
81 static struct fce_close_event last_close_event;
82
83 /*
84  *
85  * Initialize network structs for any listeners
86  * We dont give return code because all errors are handled internally (I hope..)
87  *
88  * */
89 void fce_init_udp()
90 {
91     int rv;
92     struct addrinfo hints, *servinfo, *p;
93
94     if (udp_initialized == FCE_TRUE)
95         return;
96
97     memset(&hints, 0, sizeof hints);
98     hints.ai_family = AF_UNSPEC;
99     hints.ai_socktype = SOCK_DGRAM;
100
101     for (int i = 0; i < udp_sockets; i++) {
102         struct udp_entry *udp_entry = udp_socket_list + i;
103
104         /* Close any pending sockets */
105         if (udp_entry->sock != -1)
106             close(udp_entry->sock);
107
108         if ((rv = getaddrinfo(udp_entry->addr, udp_entry->port, &hints, &servinfo)) != 0) {
109             LOG(log_error, logtype_afpd, "fce_init_udp: getaddrinfo(%s:%s): %s",
110                 udp_entry->addr, udp_entry->port, gai_strerror(rv));
111             continue;
112         }
113
114         /* loop through all the results and make a socket */
115         for (p = servinfo; p != NULL; p = p->ai_next) {
116             if ((udp_entry->sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
117                 LOG(log_error, logtype_afpd, "fce_init_udp: socket(%s:%s): %s",
118                     udp_entry->addr, udp_entry->port, strerror(errno));
119                 continue;
120             }
121             break;
122         }
123
124         if (p == NULL) {
125             LOG(log_error, logtype_afpd, "fce_init_udp: no socket for %s:%s",
126                 udp_entry->addr, udp_entry->port);
127         }
128         udp_entry->addrinfo = *p;
129         memcpy(&udp_entry->addrinfo, p, sizeof(struct addrinfo));
130         memcpy(&udp_entry->sockaddr, p->ai_addr, sizeof(struct sockaddr_storage));
131         freeaddrinfo(servinfo);
132     }
133
134     udp_initialized = FCE_TRUE;
135 }
136
137 void fce_cleanup()
138 {
139     if (udp_initialized == FCE_FALSE )
140         return;
141
142     for (int i = 0; i < udp_sockets; i++)
143     {
144         struct udp_entry *udp_entry = udp_socket_list + i;
145
146         /* Close any pending sockets */
147         if (udp_entry->sock != -1)
148         {
149             close( udp_entry->sock );
150             udp_entry->sock = -1;
151         }
152     }
153     udp_initialized = FCE_FALSE;
154 }
155
156 /*
157  * Construct a UDP packet for our listeners and return packet size
158  * */
159 static ssize_t build_fce_packet( struct fce_packet *packet, char *path, int mode, uint32_t event_id )
160 {
161     size_t pathlen = 0;
162     ssize_t data_len = 0;
163     uint64_t *t;
164
165     /* Set content of packet */
166     memcpy(packet->magic, FCE_PACKET_MAGIC, sizeof(packet->magic) );
167     packet->version = FCE_PACKET_VERSION;
168     packet->mode = mode;
169    
170     packet->event_id = event_id; 
171
172     pathlen = strlen(path); /* exclude string terminator */
173     
174     /* This should never happen, but before we bust this server, we send nonsense, fce listener has to cope */
175     if (pathlen >= MAXPATHLEN)
176         pathlen = MAXPATHLEN - 1;
177
178     packet->datalen = pathlen;
179
180     /* This is the payload len. Means: the packet has len bytes more until packet is finished */
181     data_len = FCE_PACKET_HEADER_SIZE + pathlen;
182
183     switch (mode) {
184     case FCE_TM_SIZE:
185         t = (uint64_t *)packet->data;
186         *t = hton64(tm_used);
187         memcpy(packet->data + sizeof(tm_used), path, pathlen);
188         
189         packet->datalen = pathlen + sizeof(tm_used);
190         data_len += sizeof(tm_used);
191         break;
192     default:
193         memcpy(packet->data, path, pathlen);
194         break;
195     }
196
197     /* return the packet len */
198     return data_len;
199 }
200
201 /*
202  * Handle Endianess and write into buffer w/o padding
203  **/ 
204 static void pack_fce_packet(struct fce_packet *packet, unsigned char *buf, int maxlen)
205 {
206     unsigned char *p = buf;
207
208     memcpy(p, &packet->magic[0], sizeof(packet->magic));
209     p += sizeof(packet->magic);
210
211     *p = packet->version;
212     p++;
213     
214     *p = packet->mode;
215     p++;
216     
217     uint32_t *id = (uint32_t*)p;
218     *id = htonl(packet->event_id);
219     p += sizeof(packet->event_id);
220
221     uint16_t *l = ( uint16_t *)p;
222     *l = htons(packet->datalen);
223     p += sizeof(packet->datalen);
224
225     if (((p - buf) +  packet->datalen) < maxlen) {
226         memcpy(p, &packet->data[0], packet->datalen);
227     }
228 }
229
230 /*
231  * Send the fce information to all (connected) listeners
232  * We dont give return code because all errors are handled internally (I hope..)
233  * */
234 static void send_fce_event( char *path, int mode )
235 {    
236     static int first_event = FCE_TRUE;
237
238     struct fce_packet packet;
239     void *data = &packet;
240     static uint32_t event_id = 0; /* the unique packet couter to detect packet/data loss. Going from 0xFFFFFFFF to 0x0 is a valid increment */
241     time_t now = time(NULL);
242
243     LOG(log_debug, logtype_afpd, "send_fce_event: start");
244
245     /* initialized ? */
246     if (first_event == FCE_TRUE) {
247         first_event = FCE_FALSE;
248         fce_init_udp();
249         /* Notify listeners the we start from the beginning */
250         send_fce_event( "", FCE_CONN_START );
251     }
252
253     /* build our data packet */
254     ssize_t data_len = build_fce_packet( &packet, path, mode, ++event_id );
255     pack_fce_packet(&packet, iobuf, MAXIOBUF);
256
257     for (int i = 0; i < udp_sockets; i++)
258     {
259         int sent_data = 0;
260         struct udp_entry *udp_entry = udp_socket_list + i;
261
262         /* we had a problem earlier ? */
263         if (udp_entry->sock == -1)
264         {
265             /* We still have to wait ?*/
266             if (now < udp_entry->next_try_on_error)
267                 continue;
268
269             /* Reopen socket */
270             udp_entry->sock = socket(udp_entry->addrinfo.ai_family,
271                                      udp_entry->addrinfo.ai_socktype,
272                                      udp_entry->addrinfo.ai_protocol);
273             
274             if (udp_entry->sock == -1) {
275                 /* failed again, so go to rest again */
276                 LOG(log_error, logtype_afpd, "Cannot recreate socket for fce UDP connection: errno %d", errno  );
277
278                 udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
279                 continue;
280             }
281
282             udp_entry->next_try_on_error = 0;
283
284             /* Okay, we have a running socket again, send server that we had a problem on our side*/
285             data_len = build_fce_packet( &packet, "", FCE_CONN_BROKEN, 0 );
286             pack_fce_packet(&packet, iobuf, MAXIOBUF);
287
288             sendto(udp_entry->sock,
289                    iobuf,
290                    data_len,
291                    0,
292                    (struct sockaddr *)&udp_entry->sockaddr,
293                    udp_entry->addrinfo.ai_addrlen);
294
295             /* Rebuild our original data packet */
296             data_len = build_fce_packet( &packet, path, mode, event_id );
297             pack_fce_packet(&packet, iobuf, MAXIOBUF);
298         }
299
300         sent_data = sendto(udp_entry->sock,
301                            iobuf,
302                            data_len,
303                            0,
304                            (struct sockaddr *)&udp_entry->sockaddr,
305                            udp_entry->addrinfo.ai_addrlen);
306
307         /* Problems ? */
308         if (sent_data != data_len) {
309             /* Argh, socket broke, we close and retry later */
310             LOG(log_error, logtype_afpd, "send_fce_event: error sending packet to %s:%s, transfered %d of %d: %s",
311                 udp_entry->addr, udp_entry->port, sent_data, data_len, strerror(errno));
312
313             close( udp_entry->sock );
314             udp_entry->sock = -1;
315             udp_entry->next_try_on_error = now + FCE_SOCKET_RETRY_DELAY_S;
316         }
317     }
318 }
319
320 static int add_udp_socket(const char *target_ip, const char *target_port )
321 {
322     if (target_port == NULL)
323         target_port = FCE_DEFAULT_PORT_STRING;
324
325     if (udp_sockets >= FCE_MAX_UDP_SOCKS) {
326         LOG(log_error, logtype_afpd, "Too many file change api UDP connections (max %d allowed)", FCE_MAX_UDP_SOCKS );
327         return AFPERR_PARAM;
328     }
329
330     udp_socket_list[udp_sockets].addr = strdup(target_ip);
331     udp_socket_list[udp_sockets].port = strdup(target_port);
332     udp_socket_list[udp_sockets].sock = -1;
333     memset(&udp_socket_list[udp_sockets].addrinfo, 0, sizeof(struct addrinfo));
334     memset(&udp_socket_list[udp_sockets].sockaddr, 0, sizeof(struct sockaddr_storage));
335     udp_socket_list[udp_sockets].next_try_on_error = 0;
336
337     udp_sockets++;
338
339     return AFP_OK;
340 }
341
342 static void save_close_event(const char *path)
343 {
344     time_t now = time(NULL);
345
346     /* Check if it's a close for the same event as the last one */
347     if (last_close_event.time   /* is there any saved event ? */
348         && (strcmp(path, last_close_event.path) != 0)) {
349         /* no, so send the saved event out now */
350         send_fce_event(last_close_event.path, FCE_FILE_MODIFY);
351     }
352
353     LOG(log_debug, logtype_afpd, "save_close_event: %s", path);
354
355     last_close_event.time = now;
356     strncpy(last_close_event.path, path, MAXPATHLEN);
357 }
358
359 /*
360  *
361  * Dispatcher for all incoming file change events
362  *
363  * */
364 static int register_fce(const char *u_name, int is_dir, int mode)
365 {
366     static int first_event = FCE_TRUE;
367
368     if (udp_sockets == 0)
369         /* No listeners configured */
370         return AFP_OK;
371
372     if (u_name == NULL)
373         return AFPERR_PARAM;
374
375         /* do some initialization on the fly the first time */
376         if (first_event) {
377                 fce_initialize_history();
378         first_event = FCE_FALSE;
379         }
380
381         /* handle files which should not cause events (.DS_Store atc. ) */
382         for (int i = 0; skip_files[i] != NULL; i++)
383         {
384                 if (!strcmp( u_name, skip_files[i]))
385                         return AFP_OK;
386         }
387
388
389         char full_path_buffer[MAXPATHLEN + 1] = {""};
390         const char *cwd = getcwdpath();
391
392     if (mode == FCE_TM_SIZE) {
393         strlcpy(full_path_buffer, u_name, MAXPATHLEN);
394     } else if (!is_dir || mode == FCE_DIR_DELETE) {
395                 if (strlen( cwd ) + strlen( u_name) + 1 >= MAXPATHLEN) {
396                         LOG(log_error, logtype_afpd, "FCE file name too long: %s/%s", cwd, u_name );
397                         return AFPERR_PARAM;
398                 }
399                 sprintf( full_path_buffer, "%s/%s", cwd, u_name );
400         } else {
401                 if (strlen( cwd ) >= MAXPATHLEN) {
402                         LOG(log_error, logtype_afpd, "FCE directory name too long: %s", cwd);
403                         return AFPERR_PARAM;
404                 }
405                 strcpy( full_path_buffer, cwd);
406         }
407
408         /* Can we ignore this event based on type or history? */
409         if (!(mode & FCE_TM_SIZE) && fce_handle_coalescation( full_path_buffer, is_dir, mode ))
410         {
411                 LOG(log_debug9, logtype_afpd, "Coalesced fc event <%d> for <%s>", mode, full_path_buffer );
412                 return AFP_OK;
413         }
414
415         LOG(log_debug9, logtype_afpd, "Detected fc event <%d> for <%s>", mode, full_path_buffer );
416
417     if (mode & FCE_FILE_MODIFY) {
418         save_close_event(full_path_buffer);
419         return AFP_OK;
420     }
421
422     send_fce_event( full_path_buffer, mode );
423
424     return AFP_OK;
425 }
426
427 static void check_saved_close_events(int fmodwait)
428 {
429     time_t now = time(NULL);
430
431     /* check if configured holdclose time has passed */
432     if (last_close_event.time && ((last_close_event.time + fmodwait) < now)) {
433         LOG(log_debug, logtype_afpd, "check_saved_close_events: sending event: %s", last_close_event.path);
434         /* yes, send event */
435         send_fce_event(&last_close_event.path[0], FCE_FILE_MODIFY);
436         last_close_event.path[0] = 0;
437         last_close_event.time = 0;
438     }
439 }
440
441 /******************** External calls start here **************************/
442
443 /*
444  * API-Calls for file change api, called form outside (file.c directory.c ofork.c filedir.c)
445  * */
446 #ifndef FCE_TEST_MAIN
447
448 void fce_pending_events(AFPObj *obj)
449 {
450     vol_fce_tm_event();
451     check_saved_close_events(obj->options.fce_fmodwait);
452 }
453
454 int fce_register_delete_file( struct path *path )
455 {
456     int ret = AFP_OK;
457
458     if (path == NULL)
459         return AFPERR_PARAM;
460
461     if (!(fce_ev_enabled & (1 << FCE_FILE_DELETE)))
462         return ret;
463         
464     ret = register_fce( path->u_name, false, FCE_FILE_DELETE );
465
466     return ret;
467 }
468 int fce_register_delete_dir( char *name )
469 {
470     int ret = AFP_OK;
471
472     if (name == NULL)
473         return AFPERR_PARAM;
474
475     if (!(fce_ev_enabled & (1 << FCE_DIR_DELETE)))
476         return ret;
477         
478     ret = register_fce( name, true, FCE_DIR_DELETE);
479
480     return ret;
481 }
482
483 int fce_register_new_dir( struct path *path )
484 {
485     int ret = AFP_OK;
486
487     if (path == NULL)
488         return AFPERR_PARAM;
489
490     if (!(fce_ev_enabled & (1 << FCE_DIR_CREATE)))
491         return ret;
492
493     ret = register_fce( path->u_name, true, FCE_DIR_CREATE );
494
495     return ret;
496 }
497
498
499 int fce_register_new_file( struct path *path )
500 {
501     int ret = AFP_OK;
502
503     if (path == NULL)
504         return AFPERR_PARAM;
505
506     if (!(fce_ev_enabled & (1 << FCE_FILE_CREATE)))
507         return ret;
508
509     ret = register_fce( path->u_name, false, FCE_FILE_CREATE );
510
511     return ret;
512 }
513
514 int fce_register_file_modification( struct ofork *ofork )
515 {
516     int ret = AFP_OK;
517
518     if (ofork == NULL)
519         return AFPERR_PARAM;
520
521     if (!(fce_ev_enabled & (1 << FCE_FILE_MODIFY)))
522         return ret;
523
524     ret = register_fce(of_name(ofork), false, FCE_FILE_MODIFY );
525     
526     return ret;    
527 }
528
529 int fce_register_tm_size(const char *vol, size_t used)
530 {
531     int ret = AFP_OK;
532
533     if (vol == NULL)
534         return AFPERR_PARAM;
535
536     if (!(fce_ev_enabled & (1 << FCE_TM_SIZE)))
537         return ret;
538
539     tm_used = used;             /* oh what a hack */
540     ret = register_fce(vol, false, FCE_TM_SIZE);
541
542     return ret;
543 }
544 #endif
545
546 /*
547  *
548  * Extern connect to afpd parameter, can be called multiple times for multiple listeners (up to MAX_UDP_SOCKS times)
549  *
550  * */
551 int fce_add_udp_socket(const char *target)
552 {
553         const char *port = FCE_DEFAULT_PORT_STRING;
554         char target_ip[256] = {""};
555
556         strncpy(target_ip, target, sizeof(target_ip) -1);
557
558         char *port_delim = strchr( target_ip, ':' );
559         if (port_delim) {
560                 *port_delim = 0;
561                 port = port_delim + 1;
562         }
563         return add_udp_socket(target_ip, port);
564 }
565
566 int fce_set_events(const char *events)
567 {
568     char *e;
569     char *p;
570     
571     if (events == NULL)
572         return AFPERR_PARAM;
573
574     e = strdup(events);
575
576     fce_ev_enabled = 0;
577
578     for (p = strtok(e, ", "); p; p = strtok(NULL, ", ")) {
579         if (strcmp(p, "fmod") == 0) {
580             fce_ev_enabled |= (1 << FCE_FILE_MODIFY);
581         } else if (strcmp(p, "fdel") == 0) {
582             fce_ev_enabled |= (1 << FCE_FILE_DELETE);
583         } else if (strcmp(p, "ddel") == 0) {
584             fce_ev_enabled |= (1 << FCE_DIR_DELETE);
585         } else if (strcmp(p, "fcre") == 0) {
586             fce_ev_enabled |= (1 << FCE_FILE_CREATE);
587         } else if (strcmp(p, "dcre") == 0) {
588             fce_ev_enabled |= (1 << FCE_DIR_CREATE);
589         } else if (strcmp(p, "tmsz") == 0) {
590             fce_ev_enabled |= (1 << FCE_TM_SIZE);
591         }
592     }
593
594     free(e);
595
596     return AFP_OK;
597 }
598
599 #ifdef FCE_TEST_MAIN
600
601
602 void shortsleep( unsigned int us )
603 {    
604     usleep( us );
605 }
606 int main( int argc, char*argv[] )
607 {
608     int c,ret;
609
610     char *port = FCE_DEFAULT_PORT_STRING;
611     char *host = "localhost";
612     int delay_between_events = 1000;
613     int event_code = FCE_FILE_MODIFY;
614     char pathbuff[1024];
615     int duration_in_seconds = 0; // TILL ETERNITY
616     char target[256];
617     char *path = getcwd( pathbuff, sizeof(pathbuff) );
618
619     // FULLSPEED TEST IS "-s 1001" -> delay is 0 -> send packets without pause
620
621     while ((c = getopt(argc, argv, "d:e:h:p:P:s:")) != -1) {
622         switch(c) {
623         case '?':
624             fprintf(stdout, "%s: [ -p Port -h Listener1 [ -h Listener2 ...] -P path -s Delay_between_events_in_us -e event_code -d Duration ]\n", argv[0]);
625             exit(1);
626             break;
627         case 'd':
628             duration_in_seconds = atoi(optarg);
629             break;
630         case 'e':
631             event_code = atoi(optarg);
632             break;
633         case 'h':
634             host = strdup(optarg);
635             break;
636         case 'p':
637             port = strdup(optarg);
638             break;
639         case 'P':
640             path = strdup(optarg);
641             break;
642         case 's':
643             delay_between_events = atoi(optarg);
644             break;
645         }
646     }
647
648     sprintf(target, "%s:%s", host, port);
649     if (fce_add_udp_socket(target) != 0)
650         return 1;
651
652     int ev_cnt = 0;
653     time_t start_time = time(NULL);
654     time_t end_time = 0;
655
656     if (duration_in_seconds)
657         end_time = start_time + duration_in_seconds;
658
659     while (1)
660     {
661         time_t now = time(NULL);
662         if (now > start_time)
663         {
664             start_time = now;
665             fprintf( stdout, "%d events/s\n", ev_cnt );
666             ev_cnt = 0;
667         }
668         if (end_time && now >= end_time)
669             break;
670
671         register_fce( path, 0, event_code );
672         ev_cnt++;
673
674         
675         shortsleep( delay_between_events );
676     }
677 }
678 #endif /* TESTMAIN*/