]> arthur.barton.de Git - netatalk.git/blobdiff - etc/afpd/spotlight.c
Spotlight: enhance behaviour for long running queries
[netatalk.git] / etc / afpd / spotlight.c
index 7551bc7174f1ed2b655fb6d664e06ce7c5f6f02c..d2539b8358e25684e3d4c568ee8cce9df55c73ea 100644 (file)
 #include "directory.h"
 #include "etc/spotlight/sparql_parser.h"
 
+#include <glib.h>
+
 #define MAX_SL_RESULTS 20
 
+struct slq_state_names {
+    slq_state_t state;
+    const char *state_name;
+};
+
+static struct slq_state_names slq_state_names[] = {
+    {SLQ_STATE_NEW, "SLQ_STATE_NEW"},
+    {SLQ_STATE_RUNNING, "SLQ_STATE_RUNNING"},
+    {SLQ_STATE_RESULTS, "SLQ_STATE_RESULTS"},
+    {SLQ_STATE_FULL, "SLQ_STATE_FULL"},
+    {SLQ_STATE_DONE, "SLQ_STATE_DONE"},
+    {SLQ_STATE_CANCEL_PENDING, "SLQ_STATE_CANCEL_PENDING"},
+    {SLQ_STATE_CANCELLED, "SLQ_STATE_CANCELLED"},
+    {SLQ_STATE_ERROR, "SLQ_STATE_ERROR"}
+};
+
+
 static char *tracker_to_unix_path(TALLOC_CTX *mem_ctx, const char *uri);
 static int cnid_comp_fn(const void *p1, const void *p2);
 static bool create_result_handle(slq_t *slq);
@@ -383,7 +402,7 @@ static bool create_result_handle(slq_t *slq)
 static bool add_results(sl_array_t *array, slq_t *slq)
 {
     sl_filemeta_t *fm;
-    uint64_t status = 0;
+    uint64_t status;
 
     /* FileMeta */
     fm = talloc_zero(array, sl_filemeta_t);
@@ -391,6 +410,19 @@ static bool add_results(sl_array_t *array, slq_t *slq)
         return false;
     }
 
+    switch (slq->slq_state) {
+    case SLQ_STATE_RUNNING:
+        /*
+         * Wtf, why 35? Taken from an AFP capture.
+         */
+        status = 35;
+        break;
+
+    default:
+        status = 0;
+        break;
+    }
+
     dalloc_add_copy(array, &status, uint64_t);
     dalloc_add(array, slq->query_results->cnids, sl_cnids_t);
     if (slq->query_results->num_results > 0) {
@@ -486,7 +518,6 @@ static void slq_destroy(slq_t *slq)
  **/
 static void slq_cancel(slq_t *slq)
 {
-    g_cancellable_cancel(slq->cancellable);
     slq->slq_state = SLQ_STATE_CANCEL_PENDING;
     slq_remove(slq);
     slq_cancelled_add(slq);
@@ -500,9 +531,6 @@ static int slq_free_cb(slq_t *slq)
     if (slq->tracker_cursor) {
         g_object_unref(slq->tracker_cursor);
     }
-    if (slq->cancellable) {
-        g_object_unref(slq->cancellable);
-    }
     return 0;
 }
 
@@ -517,14 +545,38 @@ static void slq_cancelled_cleanup(void)
     list_for_each(p, &sl_cancelled_queries) {
         q = list_entry(p, slq_t, slq_list);
         if (q->slq_state == SLQ_STATE_CANCELLED) {
+            LOG(log_debug, logtype_sl,
+                "ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": cancelled",
+                q->slq_ctx1, q->slq_ctx2);
             list_del(p);
             talloc_free(q);
+        } else {
+            LOG(log_debug, logtype_sl,
+                "ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": %s",
+                q->slq_ctx1, q->slq_ctx2, slq_state_names[q->slq_state].state_name);
         }
     }
 
     return;
 }
 
+static void slq_dump(void)
+{
+    struct list_head *p;
+    slq_t *q = NULL;
+    int i = 0;
+
+    list_for_each(p, &sl_queries) {
+        q = list_entry(p, slq_t, slq_list);
+        LOG(log_debug, logtype_sl,
+            "query[%d]: ctx1: %" PRIx64 ", ctx2: %" PRIx64 ", state: %s",
+            i++, q->slq_ctx1, q->slq_ctx2,
+            slq_state_names[q->slq_state].state_name);
+    }
+
+    return;
+}
+
 /************************************************
  * Tracker async callbacks
  ************************************************/
@@ -564,15 +616,22 @@ static void tracker_cursor_cb(GObject      *object,
     bool ok;
     cnid_t did, id;
 
-    if (g_cancellable_is_cancelled(slq->cancellable)) {
-        slq->slq_state = SLQ_STATE_CANCELLED;
-        return;
-    }
+    LOG(log_debug, logtype_sl,
+        "cursor cb[%d]: ctx1: %" PRIx64 ", ctx2: %" PRIx64,
+        slq->query_results->num_results, slq->slq_ctx1, slq->slq_ctx2);
 
     more_results = tracker_sparql_cursor_next_finish(slq->tracker_cursor,
                                                      res,
                                                      &error);
 
+    if (slq->slq_state == SLQ_STATE_CANCEL_PENDING) {
+        LOG(log_debug, logtype_sl,
+            "cursor cb: ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": cancelled",
+            slq->slq_ctx1, slq->slq_ctx2);
+        slq->slq_state = SLQ_STATE_CANCELLED;
+        return;
+    }
+
     if (error) {
         LOG(log_error, logtype_sl, "Tracker cursor: %s", error->message);
         g_error_free(error);
@@ -581,6 +640,7 @@ static void tracker_cursor_cb(GObject      *object,
     }
 
     if (!more_results) {
+        LOG(log_debug, logtype_sl, "tracker_cursor_cb: done");
         slq->slq_state = SLQ_STATE_DONE;
         return;
     }
@@ -593,9 +653,11 @@ static void tracker_cursor_cb(GObject      *object,
          * seen it.
          */
         LOG(log_debug, logtype_sl, "no URI for result");
-        goto exit;
+        return;
     }
 
+    LOG(log_debug, logtype_sl, "URI: %s", uri);
+
     path = tracker_to_unix_path(slq->query_results, uri);
     if (path == NULL) {
         LOG(log_error, logtype_sl, "error converting Tracker URI: %s", uri);
@@ -637,10 +699,22 @@ static void tracker_cursor_cb(GObject      *object,
 
 exit:
     if (slq->query_results->num_results < MAX_SL_RESULTS) {
+        LOG(log_debug, logtype_sl,
+            "cursor cb[%d]: ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": requesting more results",
+            slq->query_results->num_results - 1, slq->slq_ctx1, slq->slq_ctx2);
+
+        slq->slq_state = SLQ_STATE_RESULTS;
+
         tracker_sparql_cursor_next_async(slq->tracker_cursor,
-                                         slq->cancellable,
+                                         slq->slq_obj->sl_ctx->cancellable,
                                          tracker_cursor_cb,
                                          slq);
+    } else {
+        LOG(log_debug, logtype_sl,
+            "cursor cb[%d]: ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": full",
+            slq->query_results->num_results - 1, slq->slq_ctx1, slq->slq_ctx2);
+
+        slq->slq_state = SLQ_STATE_FULL;
     }
 }
 
@@ -648,20 +722,23 @@ static void tracker_query_cb(GObject      *object,
                              GAsyncResult *res,
                              gpointer      user_data)
 {
-    bool ok;
     GError *error = NULL;
     slq_t *slq = user_data;
 
-    if (g_cancellable_is_cancelled(slq->cancellable)) {
-        slq->slq_state = SLQ_STATE_CANCELLED;
-        return;
-    }
+    LOG(log_debug, logtype_sl,
+        "query cb: ctx1: %" PRIx64 ", ctx2: %" PRIx64,
+        slq->slq_ctx1, slq->slq_ctx2);
 
     slq->tracker_cursor = tracker_sparql_connection_query_finish(
         TRACKER_SPARQL_CONNECTION(object),
         res,
         &error);
 
+    if (slq->slq_state == SLQ_STATE_CANCEL_PENDING) {
+        slq->slq_state = SLQ_STATE_CANCELLED;
+        return;
+    }
+
     if (error) {
         slq->slq_state = SLQ_STATE_ERROR;
         LOG(log_error, logtype_sl, "Tracker query error: %s", error->message);
@@ -671,13 +748,6 @@ static void tracker_query_cb(GObject      *object,
 
     slq->slq_state = SLQ_STATE_RESULTS;
 
-    ok = create_result_handle(slq);
-    if (!ok) {
-        LOG(log_error, logtype_sl, "create_result_handle error");
-        slq->slq_state = SLQ_STATE_ERROR;
-        return;
-    }
-
     tracker_sparql_cursor_next_async(slq->tracker_cursor,
                                      slq->slq_obj->sl_ctx->cancellable,
                                      tracker_cursor_cb,
@@ -759,10 +829,12 @@ static int sl_rpc_openQuery(AFPObj *obj,
     uint64_t result;
     gchar *sparql_query;
     GError *error = NULL;
+    bool ok;
 
     array = talloc_zero(reply, sl_array_t);
 
     if (obj->sl_ctx->tracker_con == NULL) {
+        LOG(log_error, logtype_sl, "no tracker connection");
         EC_FAIL;
     }
 
@@ -773,7 +845,6 @@ static int sl_rpc_openQuery(AFPObj *obj,
     slq->slq_vol = v;
     slq->slq_allow_expr = obj->options.flags & OPTION_SPOTLIGHT_EXPR ? true : false;
     slq->slq_result_limit = obj->options.sparql_limit;
-    slq->cancellable = g_cancellable_new();
     talloc_set_destructor(slq, slq_free_cb);
 
     LOG(log_debug, logtype_sl, "Spotlight: expr: %s, limit: %" PRIu64,
@@ -831,7 +902,7 @@ static int sl_rpc_openQuery(AFPObj *obj,
 
     tracker_sparql_connection_query_async(obj->sl_ctx->tracker_con,
                                           sparql_query,
-                                          slq->cancellable,
+                                          slq->slq_obj->sl_ctx->cancellable,
                                           tracker_query_cb,
                                           slq);
     if (error) {
@@ -842,6 +913,14 @@ static int sl_rpc_openQuery(AFPObj *obj,
     }
 
     slq->slq_state = SLQ_STATE_RUNNING;
+
+    ok = create_result_handle(slq);
+    if (!ok) {
+        LOG(log_error, logtype_sl, "create_result_handle error");
+        slq->slq_state = SLQ_STATE_ERROR;
+        EC_FAIL;
+    }
+
     slq_add(slq);
 
 EC_CLEANUP:
@@ -895,16 +974,19 @@ static int sl_rpc_fetchQueryResultsForContext(const AFPObj *obj,
     switch (slq->slq_state) {
     case SLQ_STATE_RUNNING:
     case SLQ_STATE_RESULTS:
+    case SLQ_STATE_FULL:
     case SLQ_STATE_DONE:
         ok = add_results(array, slq);
         if (!ok) {
             LOG(log_error, logtype_sl, "error adding results");
             EC_FAIL;
         }
-        if (slq->slq_state == SLQ_STATE_RESULTS) {
+        if (slq->slq_state == SLQ_STATE_FULL) {
+            slq->slq_state = SLQ_STATE_RESULTS;
+
             tracker_sparql_cursor_next_async(
                 slq->tracker_cursor,
-                slq->cancellable,
+                slq->slq_obj->sl_ctx->cancellable,
                 tracker_cursor_cb,
                 slq);
         }
@@ -1160,13 +1242,18 @@ static int sl_rpc_closeQueryForContext(const AFPObj *obj,
     }
 
     switch (slq->slq_state) {
+    case SLQ_STATE_FULL:
     case SLQ_STATE_DONE:
     case SLQ_STATE_ERROR:
+        LOG(log_debug, logtype_sl, "close: destroying query: state %s",
+           slq_state_names[slq->slq_state].state_name);
         slq_destroy(slq);
         break;
 
     case SLQ_STATE_RUNNING:
     case SLQ_STATE_RESULTS:
+        LOG(log_debug, logtype_sl, "close: cancel query: state %s",
+           slq_state_names[slq->slq_state].state_name);
         slq_cancel(slq);
         break;
 
@@ -1214,6 +1301,9 @@ int spotlight_init(AFPObj *obj)
     /*
      * Tracker uses glibs event dispatching, so we need a mainloop
      */
+#if ((GLIB_MAJOR_VERSION <= 2) && (GLIB_MINOR_VERSION < 36))
+        g_type_init();
+#endif
     sl_ctx->mainloop = g_main_loop_new(NULL, false);
     sl_ctx->cancellable = g_cancellable_new();
 
@@ -1254,6 +1344,7 @@ int afp_spotlight_rpc(AFPObj *obj, char *ibuf, size_t ibuflen,
     }
 
     spotlight_init(obj);
+    slq_dump();
 
     /*
      * Process finished glib events