]> arthur.barton.de Git - netatalk.git/commitdiff
Spotlight: fix async query close
authorRalph Boehme <rb@sernet.de>
Fri, 8 Aug 2014 12:14:47 +0000 (14:14 +0200)
committerRalph Boehme <rb@sernet.de>
Fri, 8 Aug 2014 12:14:47 +0000 (14:14 +0200)
* use state flags for marking closed queries
* don't use GCancellables

Additionally add lots of debugging.

Signed-off-by: Ralph Boehme <rb@sernet.de>
etc/afpd/spotlight.c
include/atalk/spotlight.h

index 7551bc7174f1ed2b655fb6d664e06ce7c5f6f02c..ee948125a61c1dc301448ed6d5ff29773eeacba4 100644 (file)
 
 #define MAX_SL_RESULTS 20
 
+struct slq_state_names {
+    slq_state_t state;
+    const char *state_name;
+};
+
+static struct slq_state_names slq_state_names[] = {
+    {SLQ_STATE_NEW, "SLQ_STATE_NEW"},
+    {SLQ_STATE_RUNNING, "SLQ_STATE_RUNNING"},
+    {SLQ_STATE_RESULTS, "SLQ_STATE_RESULTS"},
+    {SLQ_STATE_FULL, "SLQ_STATE_FULL"},
+    {SLQ_STATE_DONE, "SLQ_STATE_DONE"},
+    {SLQ_STATE_CANCEL_PENDING, "SLQ_STATE_CANCEL_PENDING"},
+    {SLQ_STATE_CANCELLED, "SLQ_STATE_CANCELLED"},
+    {SLQ_STATE_ERROR, "SLQ_STATE_ERROR"}
+};
+
+
 static char *tracker_to_unix_path(TALLOC_CTX *mem_ctx, const char *uri);
 static int cnid_comp_fn(const void *p1, const void *p2);
 static bool create_result_handle(slq_t *slq);
@@ -486,7 +503,6 @@ static void slq_destroy(slq_t *slq)
  **/
 static void slq_cancel(slq_t *slq)
 {
-    g_cancellable_cancel(slq->cancellable);
     slq->slq_state = SLQ_STATE_CANCEL_PENDING;
     slq_remove(slq);
     slq_cancelled_add(slq);
@@ -500,9 +516,6 @@ static int slq_free_cb(slq_t *slq)
     if (slq->tracker_cursor) {
         g_object_unref(slq->tracker_cursor);
     }
-    if (slq->cancellable) {
-        g_object_unref(slq->cancellable);
-    }
     return 0;
 }
 
@@ -517,14 +530,38 @@ static void slq_cancelled_cleanup(void)
     list_for_each(p, &sl_cancelled_queries) {
         q = list_entry(p, slq_t, slq_list);
         if (q->slq_state == SLQ_STATE_CANCELLED) {
+            LOG(log_debug, logtype_sl,
+                "ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": cancelled",
+                q->slq_ctx1, q->slq_ctx2);
             list_del(p);
             talloc_free(q);
+        } else {
+            LOG(log_debug, logtype_sl,
+                "ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": %s",
+                q->slq_ctx1, q->slq_ctx2, slq_state_names[q->slq_state].state_name);
         }
     }
 
     return;
 }
 
+static void slq_dump(void)
+{
+    struct list_head *p;
+    slq_t *q = NULL;
+    int i = 0;
+
+    list_for_each(p, &sl_queries) {
+        q = list_entry(p, slq_t, slq_list);
+        LOG(log_debug, logtype_sl,
+            "query[%d]: ctx1: %" PRIx64 ", ctx2: %" PRIx64 ", state: %s",
+            i++, q->slq_ctx1, q->slq_ctx2,
+            slq_state_names[q->slq_state].state_name);
+    }
+
+    return;
+}
+
 /************************************************
  * Tracker async callbacks
  ************************************************/
@@ -564,15 +601,22 @@ static void tracker_cursor_cb(GObject      *object,
     bool ok;
     cnid_t did, id;
 
-    if (g_cancellable_is_cancelled(slq->cancellable)) {
-        slq->slq_state = SLQ_STATE_CANCELLED;
-        return;
-    }
+    LOG(log_debug, logtype_sl,
+        "cursor cb: ctx1: %" PRIx64 ", ctx2: %" PRIx64,
+        slq->slq_ctx1, slq->slq_ctx2);
 
     more_results = tracker_sparql_cursor_next_finish(slq->tracker_cursor,
                                                      res,
                                                      &error);
 
+    if (slq->slq_state == SLQ_STATE_CANCEL_PENDING) {
+        LOG(log_debug, logtype_sl,
+            "cursor cb: ctx1: %" PRIx64 ", ctx2: %" PRIx64 ": cancelled",
+            slq->slq_ctx1, slq->slq_ctx2);
+        slq->slq_state = SLQ_STATE_CANCELLED;
+        return;
+    }
+
     if (error) {
         LOG(log_error, logtype_sl, "Tracker cursor: %s", error->message);
         g_error_free(error);
@@ -581,6 +625,7 @@ static void tracker_cursor_cb(GObject      *object,
     }
 
     if (!more_results) {
+        LOG(log_debug, logtype_sl, "tracker_cursor_cb: done");
         slq->slq_state = SLQ_STATE_DONE;
         return;
     }
@@ -593,9 +638,11 @@ static void tracker_cursor_cb(GObject      *object,
          * seen it.
          */
         LOG(log_debug, logtype_sl, "no URI for result");
-        goto exit;
+        return;
     }
 
+    LOG(log_debug, logtype_sl, "URI: %s", uri);
+
     path = tracker_to_unix_path(slq->query_results, uri);
     if (path == NULL) {
         LOG(log_error, logtype_sl, "error converting Tracker URI: %s", uri);
@@ -637,10 +684,13 @@ static void tracker_cursor_cb(GObject      *object,
 
 exit:
     if (slq->query_results->num_results < MAX_SL_RESULTS) {
+        slq->slq_state = SLQ_STATE_RESULTS;
         tracker_sparql_cursor_next_async(slq->tracker_cursor,
-                                         slq->cancellable,
+                                         slq->slq_obj->sl_ctx->cancellable,
                                          tracker_cursor_cb,
                                          slq);
+    } else {
+        slq->slq_state = SLQ_STATE_FULL;
     }
 }
 
@@ -652,16 +702,20 @@ static void tracker_query_cb(GObject      *object,
     GError *error = NULL;
     slq_t *slq = user_data;
 
-    if (g_cancellable_is_cancelled(slq->cancellable)) {
-        slq->slq_state = SLQ_STATE_CANCELLED;
-        return;
-    }
+    LOG(log_debug, logtype_sl,
+        "query cb: ctx1: %" PRIx64 ", ctx2: %" PRIx64,
+        slq->slq_ctx1, slq->slq_ctx2);
 
     slq->tracker_cursor = tracker_sparql_connection_query_finish(
         TRACKER_SPARQL_CONNECTION(object),
         res,
         &error);
 
+    if (slq->slq_state == SLQ_STATE_CANCEL_PENDING) {
+        slq->slq_state = SLQ_STATE_CANCELLED;
+        return;
+    }
+
     if (error) {
         slq->slq_state = SLQ_STATE_ERROR;
         LOG(log_error, logtype_sl, "Tracker query error: %s", error->message);
@@ -763,6 +817,7 @@ static int sl_rpc_openQuery(AFPObj *obj,
     array = talloc_zero(reply, sl_array_t);
 
     if (obj->sl_ctx->tracker_con == NULL) {
+        LOG(log_error, logtype_sl, "no tracker connection");
         EC_FAIL;
     }
 
@@ -773,7 +828,6 @@ static int sl_rpc_openQuery(AFPObj *obj,
     slq->slq_vol = v;
     slq->slq_allow_expr = obj->options.flags & OPTION_SPOTLIGHT_EXPR ? true : false;
     slq->slq_result_limit = obj->options.sparql_limit;
-    slq->cancellable = g_cancellable_new();
     talloc_set_destructor(slq, slq_free_cb);
 
     LOG(log_debug, logtype_sl, "Spotlight: expr: %s, limit: %" PRIu64,
@@ -831,7 +885,7 @@ static int sl_rpc_openQuery(AFPObj *obj,
 
     tracker_sparql_connection_query_async(obj->sl_ctx->tracker_con,
                                           sparql_query,
-                                          slq->cancellable,
+                                          slq->slq_obj->sl_ctx->cancellable,
                                           tracker_query_cb,
                                           slq);
     if (error) {
@@ -895,16 +949,18 @@ static int sl_rpc_fetchQueryResultsForContext(const AFPObj *obj,
     switch (slq->slq_state) {
     case SLQ_STATE_RUNNING:
     case SLQ_STATE_RESULTS:
+    case SLQ_STATE_FULL:
     case SLQ_STATE_DONE:
         ok = add_results(array, slq);
         if (!ok) {
             LOG(log_error, logtype_sl, "error adding results");
             EC_FAIL;
         }
-        if (slq->slq_state == SLQ_STATE_RESULTS) {
+        if (slq->slq_state == SLQ_STATE_RESULTS
+            || slq->slq_state == SLQ_STATE_FULL) {
             tracker_sparql_cursor_next_async(
                 slq->tracker_cursor,
-                slq->cancellable,
+                slq->slq_obj->sl_ctx->cancellable,
                 tracker_cursor_cb,
                 slq);
         }
@@ -1160,13 +1216,18 @@ static int sl_rpc_closeQueryForContext(const AFPObj *obj,
     }
 
     switch (slq->slq_state) {
+    case SLQ_STATE_FULL:
     case SLQ_STATE_DONE:
     case SLQ_STATE_ERROR:
+        LOG(log_debug, logtype_sl, "close: destroying query: state %s",
+           slq_state_names[slq->slq_state].state_name);
         slq_destroy(slq);
         break;
 
     case SLQ_STATE_RUNNING:
     case SLQ_STATE_RESULTS:
+        LOG(log_debug, logtype_sl, "close: cancel query: state %s",
+           slq_state_names[slq->slq_state].state_name);
         slq_cancel(slq);
         break;
 
@@ -1254,6 +1315,7 @@ int afp_spotlight_rpc(AFPObj *obj, char *ibuf, size_t ibuflen,
     }
 
     spotlight_init(obj);
+    slq_dump();
 
     /*
      * Process finished glib events
index 691213801604343f540dac692ce54218c28ada73..395b9eadb434f2c93d98a27087f3b26980cf9ed5 100644 (file)
@@ -71,6 +71,7 @@ typedef enum {
        SLQ_STATE_NEW,            /* Query received from client           */
        SLQ_STATE_RUNNING,        /* Query dispatched to Tracker          */
        SLQ_STATE_RESULTS,        /* Async Tracker query read             */
+       SLQ_STATE_FULL,           /* result queue is full                 */
        SLQ_STATE_DONE,           /* Got all results from Tracker         */
     SLQ_STATE_CANCEL_PENDING, /* a cancel op for the query is pending */
     SLQ_STATE_CANCELLED,      /* the query has been cancelled         */
@@ -101,9 +102,6 @@ typedef struct _slq_t {
     bool              slq_allow_expr;     /* Whether to allow expressions     */
     uint64_t          slq_result_limit;   /* Whether to LIMIT SPARQL results  */
     struct sl_rslts  *query_results;      /* query results                    */
-#ifdef HAVE_TRACKER
-    GCancellable     *cancellable;
-#endif
 } slq_t;
 
 struct sl_ctx {