]> arthur.barton.de Git - bup.git/blobdiff - lib/bup/_helpers.c
Wrap readline oursleves to avoid py3's interference
[bup.git] / lib / bup / _helpers.c
index 946ff4ad63763ba61847d44e5092502070a1eb05..d954f1d3266766d353ca9694aa6865e8225164e5 100644 (file)
 #include <time.h>
 #endif
 
+#ifdef BUP_HAVE_READLINE
+#if ! defined(_XOPEN_SOURCE) || _XOPEN_SOURCE < BUP_RL_EXPECTED_XOPEN_SOURCE
+# warning "_XOPEN_SOURCE version is too low for readline"
+#endif
+#include <readline/readline.h>
+#include <readline/history.h>
+#endif
+
 #include "bupsplit.h"
 
 #if defined(FS_IOC_GETFLAGS) && defined(FS_IOC_SETFLAGS)
@@ -1740,6 +1748,7 @@ static PyObject *tuple_from_cstrs(char **cstrs)
     return result;
 }
 
+
 static long getpw_buf_size;
 
 static PyObject *pwd_struct_to_py(const struct passwd *pwd, int rc)
@@ -1876,6 +1885,217 @@ static PyObject *bup_gethostname(PyObject *mod, PyObject *ignore)
     return PyBytes_FromString(buf);
 }
 
+
+#ifdef BUP_HAVE_READLINE
+
+static char *cstr_from_bytes(PyObject *bytes)
+{
+    char *buf;
+    Py_ssize_t length;
+    int rc = PyBytes_AsStringAndSize(bytes, &buf, &length);
+    if (rc == -1)
+        return NULL;
+    char *result = checked_malloc(length, sizeof(char));
+    if (!result)
+        return NULL;
+    memcpy(result, buf, length);
+    return result;
+}
+
+static char **cstrs_from_seq(PyObject *seq)
+{
+    char **result = NULL;
+    seq = PySequence_Fast(seq, "Cannot convert sequence items to C strings");
+    if (!seq)
+        return NULL;
+
+    const Py_ssize_t len = PySequence_Fast_GET_SIZE(seq);
+    if (len > PY_SSIZE_T_MAX - 1) {
+        PyErr_Format(PyExc_OverflowError,
+                     "Sequence length %zd too large for conversion to C array",
+                     len);
+        goto finish;
+    }
+    result = checked_malloc(len + 1, sizeof(char *));
+    if (!result)
+        goto finish;
+    Py_ssize_t i = 0;
+    for (i = 0; i < len; i++)
+    {
+        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
+        if (!item)
+            goto abandon_result;
+        result[i] = cstr_from_bytes(item);
+        if (!result[i]) {
+            i--;
+            goto abandon_result;
+        }
+    }
+    result[len] = NULL;
+    goto finish;
+
+ abandon_result:
+    if (result) {
+        for (; i > 0; i--)
+            free(result[i]);
+        free(result);
+        result = NULL;
+    }
+ finish:
+    Py_DECREF(seq);
+    return result;
+}
+
+static char* our_word_break_chars = NULL;
+
+static PyObject *
+bup_set_completer_word_break_characters(PyObject *self, PyObject *args)
+{
+    char *bytes;
+    if (!PyArg_ParseTuple(args, cstr_argf, &bytes))
+       return NULL;
+    char *prev = our_word_break_chars;
+    char *next = strdup(bytes);
+    if (!next)
+        return PyErr_NoMemory();
+    our_word_break_chars = next;
+    rl_completer_word_break_characters = next;
+    if (prev)
+        free(prev);
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+bup_get_completer_word_break_characters(PyObject *self, PyObject *args)
+{
+    if (!PyArg_ParseTuple(args, ""))
+       return NULL;
+    return PyBytes_FromString(rl_completer_word_break_characters);
+}
+
+static PyObject *bup_get_line_buffer(PyObject *self, PyObject *args)
+{
+    if (!PyArg_ParseTuple(args, ""))
+       return NULL;
+    return PyBytes_FromString(rl_line_buffer);
+}
+
+static PyObject *
+bup_parse_and_bind(PyObject *self, PyObject *args)
+{
+    char *bytes;
+    if (!PyArg_ParseTuple(args, cstr_argf ":parse_and_bind", &bytes))
+       return NULL;
+    char *tmp = strdup(bytes); // Because it may modify the arg
+    if (!tmp)
+        return PyErr_NoMemory();
+    int rc = rl_parse_and_bind(tmp);
+    free(tmp);
+    if (rc != 0)
+        return PyErr_Format(PyExc_OSError,
+                            "system rl_parse_and_bind failed (%d)", rc);
+    Py_RETURN_NONE;
+}
+
+
+static PyObject *py_on_attempted_completion;
+static char **prev_completions;
+
+static char **on_attempted_completion(const char *text, int start, int end)
+{
+    if (!py_on_attempted_completion)
+        return NULL;
+
+    char **result = NULL;
+    PyObject *py_result = PyObject_CallFunction(py_on_attempted_completion,
+                                                cstr_argf "ii",
+                                                text, start, end);
+    if (!py_result)
+        return NULL;
+    if (py_result != Py_None) {
+        result = cstrs_from_seq(py_result);
+        free(prev_completions);
+        prev_completions = result;
+    }
+    Py_DECREF(py_result);
+    return result;
+}
+
+static PyObject *
+bup_set_attempted_completion_function(PyObject *self, PyObject *args)
+{
+    PyObject *completer;
+    if (!PyArg_ParseTuple(args, "O", &completer))
+       return NULL;
+
+    PyObject *prev = py_on_attempted_completion;
+    if (completer == Py_None)
+    {
+        py_on_attempted_completion = NULL;
+        rl_attempted_completion_function = NULL;
+    } else {
+        py_on_attempted_completion = completer;
+        rl_attempted_completion_function = on_attempted_completion;
+        Py_INCREF(completer);
+    }
+    Py_XDECREF(prev);
+    Py_RETURN_NONE;
+}
+
+
+static PyObject *py_on_completion_entry;
+
+static char *on_completion_entry(const char *text, int state)
+{
+    if (!py_on_completion_entry)
+        return NULL;
+
+    PyObject *py_result = PyObject_CallFunction(py_on_completion_entry,
+                                                cstr_argf "i", text, state);
+    if (!py_result)
+        return NULL;
+    char *result = (py_result == Py_None) ? NULL : cstr_from_bytes(py_result);
+    Py_DECREF(py_result);
+    return result;
+}
+
+static PyObject *
+bup_set_completion_entry_function(PyObject *self, PyObject *args)
+{
+    PyObject *completer;
+    if (!PyArg_ParseTuple(args, "O", &completer))
+       return NULL;
+
+    PyObject *prev = py_on_completion_entry;
+    if (completer == Py_None) {
+        py_on_completion_entry = NULL;
+        rl_completion_entry_function = NULL;
+    } else {
+        py_on_completion_entry = completer;
+        rl_completion_entry_function = on_completion_entry;
+        Py_INCREF(completer);
+    }
+    Py_XDECREF(prev);
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+bup_readline(PyObject *self, PyObject *args)
+{
+    char *prompt;
+    if (!PyArg_ParseTuple(args, cstr_argf, &prompt))
+       return NULL;
+    char *line = readline(prompt);
+    if (!line)
+        return PyErr_Format(PyExc_EOFError, "readline EOF");
+    PyObject *result = PyBytes_FromString(line);
+    free(line);
+    return result;
+}
+
+#endif // defined BUP_HAVE_READLINE
+
+
 static PyMethodDef helper_methods[] = {
     { "write_sparsely", bup_write_sparsely, METH_VARARGS,
       "Write buf excepting zeros at the end. Return trailing zero count." },
@@ -1965,6 +2185,22 @@ static PyMethodDef helper_methods[] = {
       " not exist." },
     { "gethostname", bup_gethostname, METH_NOARGS,
       "Return the current hostname (as bytes)" },
+#ifdef BUP_HAVE_READLINE
+    { "set_completion_entry_function", bup_set_completion_entry_function, METH_VARARGS,
+      "Set rl_completion_entry_function.  Called as f(text, state)." },
+    { "set_attempted_completion_function", bup_set_attempted_completion_function, METH_VARARGS,
+      "Set rl_attempted_completion_function.  Called as f(text, start, end)." },
+    { "parse_and_bind", bup_parse_and_bind, METH_VARARGS,
+      "Call rl_parse_and_bind." },
+    { "get_line_buffer", bup_get_line_buffer, METH_NOARGS,
+      "Return rl_line_buffer." },
+    { "get_completer_word_break_characters", bup_get_completer_word_break_characters, METH_NOARGS,
+      "Return rl_completer_word_break_characters." },
+    { "set_completer_word_break_characters", bup_set_completer_word_break_characters, METH_VARARGS,
+      "Set rl_completer_word_break_characters." },
+    { "readline", bup_readline, METH_VARARGS,
+      "Call readline(prompt)." },
+#endif // defined BUP_HAVE_READLINE
     { NULL, NULL, 0, NULL },  // sentinel
 };