标签: parallel

  • Support parallel XZ decompression for unix (7zip LZMA SDK based, C/C++)

    为unix平台增加XZ多线程解压缩支持(基于7zip LZMA SDK, C/C++)

    Note

    This post has nothing to do with the pixz project. It is about decompressing regular xz archives using 7-Zip’s LZMA SDK in a Unix environment.

    Background

    Out of the box, 7-Zip’s LZMA SDK (version 19.00) supports parallel xz decompression only on Windows. This post shows C code that adds support for libpthread, i.e. for Unix systems.
    Building a C++ program against the C library should also work; I have tested this on my own box.

    Little Details

    Actually, the original author had already written all the abstractions needed for the multi-threading pipeline. All I have done is add some macros and pthread equivalents of the Windows threading primitives.

    Usage

    Replace the corresponding files under lzma/C/ with the files below. The new code should work on both Windows and Unix systems.

    Git Repo

    See https://github.com/idailylife/lzma-xz-parallel

    Source Code

    Threads.h

    /* Threads.h -- multithreading library
    2017-06-18 : Igor Pavlov : Public domain */
    
    #ifndef __7Z_THREADS_H
    #define __7Z_THREADS_H
    
    #ifdef _WIN32
    #include <windows.h>
    #else
    #include <pthread.h>
    #include <stdbool.h>  /* bool / true / false used by event_t */
    #include <stdlib.h>   /* malloc / calloc / free used by the pthread wrappers */
    #endif
    
    #include "7zTypes.h"
    
    EXTERN_C_BEGIN
    
    #ifdef _WIN32
    /* Generic Win32 HANDLE helpers. They are declared for Windows only:
       HANDLE comes from <windows.h>, and Threads.c defines these two
       functions only inside its _WIN32 section. */
    WRes HandlePtr_Close(HANDLE *h);
    WRes Handle_WaitObject(HANDLE h);
    
    /* A thread is a Win32 HANDLE; NULL means "not created". */
    typedef HANDLE CThread;
    #define Thread_Construct(p) *(p) = NULL
    #define Thread_WasCreated(p) (*(p) != NULL)
    #define Thread_Close(p) HandlePtr_Close(p)
    #define Thread_Wait(p) Handle_WaitObject(*(p))
    #else
    typedef void* LPVOID;
    /* A thread is a pointer to a heap-allocated pthread_t; NULL means
       "not created". */
    typedef pthread_t* CThread;
    WRes HandleThread_Close(pthread_t* th);
    WRes HandleThread_Join(pthread_t* th);
    #define Thread_Construct(p) *(p) = NULL
    #define Thread_WasCreated(p) (*(p) != NULL)
    /* Frees the thread object and then clears the handle, mirroring the
       Windows HandlePtr_Close() which also resets *p to NULL. Without the
       reset, Thread_WasCreated() stays true and a second Thread_Close() or
       Thread_Wait() would touch freed memory. HandleThread_Close() reports no
       errors (it always returns 0), so the expression evaluates to 0.
       Note: p is evaluated twice. */
    #define Thread_Close(p) (HandleThread_Close(*(p)), *(p) = NULL, (WRes)0)
    #define Thread_Wait(p) HandleThread_Join(*(p))
    
    #endif
    
    
    /* Return type of a thread entry point.
       - Windows CE: DWORD (CreateThread).
       - Other Windows: unsigned (_beginthreadex).
       - pthread builds: void *, because pthread_create() takes a start
         routine of type `void *(*)(void *)`; passing a function that returns
         `unsigned` is an incompatible pointer type.
       NOTE(review): thread functions that `return <int expression>;` other
       than a literal 0 need a cast on pthread builds - confirm against the
       callers in the rest of the SDK. */
    typedef
    #ifdef _WIN32
      #ifdef UNDER_CE
      DWORD
      #else
      unsigned
      #endif
    #else
      void *
    #endif
      THREAD_FUNC_RET_TYPE;
    
    #define THREAD_FUNC_CALL_TYPE MY_STD_CALL
    #define THREAD_FUNC_DECL THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
    /* Pointer to a thread entry point taking one opaque argument. */
    typedef THREAD_FUNC_RET_TYPE (THREAD_FUNC_CALL_TYPE * THREAD_FUNC_TYPE)(void *);
    /* Starts func(param) on a new thread and stores its handle in *p.
       Returns 0 on success, non-zero on failure. */
    WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param);
    
    #ifdef _WIN32
    
    /* On Windows an event is a plain Win32 HANDLE; the auto-reset and
       manual-reset flavours share the same representation. */
    typedef HANDLE CEvent;
    typedef CEvent CAutoResetEvent;
    typedef CEvent CManualResetEvent;
    /* A NULL handle marks an event that has not been created. */
    #define Event_Construct(p) *(p) = NULL
    #define Event_IsCreated(p) (*(p) != NULL)
    /* Closing and waiting go through the generic HANDLE helpers. */
    #define Event_Close(p) HandlePtr_Close(p)
    #define Event_Wait(p) Handle_WaitObject(*(p))
    /* Each of the following returns 0 on success and a non-zero WRes on failure. */
    WRes Event_Set(CEvent *p);
    WRes Event_Reset(CEvent *p);
    WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled); // not used
    WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p); // not used
    WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled);
    WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p);
    
    #else
    /* pthread-based event object used on non-Windows builds. */
    typedef struct {
      bool state;            /* true while the event is signaled */
      pthread_mutex_t mutex; /* locked around every read/write of state */
      pthread_cond_t cond;   /* waited on by Event_Wait(), signalled by Event_Set() */
    } event_t;
    
    /* On pthread builds an event handle is a pointer to an event_t;
       NULL means "not created". */
    typedef event_t* CEvent;
    typedef CEvent CAutoResetEvent;
    #define Event_Construct(p) *(p) = NULL
    #define Event_IsCreated(p) (*(p) != NULL)
    
    /* Destroys the event and resets *p to NULL; a NULL p or *p is accepted. */
    WRes Event_Close(CEvent* p);
    /* Marks the event as signaled. Returns 0 on success, 1 on failure. */
    WRes Event_Set(CEvent *p);
    /* Marks the event as not signaled. Returns 0 on success, 1 on failure. */
    WRes Event_Reset(CEvent *p);
    /* Blocks until the event is signaled, then clears the signaled state
       before returning. Returns 0 on success, 1 on failure. */
    WRes Event_Wait(CEvent* p);
    /* Allocates a new event in the non-signaled state and stores it in *p. */
    WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent* p);
    
    #endif
    
    // CSemaphore is not used for decoding
    #ifdef _WIN32
    /* Windows-only semaphore wrapper: a semaphore is a Win32 HANDLE, and NULL
       means "not created". No pthread counterpart is declared in this header. */
    typedef HANDLE CSemaphore;
    #define Semaphore_Construct(p) *(p) = NULL
    #define Semaphore_IsCreated(p) (*(p) != NULL)
    /* Closing and waiting go through the generic HANDLE helpers. */
    #define Semaphore_Close(p) HandlePtr_Close(p)
    #define Semaphore_Wait(p) Handle_WaitObject(*(p))
    /* Creates a semaphore with the given initial and maximum counts. */
    WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount);
    /* Releases the semaphore num times. */
    WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num);
    /* Releases the semaphore once. */
    WRes Semaphore_Release1(CSemaphore *p);
    #endif
    
    #ifdef _WIN32
    
    /* On Windows a critical section maps directly onto the Win32
       CRITICAL_SECTION; only initialisation is a real function, the other
       operations expand to the Win32 calls (which return void). */
    typedef CRITICAL_SECTION CCriticalSection;
    WRes CriticalSection_Init(CCriticalSection *p);
    #define CriticalSection_Delete(p) DeleteCriticalSection(p)
    #define CriticalSection_Enter(p) EnterCriticalSection(p)
    #define CriticalSection_Leave(p) LeaveCriticalSection(p)
    
    #else
    /* On pthread builds a critical section is a pointer to a heap-allocated
       pthread mutex. (The typedef was missing its terminating semicolon.) */
    typedef pthread_mutex_t* CCriticalSection;
    /* Allocates and initialises the mutex, storing it in *p. */
    WRes CriticalSection_Init(CCriticalSection *p);
    /* Destroys the mutex referenced by *p. */
    WRes CriticalSection_Delete(CCriticalSection *p);
    /* Locks the mutex. Returns 0 on success, 1 on failure. */
    WRes CriticalSection_Enter(CCriticalSection *p);
    /* Unlocks the mutex. Returns 0 on success, 1 on failure. */
    WRes CriticalSection_Leave(CCriticalSection *p);
    
    #endif
    EXTERN_C_END
    
    #endif
    

    Threads.c

    /* Threads.c -- multithreading library
    2017-06-26 : Igor Pavlov : Public domain */
    
    #include "Precomp.h"
    
    #ifdef _WIN32
      #ifndef UNDER_CE
      #include <process.h>
      #endif
    #endif
    
    #include "Threads.h"
    
    #ifdef _WIN32
    static WRes GetError()
    {
      DWORD res = GetLastError();
      return res ? (WRes)res : 1;
    }
    
    /* Maps a HANDLE result to a WRes: 0 for a non-NULL handle, otherwise the
       value of GetError(). */
    static WRes HandleToWRes(HANDLE h)
    {
      if (h == NULL)
        return GetError();
      return 0;
    }
    
    /* Maps a Win32 BOOL result to a WRes: 0 for a true value, otherwise the
       value of GetError(). */
    static WRes BOOLToWRes(BOOL v)
    {
      if (!v)
        return GetError();
      return 0;
    }
    
    /* Closes the handle stored in *p, if any, and resets *p to NULL.
       A NULL handle is a no-op that returns 0. If CloseHandle() fails, the
       error from GetError() is returned and *p is left unchanged. */
    WRes HandlePtr_Close(HANDLE *p)
    {
      if (*p == NULL)
        return 0;
    
      if (!CloseHandle(*p))
        return GetError();
    
      *p = NULL;
      return 0;
    }
    
    /* Waits without timeout on h and returns the WaitForSingleObject() result
       converted to WRes. */
    WRes Handle_WaitObject(HANDLE h)
    {
      const DWORD waitResult = WaitForSingleObject(h, INFINITE);
      return (WRes)waitResult;
    }
    
    #else
    /// unix specific functions
    
    /* Frees the pthread_t storage that Thread_Create() allocated with malloc().
       th may be NULL (free(NULL) is a no-op). The thread itself is neither
       joined nor detached here.
       Only a copy of the pointer is received, so the caller's CThread variable
       cannot be cleared from inside this function; the former `th = NULL`
       was a dead store and has been removed. Always returns 0. */
    WRes HandleThread_Close(pthread_t* th) {
      free(th);
      return 0;
    }
    
    /* Blocks until the thread referenced by th terminates. The thread's exit
       value is discarded. Returns the pthread_join() result (0 on success). */
    WRes HandleThread_Join(pthread_t* th)
    {
      const int joinResult = pthread_join(*th, NULL);
      return joinResult;
    }
    
    #endif
    
    
    #ifdef _WIN32
    /* Starts func(param) on a new thread and stores the thread handle in *p.
       Uses CreateThread() under Windows CE and _beginthreadex() elsewhere.
       Returns 0 when a non-NULL handle was obtained, otherwise GetError(). */
    WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
    {
      /* Windows Me/98/95: the threadId parameter may not be NULL in the
         _beginthreadex/CreateThread functions, so a dummy variable is passed. */
    
      #ifdef UNDER_CE
      DWORD tid;
      HANDLE created = CreateThread(0, 0, func, param, 0, &tid);
      #else
      unsigned tid;
      HANDLE created = (HANDLE)_beginthreadex(NULL, 0, func, param, 0, &tid);
      #endif
    
      *p = created;
    
      /* maybe we must use errno here, but probably GetLastError() is also OK. */
      return HandleToWRes(created);
    }
    #else
    /* Starts func(param) on a new joinable thread (default attributes) and
       stores a heap-allocated pthread_t handle in *p.
    
       Differences from the previous implementation:
       - No global attribute table: the old 64-entry array was indexed by a
         counter that only ever grew, so the 65th thread created in a process
         wrote out of bounds, and the counter was updated without any locking.
         Passing NULL to pthread_create() selects the same default attributes.
       - The allocation is checked, and success no longer depends on assert().
       - On failure the storage is released and *p is reset to NULL, so
         Thread_WasCreated() is false and nobody joins a thread that never ran.
    
       func must be compatible with pthread's `void *(*)(void *)` start routine.
    
       Returns 0 on success, 1 if the handle could not be allocated, otherwise
       the error number from pthread_create(). */
    WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
    {
      int ret;
      pthread_t* th = malloc(sizeof(*th));
    
      if (th == NULL) {
        *p = NULL;
        return 1;
      }
    
      /* Publish the handle before the thread starts, as the original code did. */
      *p = th;
      ret = pthread_create(th, NULL, func, param);
      if (ret != 0) {
        *p = NULL;
        free(th);
      }
      return ret;
    }
    #endif
    
    
    #ifdef _WIN32
    /* Creates an unnamed Win32 event with default security attributes and
       stores its handle in *p.
       manualReset selects manual-reset (TRUE) or auto-reset (FALSE) behaviour;
       a non-zero signaled creates the event in the signaled state.
       Returns 0 on success, otherwise GetError(). */
    static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
    {
      BOOL initialState = FALSE;
      if (signaled)
        initialState = TRUE;
    
      *p = CreateEvent(NULL, manualReset, initialState, NULL);
      return HandleToWRes(*p);
    }
    
    /* Signals the event. Returns 0 on success, otherwise GetError(). */
    WRes Event_Set(CEvent *p)
    {
      const BOOL ok = SetEvent(*p);
      return BOOLToWRes(ok);
    }
    
    /* Clears the event's signaled state. Returns 0 on success, otherwise GetError(). */
    WRes Event_Reset(CEvent *p)
    {
      const BOOL ok = ResetEvent(*p);
      return BOOLToWRes(ok);
    }
    
    /* Creates a manual-reset event; a non-zero signaled starts it signaled. */
    WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled)
    {
      return Event_Create(p, TRUE, signaled);
    }
    
    /* Creates an auto-reset event; a non-zero signaled starts it signaled. */
    WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled)
    {
      return Event_Create(p, FALSE, signaled);
    }
    
    /* Creates a manual-reset event in the non-signaled state. */
    WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p)
    {
      return ManualResetEvent_Create(p, 0);
    }
    
    /* Creates an auto-reset event in the non-signaled state. */
    WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p)
    {
      return AutoResetEvent_Create(p, 0);
    }
    
    #else 
    ///unix
    
    /* Destroys the event referenced by *p and resets *p to NULL.
       A NULL p or an event that was never created (*p == NULL) is a no-op.
       Always returns 0. The normal path previously fell off the end of this
       non-void function, so callers received an indeterminate value. */
    WRes Event_Close(CEvent* p) {
      event_t* evt;
      if (!p || !(*p))
        return 0;
      evt = *p;
      pthread_cond_destroy(&evt->cond);
      pthread_mutex_destroy(&evt->mutex);
      free(evt);
      *p = NULL;
      return 0;
    }
    
    
    /* Puts the event into the signaled state and wakes one waiter.
    
       Only auto-reset events exist on the pthread build (the sole constructor
       is AutoResetEvent_CreateNotSignaled and Event_Wait always consumes the
       signal), so a single pthread_cond_signal() is sufficient. The previous
       code branched on evt->manual_reset, which is not a member of event_t,
       and misspelled the variable as `evn`; it also returned with the mutex
       still locked when signalling failed.
    
       Returns 0 on success, 1 if any pthread call failed. */
    WRes Event_Set(CEvent *p) {
      event_t* evt = *p;
      WRes res = 0;
    
      if (pthread_mutex_lock(&evt->mutex) != 0) {
        return 1;
      }
    
      evt->state = true;
      if (pthread_cond_signal(&evt->cond) != 0) {
        res = 1;
      }
    
      /* Always release the mutex, even if signalling failed. */
      if (pthread_mutex_unlock(&evt->mutex) != 0) {
        res = 1;
      }
    
      return res;
    }
    
    /* Clears the signaled state of the event under its mutex.
       Returns 0 on success, 1 if locking or unlocking the mutex failed. */
    WRes Event_Reset(CEvent* p) {
      event_t* const ev = *p;
    
      if (pthread_mutex_lock(&ev->mutex) != 0)
        return 1;
    
      ev->state = false;
    
      return (pthread_mutex_unlock(&ev->mutex) != 0) ? 1 : 0;
    }
    
    /* Blocks until the event is signaled, then clears the signaled state
       before returning (auto-reset behaviour).
       Returns 0 on success, 1 if any pthread call failed. */
    WRes Event_Wait(CEvent* p) {
      event_t* const ev = *p;
    
      if (pthread_mutex_lock(&ev->mutex) != 0)
        return 1;
    
      /* Re-check the flag after every wake-up; pthread_cond_wait() may return
         without the flag having been set. */
      for (;;) {
        if (ev->state)
          break;
        if (pthread_cond_wait(&ev->cond, &ev->mutex) != 0) {
          pthread_mutex_unlock(&ev->mutex);
          return 1;
        }
      }
    
      /* Consume the signal. */
      ev->state = false;
    
      return (pthread_mutex_unlock(&ev->mutex) != 0) ? 1 : 0;
    }
    
    /* Allocates a new event in the non-signaled state and stores it in *p.
    
       Fixes over the previous version: the local `evt` was never declared, the
       allocation result was not checked, the event leaked when mutex or
       condition-variable initialisation failed, and a manual_reset member that
       event_t does not have was assigned.
    
       Returns 0 on success. On failure returns 1 and leaves *p == NULL, so
       Event_IsCreated() is false and Event_Close() is a no-op. */
    WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) {
      /* calloc zero-fills the object, replacing the former malloc + memset. */
      event_t* evt = calloc(1, sizeof(*evt));
    
      *p = NULL;
      if (evt == NULL) {
        return 1;
      }
    
      evt->state = false;
      if (pthread_mutex_init(&evt->mutex, NULL) != 0) {
        free(evt);
        return 1;
      }
      if (pthread_cond_init(&evt->cond, NULL) != 0) {
        pthread_mutex_destroy(&evt->mutex);
        free(evt);
        return 1;
      }
    
      *p = evt;
      return 0;
    }
    
    #endif
    
    
    #ifdef _WIN32
    
    /* Creates an unnamed Win32 semaphore with the given initial and maximum
       counts and stores its handle in *p.
       Returns 0 on success, otherwise GetError(). */
    WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
    {
      const LONG initial = (LONG)initCount;
      const LONG maximum = (LONG)maxCount;
    
      *p = CreateSemaphore(NULL, initial, maximum, NULL);
      return HandleToWRes(*p);
    }
    
    /* Releases the semaphore releaseCount times; when previousCount is not
       NULL it receives the count before the release.
       Returns 0 on success, otherwise GetError(). */
    static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
    {
      const BOOL ok = ReleaseSemaphore(*p, releaseCount, previousCount);
      return BOOLToWRes(ok);
    }
    
    /* Releases the semaphore num times. */
    WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
    {
      return Semaphore_Release(p, (LONG)num, NULL);
    }
    
    /* Releases the semaphore once. */
    WRes Semaphore_Release1(CSemaphore *p)
    {
      return Semaphore_ReleaseN(p, 1);
    }
    
    #endif
    
    
    #ifdef _WIN32
    /* Initialises the Win32 critical section *p.
       Returns 0 on success. When built with MSVC, a structured exception
       raised during initialisation is caught and reported as 1; other
       compilers call InitializeCriticalSection() without that guard. */
    WRes CriticalSection_Init(CCriticalSection *p)
    {
      /* InitializeCriticalSection can raise only STATUS_NO_MEMORY exception */
      #ifdef _MSC_VER
      __try
      #endif
      {
        InitializeCriticalSection(p);
        /* InitializeCriticalSectionAndSpinCount(p, 0); */
      }
      #ifdef _MSC_VER
      /* Turn the exception into an error return instead of crashing. */
      __except (EXCEPTION_EXECUTE_HANDLER) { return 1; }
      #endif
      return 0;
    }
    
    #else
    /* Allocates a pthread mutex with default attributes and stores it in *p.
       Returns 0 on success. On failure returns 1 and leaves *p == NULL; the
       previous version passed an unchecked malloc() result to
       pthread_mutex_init() and leaked the storage when initialisation failed. */
    WRes CriticalSection_Init(CCriticalSection *p) {
      pthread_mutex_t* mtx = malloc(sizeof(*mtx));
    
      *p = NULL;
      if (mtx == NULL) {
        return 1;
      }
      if (pthread_mutex_init(mtx, NULL) != 0) {
        free(mtx);
        return 1;
      }
    
      *p = mtx;
      return 0;
    }
    
    /* Destroys the mutex referenced by *p, frees the storage allocated by
       CriticalSection_Init() and resets *p to NULL. The previous version
       destroyed the mutex but never freed it, leaking one allocation per
       critical section.
       A NULL *p is a no-op. If pthread_mutex_destroy() fails (for example
       because the mutex is still locked) the storage is kept, since it may
       still be in use, and the error number is returned.
       Returns 0 on success. */
    WRes CriticalSection_Delete(CCriticalSection *p) {
      pthread_mutex_t* mtx = *p;
      int ret;
    
      if (mtx == NULL) {
        return 0;
      }
    
      ret = pthread_mutex_destroy(mtx);
      if (ret != 0) {
        return ret;
      }
    
      free(mtx);
      *p = NULL;
      return 0;
    }
    
    /* Locks the mutex referenced by *p, blocking until it is available.
       The previous version locked `evt->mutex`, but no `evt` exists in this
       function; the critical section's own mutex is *p.
       Returns 0 on success, 1 on failure. */
    WRes CriticalSection_Enter(CCriticalSection *p) {
      if (pthread_mutex_lock(*p) != 0) {
        return 1;
      }
      return 0;
    }
    
    /* Unlocks the mutex referenced by *p.
       The previous version unlocked `evt->mutex`, but no `evt` exists in this
       function; the critical section's own mutex is *p.
       Returns 0 on success, 1 on failure. */
    WRes CriticalSection_Leave(CCriticalSection *p) {
      if (pthread_mutex_unlock(*p) != 0) {
        return 1;
      }
      return 0;
    }
    
    #endif
    

    XzDec.c

    Add the following macros:

    /* Non-Windows builds only: define S_OK and E_FAIL with the same numeric
       values as the Win32 HRESULT codes of those names. */
    #ifndef _WIN32
    #define S_OK 0x00000000
    #define E_FAIL 0x80004005
    #endif

    MtDec.c

    Replace the function ThreadFunc1 with:

    /* Thread entry point: runs ThreadFunc2() for the decoder thread passed in
       pp and converts the outcome into the thread's exit value.
    
       - ThreadFunc2() returned 0: on Windows the exit value is the decoder's
         exitThreadWRes; elsewhere it is a pointer to a static status word
         (error if exitThreadWRes is non-zero, OK otherwise).
       - ThreadFunc2() returned an error: the error is recorded in
         exitThreadWRes (unless one is already stored), exitThread is set, the
         canRead/canWrite events of threads[0] are signalled, the error is
         passed to MtProgress_SetError(), and an error exit value is returned. */
    static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void* pp)
    {
      /* Status words whose addresses serve as exit values on non-Windows builds. */
      static int g_ok_stat = 0x0;
      static int g_err_stat = 0x80004005;
      CMtDecThread *t = (CMtDecThread*)pp;
      WRes res = ThreadFunc2(t);
      CMtDec *p = t->mtDec;
    
      if (res != 0)
      {
        // it's unexpected situation for some threading function error
        if (p->exitThreadWRes == 0)
          p->exitThreadWRes = res;
        PRF(printf("\nthread exit error = %d\n", res));
        p->exitThread = True;
        Event_Set(&p->threads[0].canRead);
        Event_Set(&p->threads[0].canWrite);
        MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
    #ifdef _WIN32
        return res;
    #else
        return &g_err_stat;
    #endif
      }
    
    #ifdef _WIN32
      return p->exitThreadWRes;
    #else
      if (p->exitThreadWRes)
        return &g_err_stat;
      return &g_ok_stat;
    #endif
    }