[PATCH libgpg-error 4/4] estream: Support 'es_poll' on Windows.

Justus Winter justus at g10code.com
Wed Oct 19 12:20:44 CEST 2016


* src/Makefile.am (arch_sources): Add new file.
* src/estream.c (O_NONBLOCK): Move to 'gpgrt-int.h'.
(BUFFER_BLOCK_SIZE): Likewise.
(BUFFER_UNREAD_SIZE): Likewise.
(struct notify_list_s, notify_list_t): Likewise.
(struct _gpgrt_stream_internal, estream_internal_t): Likewise.
(X_POLLABLE): New macro.
(parse_mode): Parse keyword 'pollable', emulate O_NONBLOCK using the
same mechanism on Windows.
(_gpgrt_poll): Use the new '_gpgrt_w32_poll' on Windows.
* src/gpgrt-int.h (_gpgrt_functions_w32_pollable): New declaration.
(_gpgrt_w32_pollable_create): New prototype.
(_gpgrt_w32_poll): Likewise.
* src/w32-estream.c: New file.  This code is adapted from GPGME.
* tests/t-poll.c (create_pipe): Create pollable streams.

GnuPG-bug-id: 2731
Signed-off-by: Justus Winter <justus at g10code.com>
---
 src/Makefile.am   |    3 +-
 src/estream.c     |  125 +++----
 src/gpgrt-int.h   |   74 ++++
 src/w32-estream.c | 1044 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 tests/t-poll.c    |    4 +-
 5 files changed, 1174 insertions(+), 76 deletions(-)
 create mode 100644 src/w32-estream.c

diff --git a/src/Makefile.am b/src/Makefile.am
index c1e86a7..0c18252 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -109,7 +109,8 @@ CLEANFILES = err-sources.h err-codes.h code-to-errno.h code-from-errno.h \
 # {{{ Begin Windows part
 #
 if HAVE_W32_SYSTEM
-arch_sources = w32-gettext.c w32-lock.c w32-lock-obj.h w32-thread.c w32-iconv.c
+arch_sources = w32-gettext.c w32-lock.c w32-lock-obj.h w32-thread.c \
+	       w32-iconv.c w32-estream.c
 RCCOMPILE = $(RC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \
             -DLOCALEDIR=\"$(localedir)\" $(AM_CPPFLAGS) $(CPPFLAGS)
 LTRCCOMPILE = $(LIBTOOL) --mode=compile --tag=RC $(RCCOMPILE)
diff --git a/src/estream.c b/src/estream.c
index 7ef687a..5c64295 100644
--- a/src/estream.c
+++ b/src/estream.c
@@ -126,8 +126,6 @@
 # ifndef  S_IXOTH
 #  define S_IXOTH S_IXUSR
 # endif
-# undef  O_NONBLOCK
-# define O_NONBLOCK  0  /* FIXME: Not yet supported.  */
 #endif
 
 #if !defined (EWOULDBLOCK) && defined (HAVE_W32_SYSTEM)
@@ -173,66 +171,6 @@ typedef void (*func_free_t) (void *mem);
 
 
 /*
- * Buffer management layer.
- */
-
-#define BUFFER_BLOCK_SIZE  BUFSIZ
-#define BUFFER_UNREAD_SIZE 16
-
-
-/*
- * A type to hold notification functions.
- */
-struct notify_list_s
-{
-  struct notify_list_s *next;
-  void (*fnc) (estream_t, void*); /* The notification function.  */
-  void *fnc_value;                /* The value to be passed to FNC.  */
-};
-typedef struct notify_list_s *notify_list_t;
-
-
-/*
- * The private object describing a stream.
- */
-struct _gpgrt_stream_internal
-{
-  unsigned char buffer[BUFFER_BLOCK_SIZE];
-  unsigned char unread_buffer[BUFFER_UNREAD_SIZE];
-
-  gpgrt_lock_t lock;		 /* Lock.  Used by *_stream_lock(). */
-
-  gpgrt_stream_backend_kind_t kind;
-  void *cookie;			 /* Cookie.                */
-  void *opaque;			 /* Opaque data.           */
-  unsigned int modeflags;	 /* Flags for the backend. */
-  char *printable_fname;         /* Malloced filename for es_fname_get.  */
-  gpgrt_off_t offset;
-  gpgrt_cookie_read_function_t  func_read;
-  gpgrt_cookie_write_function_t func_write;
-  gpgrt_cookie_seek_function_t  func_seek;
-  gpgrt_cookie_close_function_t func_close;
-  cookie_ioctl_function_t func_ioctl;
-  int strategy;
-  es_syshd_t syshd;              /* A copy of the system handle.  */
-  struct
-  {
-    unsigned int err: 1;
-    unsigned int eof: 1;
-    unsigned int hup: 1;
-  } indicators;
-  unsigned int deallocate_buffer: 1;
-  unsigned int is_stdstream:1;   /* This is a standard stream.  */
-  unsigned int stdstream_fd:2;   /* 0, 1 or 2 for a standard stream.  */
-  unsigned int printable_fname_inuse: 1;  /* es_fname_get has been used.  */
-  unsigned int samethread: 1;    /* The "samethread" mode keyword.  */
-  size_t print_ntotal;           /* Bytes written from in print_writer. */
-  notify_list_t onclose;         /* On close notify function list.  */
-};
-typedef struct _gpgrt_stream_internal *estream_internal_t;
-
-
-/*
  * A linked list to hold active stream objects.
  * Protected by ESTREAM_LIST_LOCK.
  */
@@ -1670,6 +1608,7 @@ func_file_create (void **cookie, int *filedes,
 /* Flags used by parse_mode and friends.  */
 #define X_SAMETHREAD	(1 << 0)
 #define X_SYSOPEN	(1 << 1)
+#define X_POLLABLE	(1 << 2)
 
 /* Parse the mode flags of fopen et al.  In addition to the POSIX
  * defined mode flags keyword parameters are supported.  These are
@@ -1707,6 +1646,13 @@ func_file_create (void **cookie, int *filedes,
  *    under Windows the direct W32 API functions (HANDLE) are used
  *    instead of their libc counterparts (fd).
  *
+ * pollable
+ *
+ *    The object is opened in a way suitable for use with es_poll.  On
+ *    POSIX this is a NOP but under Windows we create up to two
+ *    threads, one for reading and one for writing, do any I/O there,
+ *    and synchronize with them in order to support es_poll.
+ *
  * Note: R_CMODE is optional because is only required by functions
  * which are able to creat a file.
  */
@@ -1812,6 +1758,10 @@ parse_mode (const char *modestr,
               return -1;
             }
           oflags |= O_NONBLOCK;
+#if HAVE_W32_SYSTEM
+          /* Currently, nonblock implies pollable on Windows.  */
+          *r_xmode |= X_POLLABLE;
+#endif
         }
       else if (!strncmp (modestr, "sysopen", 7))
         {
@@ -1823,6 +1773,16 @@ parse_mode (const char *modestr,
             }
           *r_xmode |= X_SYSOPEN;
         }
+      else if (!strncmp (modestr, "pollable", 8))
+        {
+          modestr += 8;
+          if (*modestr && !strchr (" \t,", *modestr))
+            {
+              _set_errno (EINVAL);
+              return -1;
+            }
+          *r_xmode |= X_POLLABLE;
+        }
     }
   if (!got_cmode)
     cmode = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
@@ -2110,6 +2070,23 @@ es_create (estream_t *stream, void *cookie, es_syshd_t *syshd,
   stream_new->unread_buffer_size = sizeof (stream_internal_new->unread_buffer);
   stream_new->intern = stream_internal_new;
 
+#if _WIN32
+  if (xmode & X_POLLABLE)
+    {
+      void *new_cookie;
+
+      err = _gpgrt_w32_pollable_create (&new_cookie, modeflags,
+                                        functions, cookie);
+      if (err)
+        goto out;
+
+      modeflags &= ~O_NONBLOCK;
+      cookie = new_cookie;
+      kind = BACKEND_W32_POLLABLE;
+      functions = _gpgrt_functions_w32_pollable;
+    }
+#endif
+
   init_stream_obj (stream_new, cookie, syshd, kind, functions, modeflags,
                    xmode);
   init_stream_lock (stream_new);
@@ -4715,11 +4692,13 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout)
 {
   gpgrt_poll_t *item;
   int count = 0;
+#ifndef _WIN32
   fd_set readfds, writefds, exceptfds;
   int any_readfd, any_writefd, any_exceptfd;
-  int idx;
   int max_fd;
   int fd, ret, any;
+#endif
+  int idx;
 
   if (!fds)
     {
@@ -4767,6 +4746,15 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout)
     return count;  /* Early return without waiting.  */
 
   /* Now do the real select.  */
+#ifdef _WIN32
+  if (pre_syscall_func)
+    pre_syscall_func ();
+
+  count = _gpgrt_w32_poll (fds, nfds, timeout);
+
+  if (post_syscall_func)
+    post_syscall_func ();
+#else
   any_readfd = any_writefd = any_exceptfd = 0;
   max_fd = 0;
   for (item = fds, idx = 0; idx < nfds; item++, idx++)
@@ -4812,11 +4800,6 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout)
         }
     }
 
-#ifdef _WIN32
-  (void)timeout;
-  ret = -1;
-  _set_errno (EOPNOTSUPP);
-#else
   if (pre_syscall_func)
     pre_syscall_func ();
   do
@@ -4834,7 +4817,6 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout)
   while (ret == -1 && errno == EINTR);
   if (post_syscall_func)
     post_syscall_func ();
-#endif
 
   if (ret == -1)
     return -1;
@@ -4860,9 +4842,6 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout)
           item->got_hup = 1;
           any = 1;
         }
-#ifndef _WIN32
-      /* NB.: We can't use FD_ISSET under windows - but we don't have
-       * support for it anyway.  */
       if (item->want_read && FD_ISSET (fd, &readfds))
         {
           item->got_read = 1;
@@ -4878,11 +4857,11 @@ _gpgrt_poll (gpgrt_poll_t *fds, unsigned int nfds, int timeout)
           item->got_oob = 1;
           any = 1;
         }
-#endif /*!_WIN32*/
 
       if (any)
         count++;
     }
+#endif
 
   return count;
 }
diff --git a/src/gpgrt-int.h b/src/gpgrt-int.h
index 2b13350..57eb86a 100644
--- a/src/gpgrt-int.h
+++ b/src/gpgrt-int.h
@@ -52,6 +52,12 @@ gpg_err_code_t _gpgrt_yield (void);
 
 /* Local definitions for estream.  */
 
+#if HAVE_W32_SYSTEM
+# ifndef  O_NONBLOCK
+#  define O_NONBLOCK  0x40000000	/* FIXME: Is that safe?  */
+# endif
+#endif
+
 /*
  * A private cookie function to implement an internal IOCTL service.
  * and ist IOCTL numbers.
@@ -80,6 +86,65 @@ typedef enum
   } gpgrt_stream_backend_kind_t;
 
 
+/*
+ * A type to hold notification functions.
+ */
+struct notify_list_s
+{
+  struct notify_list_s *next;
+  void (*fnc) (estream_t, void*); /* The notification function.  */
+  void *fnc_value;                /* The value to be passed to FNC.  */
+};
+typedef struct notify_list_s *notify_list_t;
+
+
+/*
+ * Buffer management layer.
+ */
+
+#define BUFFER_BLOCK_SIZE  BUFSIZ
+#define BUFFER_UNREAD_SIZE 16
+
+
+/*
+ * The private object describing a stream.
+ */
+struct _gpgrt_stream_internal
+{
+  unsigned char buffer[BUFFER_BLOCK_SIZE];
+  unsigned char unread_buffer[BUFFER_UNREAD_SIZE];
+
+  gpgrt_lock_t lock;		 /* Lock.  Used by *_stream_lock(). */
+
+  gpgrt_stream_backend_kind_t kind;
+  void *cookie;			 /* Cookie.                */
+  void *opaque;			 /* Opaque data.           */
+  unsigned int modeflags;	 /* Flags for the backend. */
+  char *printable_fname;         /* Malloced filename for es_fname_get.  */
+  gpgrt_off_t offset;
+  gpgrt_cookie_read_function_t  func_read;
+  gpgrt_cookie_write_function_t func_write;
+  gpgrt_cookie_seek_function_t  func_seek;
+  gpgrt_cookie_close_function_t func_close;
+  cookie_ioctl_function_t func_ioctl;
+  int strategy;
+  es_syshd_t syshd;              /* A copy of the system handle.  */
+  struct
+  {
+    unsigned int err: 1;
+    unsigned int eof: 1;
+    unsigned int hup: 1;
+  } indicators;
+  unsigned int deallocate_buffer: 1;
+  unsigned int is_stdstream:1;   /* This is a standard stream.  */
+  unsigned int stdstream_fd:2;   /* 0, 1 or 2 for a standard stream.  */
+  unsigned int printable_fname_inuse: 1;  /* es_fname_get has been used.  */
+  unsigned int samethread: 1;    /* The "samethread" mode keyword.  */
+  size_t print_ntotal;           /* Bytes written from in print_writer. */
+  notify_list_t onclose;         /* On close notify function list.  */
+};
+typedef struct _gpgrt_stream_internal *estream_internal_t;
+
 
 /* Local prototypes for estream.  */
 int _gpgrt_es_init (void);
@@ -236,5 +301,14 @@ const char *_gpgrt_fname_get (gpgrt_stream_t stream);
 
 #include "estream-printf.h"
 
+#if _WIN32
+/* Prototypes for w32-estream.c.  */
+extern struct cookie_io_functions_s _gpgrt_functions_w32_pollable;
+int _gpgrt_w32_pollable_create (void *_GPGRT__RESTRICT *_GPGRT__RESTRICT cookie,
+                                unsigned int modeflags,
+                                struct cookie_io_functions_s next_functions,
+                                void *next_cookie);
+int _gpgrt_w32_poll (gpgrt_poll_t *fds, size_t nfds, int timeout);
+#endif
 
 #endif /*_GPGRT_GPGRT_INT_H*/
diff --git a/src/w32-estream.c b/src/w32-estream.c
new file mode 100644
index 0000000..643b581
--- /dev/null
+++ b/src/w32-estream.c
@@ -0,0 +1,1044 @@
+/* w32-estream.c - es_poll support on W32.
+   Copyright (C) 2000 Werner Koch (dd9jn)
+   Copyright (C) 2001, 2002, 2003, 2004, 2007, 2010, 2016 g10 Code GmbH
+
+   This file is part of libgpg-error.
+
+   libgpg-error is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public License
+   as published by the Free Software Foundation; either version 2.1 of
+   the License, or (at your option) any later version.
+
+   libgpg-error is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+   02111-1307, USA.  */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#ifdef HAVE_SYS_TIME_H
+# include <sys/time.h>
+#endif
+#ifdef HAVE_SYS_TYPES_H
+# include <sys/types.h>
+#endif
+#include <io.h>
+#include <windows.h>
+
+#include "gpgrt-int.h"
+
+/*
+ * In order to support es_poll on Windows, we create a proxy shim that
+ * we use as the estream I/O functions.  This shim creates reader and
+ * writer threads that use the original I/O functions.
+ */
+
+
+
+/* Tracing/debugging support.  */
+#if 0
+#define TRACE(msg, ...)                                         \
+  fprintf (stderr, msg, ## __VA_ARGS__)
+#define TRACE_CTX(ctx, msg, ...)                                \
+  fprintf (stderr, "%p: " msg "\n", ctx, ## __VA_ARGS__)
+#define TRACE_ERR(ctx, err, msg, ...)	do {                    \
+    char error_message[128];                                    \
+    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM                    \
+                  | FORMAT_MESSAGE_IGNORE_INSERTS,              \
+                  NULL,                                         \
+                  err,                                          \
+                  MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),    \
+                  (LPTSTR) &error_message,                      \
+                  sizeof error_message, NULL );                 \
+    fprintf (stderr, "%p: " msg ": %s\n", ctx,                  \
+             ## __VA_ARGS__, error_message);                    \
+  } while (0)
+#else
+#define TRACE(msg, ...)			(void) 0
+#define TRACE_CTX(ctx, msg, ...)	(void) 0
+#define TRACE_ERR(ctx, err, msg, ...)	(void) 0
+#endif
+
+
+
+/* Calculate array dimension.  */
+#ifndef DIM
+#define DIM(array) (sizeof (array) / sizeof (*array))
+#endif
+
+#define READBUF_SIZE 4096
+#define WRITEBUF_SIZE 4096
+
+
+typedef struct estream_cookie_w32_pollable *estream_cookie_w32_pollable_t;
+
+struct reader_context_s
+{
+  estream_cookie_w32_pollable_t pcookie;
+  HANDLE thread_hd;
+
+  gpgrt_lock_t mutex;
+
+  int stop_me;
+  int eof;
+  int eof_shortcut;
+  int error;
+  int error_code;
+
+  /* This is manually reset.  */
+  HANDLE have_data_ev;
+  /* This is automatically reset.  */
+  HANDLE have_space_ev;
+  /* This is manually reset but actually only triggered once.  */
+  HANDLE close_ev;
+
+  size_t readpos, writepos;
+  char buffer[READBUF_SIZE];
+};
+
+struct writer_context_s
+{
+  estream_cookie_w32_pollable_t pcookie;
+  HANDLE thread_hd;
+
+  gpgrt_lock_t mutex;
+
+  int stop_me;
+  int error;
+  int error_code;
+
+  /* This is manually reset.  */
+  HANDLE have_data;
+  HANDLE is_empty;
+  HANDLE close_ev;
+  size_t nbytes;
+  char buffer[WRITEBUF_SIZE];
+};
+
+/* Cookie for pollable objects.  */
+struct estream_cookie_w32_pollable
+{
+  unsigned int modeflags;
+
+  struct cookie_io_functions_s next_functions;
+  void *next_cookie;
+
+  struct reader_context_s *reader;
+  struct writer_context_s *writer;
+};
+
+
+static HANDLE
+set_synchronize (HANDLE hd)
+{
+#ifdef HAVE_W32CE_SYSTEM
+  return hd;
+#else
+  HANDLE new_hd;
+
+  /* For NT we have to set the sync flag.  It seems that the only way
+     to do it is by duplicating the handle.  Tsss...  */
+  if (!DuplicateHandle (GetCurrentProcess (), hd,
+			GetCurrentProcess (), &new_hd,
+			EVENT_MODIFY_STATE | SYNCHRONIZE, FALSE, 0))
+    {
+      TRACE_ERR (NULL, GetLastError (), "DuplicateHandle failed");
+      /* FIXME: Should translate the error code.  */
+      _gpg_err_set_errno (EIO);
+      return INVALID_HANDLE_VALUE;
+    }
+
+  CloseHandle (hd);
+  return new_hd;
+#endif
+}
+
+
+static DWORD CALLBACK
+reader (void *arg)
+{
+  struct reader_context_s *ctx = arg;
+  int nbytes;
+  ssize_t nread;
+  TRACE_CTX (ctx, "reader starting");
+
+  for (;;)
+    {
+      _gpgrt_lock_lock (&ctx->mutex);
+      /* Leave a 1 byte gap so that we can see whether it is empty or
+	 full.  */
+      while ((ctx->writepos + 1) % READBUF_SIZE == ctx->readpos)
+	{
+	  /* Wait for space.  */
+	  if (!ResetEvent (ctx->have_space_ev))
+	    TRACE_ERR (ctx, GetLastError (), "ResetEvent failed");
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  TRACE_CTX (ctx, "waiting for space");
+	  WaitForSingleObject (ctx->have_space_ev, INFINITE);
+	  TRACE_CTX (ctx, "got space");
+	  _gpgrt_lock_lock (&ctx->mutex);
+        }
+      assert (((ctx->writepos + 1) % READBUF_SIZE != ctx->readpos));
+      if (ctx->stop_me)
+	{
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  break;
+        }
+      nbytes = (ctx->readpos + READBUF_SIZE
+		- ctx->writepos - 1) % READBUF_SIZE;
+      assert (nbytes);
+      if (nbytes > READBUF_SIZE - ctx->writepos)
+	nbytes = READBUF_SIZE - ctx->writepos;
+      _gpgrt_lock_unlock (&ctx->mutex);
+
+      TRACE_CTX (ctx, "reading up to %d bytes", nbytes);
+
+      nread = ctx->pcookie->next_functions.public.func_read
+        (ctx->pcookie->next_cookie, ctx->buffer + ctx->writepos, nbytes);
+      TRACE_CTX (ctx, "got %d bytes", nread);
+      if (nread < 0)
+        {
+          ctx->error_code = (int) errno;
+          /* NOTE (W32CE): Do not ignore ERROR_BUSY!  Check at
+             least stop_me if that happens.  */
+          if (ctx->error_code == ERROR_BROKEN_PIPE)
+            {
+              ctx->eof = 1;
+              TRACE_CTX (ctx, "got EOF (broken pipe)");
+            }
+          else
+            {
+              ctx->error = 1;
+              TRACE_ERR (ctx, ctx->error_code, "read error");
+            }
+          break;
+        }
+
+      _gpgrt_lock_lock (&ctx->mutex);
+      if (ctx->stop_me)
+	{
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  break;
+        }
+      if (!nread)
+	{
+	  ctx->eof = 1;
+	  TRACE_CTX (ctx, "got eof");
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  break;
+        }
+
+      ctx->writepos = (ctx->writepos + nread) % READBUF_SIZE;
+      if (!SetEvent (ctx->have_data_ev))
+	TRACE_ERR (ctx, GetLastError (), "SetEvent (%p) failed",
+                   ctx->have_data_ev);
+      _gpgrt_lock_unlock (&ctx->mutex);
+    }
+  /* Indicate that we have an error or EOF.  */
+  if (!SetEvent (ctx->have_data_ev))
+    TRACE_ERR (ctx, GetLastError (), "SetEvent (%p) failed",
+               ctx->have_data_ev);
+
+  TRACE_CTX (ctx, "waiting for close");
+  WaitForSingleObject (ctx->close_ev, INFINITE);
+
+  CloseHandle (ctx->close_ev);
+  CloseHandle (ctx->have_data_ev);
+  CloseHandle (ctx->have_space_ev);
+  CloseHandle (ctx->thread_hd);
+  _gpgrt_lock_destroy (&ctx->mutex);
+  _gpgrt_free (ctx);
+
+  return 0;
+}
+
+
+/* Allocate a reader context for PCOOKIE, create its three events and
+   start the reader thread.  Returns the new context, or NULL if the
+   allocation, an event, the handle duplication or the thread could
+   not be set up; everything acquired up to then is released again.  */
+static struct reader_context_s *
+create_reader (estream_cookie_w32_pollable_t pcookie)
+{
+  struct reader_context_s *ctx;
+  SECURITY_ATTRIBUTES sec_attr;
+  HANDLE sync_hd;
+  DWORD tid;
+
+  memset (&sec_attr, 0, sizeof sec_attr);
+  sec_attr.nLength = sizeof sec_attr;
+  sec_attr.bInheritHandle = FALSE;
+
+  ctx = calloc (1, sizeof *ctx);
+  if (!ctx)
+    return NULL;
+
+  ctx->pcookie = pcookie;
+
+  /* have_data_ev: manual reset, initially unset.  have_space_ev: auto
+     reset, initially set.  close_ev: manual reset, initially unset.  */
+  ctx->have_data_ev = CreateEvent (&sec_attr, TRUE, FALSE, NULL);
+  if (ctx->have_data_ev)
+    ctx->have_space_ev = CreateEvent (&sec_attr, FALSE, TRUE, NULL);
+  if (ctx->have_space_ev)
+    ctx->close_ev = CreateEvent (&sec_attr, TRUE, FALSE, NULL);
+  if (!ctx->have_data_ev || !ctx->have_space_ev || !ctx->close_ev)
+    {
+      TRACE_ERR (ctx, GetLastError (), "CreateEvent failed");
+      goto failure;
+    }
+
+  /* set_synchronize closes the handle passed in only on success, so
+     keep the original in CTX until the duplicate is known to be good;
+     the cleanup below then closes whichever handle is current.  */
+  sync_hd = set_synchronize (ctx->have_data_ev);
+  if (sync_hd == INVALID_HANDLE_VALUE)
+    goto failure;
+  ctx->have_data_ev = sync_hd;
+
+  _gpgrt_lock_init (&ctx->mutex);
+
+#ifdef HAVE_W32CE_SYSTEM
+  ctx->thread_hd = CreateThread (&sec_attr, 64 * 1024, reader, ctx,
+                                 STACK_SIZE_PARAM_IS_A_RESERVATION, &tid);
+#else
+  ctx->thread_hd = CreateThread (&sec_attr, 0, reader, ctx, 0, &tid);
+#endif
+
+  if (!ctx->thread_hd)
+    {
+      TRACE_ERR (ctx, GetLastError (), "CreateThread failed");
+      _gpgrt_lock_destroy (&ctx->mutex);
+      goto failure;
+    }
+
+  return ctx;
+
+ failure:
+  /* Close whichever events exist and drop the context.  */
+  if (ctx->have_data_ev)
+    CloseHandle (ctx->have_data_ev);
+  if (ctx->have_space_ev)
+    CloseHandle (ctx->have_space_ev);
+  if (ctx->close_ev)
+    CloseHandle (ctx->close_ev);
+  _gpgrt_free (ctx);
+  return NULL;
+}
+
+
+/* Ask the reader thread of CTX to terminate: set the stop flag, wake
+   the thread in case it waits for buffer space, and signal the close
+   event.  The thread frees CTX itself; do not use CTX afterwards.  */
+static void
+destroy_reader (struct reader_context_s *ctx)
+{
+  _gpgrt_lock_lock (&ctx->mutex);
+  ctx->stop_me = 1;
+  if (ctx->have_space_ev)
+    SetEvent (ctx->have_space_ev);
+  _gpgrt_lock_unlock (&ctx->mutex);
+
+#ifdef HAVE_W32CE_SYSTEM
+  /* Scenario: We never create a full pipe, but already started
+     reading.  Then we need to unblock the reader in the pipe driver
+     to make our reader thread notice that we want it to go away.  */
+
+  if (ctx->file_hd != INVALID_HANDLE_VALUE)
+    {
+      if (!DeviceIoControl (ctx->file_hd, GPGCEDEV_IOCTL_UNBLOCK,
+			NULL, 0, NULL, 0, NULL, NULL))
+	{
+	  TRACE_ERR (ctx, GetLastError (), "unblock control call failed");
+	}
+    }
+#endif
+
+  /* XXX is it feasible to unblock the thread?  */
+
+  /* After setting this event CTX is void. */
+  SetEvent (ctx->close_ev);
+}
+
+
+/*
+ * Read function for pollable objects.
+ */
+static gpgrt_ssize_t
+func_w32_pollable_read (void *cookie, void *buffer, size_t count)
+{
+  estream_cookie_w32_pollable_t pcookie = cookie;
+  gpgrt_ssize_t nread;
+  struct reader_context_s *ctx;
+
+  ctx = pcookie->reader;
+  if (ctx == NULL)
+    {
+      pcookie->reader = ctx = create_reader (pcookie);
+      if (!ctx)
+        {
+          _gpg_err_set_errno (EBADF);
+          return -1;
+        }
+    }
+
+  TRACE_CTX (ctx, "pollable read buffer=%p, count=%u", buffer, count);
+
+  if (ctx->eof_shortcut)
+    return 0;
+
+  _gpgrt_lock_lock (&ctx->mutex);
+  TRACE_CTX (ctx, "readpos: %d, writepos %d", ctx->readpos, ctx->writepos);
+  if (ctx->readpos == ctx->writepos && !ctx->error)
+    {
+      /* No data available.  */
+      int eof = ctx->eof;
+      _gpgrt_lock_unlock (&ctx->mutex);
+
+      if (pcookie->modeflags & O_NONBLOCK && ! eof)
+        {
+          _gpg_err_set_errno (EAGAIN);
+          return -1;
+        }
+
+      TRACE_CTX (ctx, "waiting for data");
+      WaitForSingleObject (ctx->have_data_ev, INFINITE);
+      TRACE_CTX (ctx, "data available");
+      _gpgrt_lock_lock (&ctx->mutex);
+    }
+
+  if (ctx->readpos == ctx->writepos || ctx->error)
+    {
+      _gpgrt_lock_unlock (&ctx->mutex);
+      ctx->eof_shortcut = 1;
+      if (ctx->eof)
+	return 0;
+      if (!ctx->error)
+	{
+	  TRACE_CTX (ctx, "EOF but ctx->eof flag not set");
+	  return 0;
+	}
+      _gpg_err_set_errno (ctx->error_code);
+      return -1;
+    }
+
+  nread = ctx->readpos < ctx->writepos
+    ? ctx->writepos - ctx->readpos
+    : READBUF_SIZE - ctx->readpos;
+  if (nread > count)
+    nread = count;
+  memcpy (buffer, ctx->buffer + ctx->readpos, nread);
+  ctx->readpos = (ctx->readpos + nread) % READBUF_SIZE;
+  if (ctx->readpos == ctx->writepos && !ctx->eof)
+    {
+      if (!ResetEvent (ctx->have_data_ev))
+	{
+	  TRACE_ERR (ctx, GetLastError (), "ResetEvent failed");
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  /* FIXME: Should translate the error code.  */
+	  _gpg_err_set_errno (EIO);
+	  return -1;
+	}
+    }
+  if (!SetEvent (ctx->have_space_ev))
+    {
+      TRACE_ERR (ctx, GetLastError (), "SetEvent (%p) failed",
+                 ctx->have_space_ev);
+      _gpgrt_lock_unlock (&ctx->mutex);
+      /* FIXME: Should translate the error code.  */
+      _gpg_err_set_errno (EIO);
+      return -1;
+    }
+  _gpgrt_lock_unlock (&ctx->mutex);
+
+  return nread;
+}
+
+
+/* The writer does use a simple buffering strategy so that we are
+   informed about write errors as soon as possible (i. e. with the
+   next call to the write function).  */
+static DWORD CALLBACK
+writer (void *arg)
+{
+  struct writer_context_s *ctx = arg;
+  ssize_t nwritten;
+
+  TRACE_CTX (ctx, "writer starting");
+
+  for (;;)
+    {
+      _gpgrt_lock_lock (&ctx->mutex);
+      if (ctx->stop_me && !ctx->nbytes)
+	{
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  break;
+        }
+      if (!ctx->nbytes)
+	{
+	  if (!SetEvent (ctx->is_empty))
+	    TRACE_ERR (ctx, GetLastError (), "SetEvent failed");
+	  if (!ResetEvent (ctx->have_data))
+	    TRACE_ERR (ctx, GetLastError (), "ResetEvent failed");
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  TRACE_CTX (ctx, "idle");
+	  WaitForSingleObject (ctx->have_data, INFINITE);
+	  TRACE_CTX (ctx, "got data to write");
+	  _gpgrt_lock_lock (&ctx->mutex);
+        }
+      if (ctx->stop_me && !ctx->nbytes)
+	{
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  break;
+        }
+      _gpgrt_lock_unlock (&ctx->mutex);
+
+      TRACE_CTX (ctx, "writing up to %d bytes", ctx->nbytes);
+
+      nwritten = ctx->pcookie->next_functions.public.func_write
+        (ctx->pcookie->next_cookie, ctx->buffer, ctx->nbytes);
+      TRACE_CTX (ctx, "wrote %d bytes", nwritten);
+      if (nwritten < 1)
+        {
+          /* XXX */
+          if (errno == ERROR_BUSY)
+            {
+              /* Probably stop_me is set now.  */
+              TRACE_CTX (ctx, "pipe busy (unblocked?)");
+              continue;
+            }
+
+          ctx->error_code = errno;
+          ctx->error = 1;
+          TRACE_ERR (ctx, ctx->error_code, "write error");
+          break;
+        }
+
+      _gpgrt_lock_lock (&ctx->mutex);
+      ctx->nbytes -= nwritten;
+      _gpgrt_lock_unlock (&ctx->mutex);
+    }
+  /* Indicate that we have an error.  */
+  if (!SetEvent (ctx->is_empty))
+    TRACE_ERR (ctx, GetLastError (), "SetEvent failed");
+
+  TRACE_CTX (ctx, "waiting for close");
+  WaitForSingleObject (ctx->close_ev, INFINITE);
+
+  if (ctx->nbytes)
+    TRACE_CTX (ctx, "still %d bytes in buffer at close time", ctx->nbytes);
+
+  CloseHandle (ctx->close_ev);
+  CloseHandle (ctx->have_data);
+  CloseHandle (ctx->is_empty);
+  CloseHandle (ctx->thread_hd);
+  _gpgrt_lock_destroy (&ctx->mutex);
+  _gpgrt_free (ctx);
+
+  return 0;
+}
+
+
+/* Allocate a writer context for PCOOKIE, create its three events and
+   start the writer thread.  Returns the new context, or NULL if the
+   allocation, an event, the handle duplication or the thread could
+   not be set up; everything acquired up to then is released again.  */
+static struct writer_context_s *
+create_writer (estream_cookie_w32_pollable_t pcookie)
+{
+  struct writer_context_s *ctx;
+  SECURITY_ATTRIBUTES sec_attr;
+  HANDLE sync_hd;
+  DWORD tid;
+
+  memset (&sec_attr, 0, sizeof sec_attr);
+  sec_attr.nLength = sizeof sec_attr;
+  sec_attr.bInheritHandle = FALSE;
+
+  ctx = calloc (1, sizeof *ctx);
+  if (!ctx)
+    return NULL;
+
+  ctx->pcookie = pcookie;
+
+  /* have_data: manual reset, initially unset.  is_empty: manual
+     reset, initially set.  close_ev: manual reset, initially unset.  */
+  ctx->have_data = CreateEvent (&sec_attr, TRUE, FALSE, NULL);
+  if (ctx->have_data)
+    ctx->is_empty  = CreateEvent (&sec_attr, TRUE, TRUE, NULL);
+  if (ctx->is_empty)
+    ctx->close_ev = CreateEvent (&sec_attr, TRUE, FALSE, NULL);
+  if (!ctx->have_data || !ctx->is_empty || !ctx->close_ev)
+    {
+      TRACE_ERR (ctx, GetLastError (), "CreateEvent failed");
+      goto failure;
+    }
+
+  /* set_synchronize closes the handle passed in only on success, so
+     keep the original in CTX until the duplicate is known to be good;
+     the cleanup below then closes whichever handle is current.  */
+  sync_hd = set_synchronize (ctx->is_empty);
+  if (sync_hd == INVALID_HANDLE_VALUE)
+    goto failure;
+  ctx->is_empty = sync_hd;
+
+  _gpgrt_lock_init (&ctx->mutex);
+
+#ifdef HAVE_W32CE_SYSTEM
+  ctx->thread_hd = CreateThread (&sec_attr, 64 * 1024, writer, ctx,
+                                 STACK_SIZE_PARAM_IS_A_RESERVATION, &tid);
+#else
+  ctx->thread_hd = CreateThread (&sec_attr, 0, writer, ctx, 0, &tid);
+#endif
+
+  if (!ctx->thread_hd)
+    {
+      TRACE_ERR (ctx, GetLastError (), "CreateThread failed");
+      _gpgrt_lock_destroy (&ctx->mutex);
+      goto failure;
+    }
+
+  return ctx;
+
+ failure:
+  /* Close whichever events exist and drop the context.  */
+  if (ctx->have_data)
+    CloseHandle (ctx->have_data);
+  if (ctx->is_empty)
+    CloseHandle (ctx->is_empty);
+  if (ctx->close_ev)
+    CloseHandle (ctx->close_ev);
+  _gpgrt_free (ctx);
+  return NULL;
+}
+
+
+static void
+destroy_writer (struct writer_context_s *ctx)
+{
+  _gpgrt_lock_lock (&ctx->mutex);
+  ctx->stop_me = 1;
+  if (ctx->have_data)
+    SetEvent (ctx->have_data);
+  _gpgrt_lock_unlock (&ctx->mutex);
+
+  /* Give the writer a chance to flush the buffer.  */
+  WaitForSingleObject (ctx->is_empty, INFINITE);
+
+#ifdef HAVE_W32CE_SYSTEM
+  /* Scenario: We never create a full pipe, but already started
+     writing more than the pipe buffer.  Then we need to unblock the
+     writer in the pipe driver to make our writer thread notice that
+     we want it to go away.  */
+
+  if (!DeviceIoControl (ctx->file_hd, GPGCEDEV_IOCTL_UNBLOCK,
+			NULL, 0, NULL, 0, NULL, NULL))
+    {
+      TRACE_ERR (ctx, GetLastError (), "unblock control call failed");
+    }
+#endif
+
+  /* After setting this event CTX is void.  */
+  SetEvent (ctx->close_ev);
+}
+
+
+/* Write function for pollable objects.  Queues at most WRITEBUF_SIZE
+ * bytes from BUFFER in the stream's writer context and returns how many
+ * were taken (possibly fewer than COUNT), or -1 on failure.  */
+static gpgrt_ssize_t
+func_w32_pollable_write (void *cookie, const void *buffer, size_t count)
+{
+  estream_cookie_w32_pollable_t pcookie = cookie;
+  struct writer_context_s *ctx;
+  /* Writing nothing succeeds without touching the writer context.  */
+  if (count == 0)
+    return 0;
+  /* The writer context is created on first use.  */
+  ctx = pcookie->writer;
+  if (ctx == NULL)
+    {
+      pcookie->writer = ctx = create_writer (pcookie);
+      if (!ctx)
+        return -1;
+    }
+
+  _gpgrt_lock_lock (&ctx->mutex);
+  TRACE_CTX (ctx, "pollable write buffer: %p, count: %d, nbytes: %d",
+         buffer, count, ctx->nbytes);
+  if (!ctx->error && ctx->nbytes)
+    {
+      /* Bytes are pending for send.  */
+
+      /* Reset the is_empty event.  Better safe than sorry.  */
+      if (!ResetEvent (ctx->is_empty))
+	{
+	  TRACE_ERR (ctx, GetLastError (), "ResetEvent failed");
+	  _gpgrt_lock_unlock (&ctx->mutex);
+	  /* FIXME: Should translate the error code.  */
+	  _gpg_err_set_errno (EIO);
+	  return -1;
+	}
+      _gpgrt_lock_unlock (&ctx->mutex);
+      /* Non-blocking streams report EAGAIN instead of waiting.  */
+      if (pcookie->modeflags & O_NONBLOCK)
+        {
+          TRACE_CTX (ctx, "would block");
+          _gpg_err_set_errno (EAGAIN);
+          return -1;
+        }
+      /* Otherwise wait without the lock until is_empty is signaled.  */
+      TRACE_CTX (ctx, "waiting for empty buffer");
+      WaitForSingleObject (ctx->is_empty, INFINITE);
+      TRACE_CTX (ctx, "buffer is empty");
+      _gpgrt_lock_lock (&ctx->mutex);
+    }
+  /* A recorded error becomes EPIPE for ERROR_NO_DATA, otherwise EIO.  */
+  if (ctx->error)
+    {
+      _gpgrt_lock_unlock (&ctx->mutex);
+      if (ctx->error_code == ERROR_NO_DATA)
+        _gpg_err_set_errno (EPIPE);
+      else
+        _gpg_err_set_errno (EIO);
+      return -1;
+    }
+
+  /* If no error occurred, the number of bytes in the buffer must be
+     zero.  */
+  assert (!ctx->nbytes);
+  /* Take at most WRITEBUF_SIZE bytes; the result tells how many.  */
+  if (count > WRITEBUF_SIZE)
+    count = WRITEBUF_SIZE;
+  memcpy (ctx->buffer, buffer, count);
+  ctx->nbytes = count;
+
+  /* We have to reset the is_empty event early, because it is also
+     used by _gpgrt_w32_poll to probe the channel for writability.  */
+  if (!ResetEvent (ctx->is_empty))
+    {
+      TRACE_ERR (ctx, GetLastError (), "ResetEvent failed");
+      _gpgrt_lock_unlock (&ctx->mutex);
+      /* FIXME: Should translate the error code.  */
+      _gpg_err_set_errno (EIO);
+      return -1;
+    }
+  if (!SetEvent (ctx->have_data))
+    {
+      TRACE_ERR (ctx, GetLastError (), "SetEvent failed");
+      _gpgrt_lock_unlock (&ctx->mutex);
+      /* FIXME: Should translate the error code.  */
+      _gpg_err_set_errno (EIO);
+      return -1;
+    }
+  _gpgrt_lock_unlock (&ctx->mutex);
+
+  return (int) count;
+}
+
+/* Poll NFDS entries of FDS; returns the number found ready, -1 on error.  */
+int
+_gpgrt_w32_poll (gpgrt_poll_t *fds, size_t nfds, int timeout)
+{
+  HANDLE waitbuf[MAXIMUM_WAIT_OBJECTS]; /* Event handles to wait on.  */
+  int waitidx[MAXIMUM_WAIT_OBJECTS];    /* FDS index of each handle.  */
+  int code;                             /* Result of the WFMO call.  */
+  int nwait;                            /* Used entries of WAITBUF.  */
+  int i;
+  int any;                              /* Flag: a handle was found.  */
+  int count;                            /* Result; -1 flags an error.  */
+  /* TIMEOUT is given in milliseconds; -1 waits without a time limit.  */
+#if 0
+ restart:
+#endif
+  /* First collect one event handle per entry wanting read or write.  */
+  TRACE ("poll on [ ");
+  any = 0;
+  nwait = 0;
+  count = 0;
+  for (i = 0; i < nfds; i++)
+    {
+      struct estream_cookie_w32_pollable *pcookie;
+
+      if (fds[i].ignore)
+	continue;
+
+      if (fds[i].stream->intern->kind != BACKEND_W32_POLLABLE)
+        {
+          /* This stream does not support polling; it is not counted.  */
+          fds[i].got_err = 1;
+          continue;
+        }
+
+      pcookie = fds[i].stream->intern->cookie;
+
+      if (fds[i].want_read || fds[i].want_write)
+	{
+          /* XXX: If both read and write are wanted, only read is polled.  */
+	  if (fds[i].want_read)
+	    {
+	      struct reader_context_s *ctx = pcookie->reader;
+              TRACE ("%d/read ", i);
+              if (ctx == NULL)
+                {
+                  pcookie->reader = ctx = create_reader (pcookie);
+                  if (!ctx)
+                    {
+                      /* FIXME:  Is the error code appropriate?  */
+                      _gpg_err_set_errno (EBADF);
+                      return -1;
+                    }
+                }
+              /* WAITBUF can hold MAXIMUM_WAIT_OBJECTS handles only.  */
+              if (nwait >= DIM (waitbuf))
+                {
+                  TRACE ("oops ]: Too many objects for WFMO!\n");
+                  /* FIXME: Should translate the error code.  */
+                  _gpg_err_set_errno (EIO);
+                  return -1;
+                }
+              waitidx[nwait] = i;
+              waitbuf[nwait++] = ctx->have_data_ev;
+	      any = 1;
+            }
+	  else if (fds[i].want_write)
+	    {
+	      struct writer_context_s *ctx = pcookie->writer;
+              TRACE ("%d/write ", i);
+              if (ctx == NULL)
+                {
+                  pcookie->writer = ctx = create_writer (pcookie);
+                  if (!ctx)
+                    {
+                      /* FIXME:  Is the error code appropriate?  */
+                      _gpg_err_set_errno (EBADF);
+                      return -1;
+                    }
+                }
+
+              if (nwait >= DIM (waitbuf))
+                {
+                  TRACE ("oops ]: Too many objects for WFMO!");
+                  /* FIXME: Should translate the error code.  */
+                  _gpg_err_set_errno (EIO);
+                  return -1;
+                }
+              waitidx[nwait] = i;
+              waitbuf[nwait++] = ctx->is_empty;
+	      any = 1;
+            }
+        }
+    }
+  TRACE ("]\n");
+  if (!any)
+    return 0;
+  /* Wait until any one of the handles is signaled or TIMEOUT expires.  */
+  code = WaitForMultipleObjects (nwait, waitbuf, 0,
+                                 timeout == -1 ? INFINITE : timeout);
+  if (code >= WAIT_OBJECT_0 && code < WAIT_OBJECT_0 + nwait)
+    {
+      /* WFMO (WaitForMultipleObjects) is a really silly function: it
+	 returns only one index, and if several objects are signaled at
+	 the same time it is the lowest index of them.  It does not
+	 tell how many objects are signaled.  The only solution I can
+	 imagine is to test each object individually with a zero
+	 timeout, starting with the returned index - how dull.  Each
+	 signaled entry gets its got_read or got_write flag set.  */
+      any = 0;
+      for (i = code - WAIT_OBJECT_0; i < nwait; i++)
+	{
+	  if (WaitForSingleObject (waitbuf[i], 0) == WAIT_OBJECT_0)
+	    {
+	      assert (waitidx[i] >=0 && waitidx[i] < nfds);
+              /* XXX: As above, an entry wanting both read and write
+                 is only ever reported as readable.  */
+              if (fds[waitidx[i]].want_read)
+                fds[waitidx[i]].got_read = 1;
+              else if (fds[waitidx[i]].want_write)
+                fds[waitidx[i]].got_write = 1;
+	      any = 1;
+	      count++;
+	    }
+	}
+      if (!any)
+	{
+	  TRACE ("no signaled objects found after WFMO\n");
+	  count = -1;
+	}
+    }
+  else if (code == WAIT_TIMEOUT)
+    TRACE ("WFMO timed out\n"); /* COUNT is still 0 here.  */
+  else if (code == WAIT_FAILED)
+    {
+      TRACE_ERR (NULL, GetLastError (), "WFMO failed");
+#if 0 /* Disabled code; presumably left over from the GPGME original.  */
+      if (GetLastError () == ERROR_INVALID_HANDLE)
+	{
+	  int k;
+	  int j = handle_to_fd (waitbuf[i]);
+
+	  TRACE ("WFMO invalid handle %d removed\n", j);
+	  for (k = 0 ; k < nfds; k++)
+	    {
+	      if (fds[k].fd == j)
+		{
+		  fds[k].want_read = fds[k].want_write = 0;
+		  goto restart;
+                }
+            }
+	  TRACE (" oops, or not???\n");
+        }
+#endif
+      count = -1;
+    }
+  else
+    {
+      TRACE ("WFMO returned %d\n", code);
+      count = -1;
+    }
+
+  if (count > 0)
+    {
+      TRACE ("poll OK [ ");
+      for (i = 0; i < nfds; i++)
+	{
+	  if (fds[i].ignore)
+	    continue;
+	  if (fds[i].got_read || fds[i].got_write)
+	    TRACE ("%c%d ", fds[i].want_read ? 'r' : 'w', i);
+        }
+      TRACE ("]\n");
+    }
+
+  if (count < 0)
+    {
+      /* FIXME: Should determine a proper error code.  */
+      _gpg_err_set_errno (EIO);
+    }
+
+  return count;
+}
+
+
+
+/*
+ * Implementation of pollable I/O on Windows.
+ */
+
+/*
+ * Constructor for pollable objects.  Allocate a cookie wrapping
+ * NEXT_COOKIE and NEXT_FUNCTIONS and store it at COOKIE.  The reader
+ * and writer contexts start out unset.  Returns 0 on success or -1
+ * if the allocation failed, in which case COOKIE is not written.
+ */
+int
+_gpgrt_w32_pollable_create (void *_GPGRT__RESTRICT *_GPGRT__RESTRICT cookie,
+                            unsigned int modeflags,
+                            struct cookie_io_functions_s next_functions,
+                            void *next_cookie)
+{
+  estream_cookie_w32_pollable_t obj;
+
+  obj = _gpgrt_malloc (sizeof *obj);
+  if (!obj)
+    return -1;
+
+  obj->reader = NULL;
+  obj->writer = NULL;
+  obj->next_cookie = next_cookie;
+  obj->next_functions = next_functions;
+  obj->modeflags = modeflags;
+
+  /* Hand the initialized cookie back to the caller.  */
+  *cookie = obj;
+  return 0;
+}
+
+
+/*
+ * Seek function for pollable objects.  Seeking is not implemented:
+ * all arguments are ignored and the call always fails with
+ * EOPNOTSUPP (XXX).
+ */
+static int
+func_w32_pollable_seek (void *cookie, gpgrt_off_t *offset, int whence)
+{
+  (void) cookie;
+  (void) offset;
+  (void) whence;
+  _gpg_err_set_errno (EOPNOTSUPP);
+  return -1;
+}
+
+
+/*
+ * The IOCTL function for pollable objects.  COOKIE_IOCTL_NONBLOCK is
+ * handled here: a non-NULL PTR sets O_NONBLOCK in the cookie's mode
+ * flags, NULL clears it.  Any other CMD goes to the wrapped backend,
+ * or fails with EOPNOTSUPP if that has no ioctl function.
+ */
+static int
+func_w32_pollable_ioctl (void *cookie, int cmd, void *ptr, size_t *len)
+{
+  estream_cookie_w32_pollable_t self = cookie;
+  cookie_ioctl_function_t forward = self->next_functions.func_ioctl;
+
+  if (cmd != COOKIE_IOCTL_NONBLOCK)
+    {
+      if (forward)
+        return forward (self->next_cookie, cmd, ptr, len);
+      _gpg_err_set_errno (EOPNOTSUPP);
+      return -1;
+    }
+
+  self->modeflags = ptr ? (self->modeflags | O_NONBLOCK)
+                        : (self->modeflags & ~O_NONBLOCK);
+  return 0;
+}
+
+
+/* The destroy function for pollable objects.  Closes the wrapped backend
+ * (if it has a close function) and returns that result, else 0.  */
+static int
+func_w32_pollable_destroy (void *cookie)
+{
+  estream_cookie_w32_pollable_t pcookie = cookie;
+  int rc = 0;
+
+  if (!cookie)
+    return 0;
+  if (pcookie->reader)
+    destroy_reader (pcookie->reader);
+  if (pcookie->writer)
+    destroy_writer (pcookie->writer);
+  if (pcookie->next_functions.public.func_close)
+    rc = pcookie->next_functions.public.func_close (pcookie->next_cookie);
+  _gpgrt_free (pcookie);
+  return rc;
+}
+
+/*
+ * Access object for the pollable functions defined in this file.
+ */
+struct cookie_io_functions_s _gpgrt_functions_w32_pollable =
+  {
+    {                             /* Presumably the "public" group.  */
+      func_w32_pollable_read,
+      func_w32_pollable_write,
+      func_w32_pollable_seek,
+      func_w32_pollable_destroy,  /* Presumably the func_close slot.  */
+    },
+    func_w32_pollable_ioctl,
+  };
diff --git a/tests/t-poll.c b/tests/t-poll.c
index 026bb88..d39797a 100644
--- a/tests/t-poll.c
+++ b/tests/t-poll.c
@@ -191,14 +191,14 @@ create_pipe (estream_t *r_in, estream_t *r_out)
 
   show ("created pipe [%d, %d]\n", filedes[0], filedes[1]);
 
-  *r_in = es_fdopen (filedes[0], "r");
+  *r_in = es_fdopen (filedes[0], "r,pollable");
   if (!*r_in)
     {
       err = gpg_error_from_syserror ();
       die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
     }
 
-  *r_out = es_fdopen (filedes[1], "w");
+  *r_out = es_fdopen (filedes[1], "w,pollable");
   if (!*r_out)
     {
       err = gpg_error_from_syserror ();
-- 
2.9.3




More information about the Gnupg-devel mailing list