为unix平台增加XZ多线程解压缩支持(基于7zip LZMA SDK, C/C++)
Note
This post has nothing to do with the pixz project. It is about decompressing standard xz archives with 7-Zip's LZMA SDK in a Unix environment.
Background
Out of the box, 7-Zip's LZMA SDK (version 19.00) supports parallel xz decompression only on Windows. This post shows the C code that adds support for libpthread, i.e. for Unix systems.
Using the C library from a C++ program should also work; I have tested it on my own machine.
Little Details
The original author had already built all the abstractions needed for the multi-threading pipeline. All I have done is add some macros and pthread equivalents of the Windows threading primitives.
Usage
Replace the corresponding files under lzma/C/ with the files below. The new code should work on both Windows and Unix systems.
Git Repo
See https://github.com/idailylife/lzma-xz-parallel
Source Code
Threads.h
/* Threads.h -- multithreading library
2017-06-18 : Igor Pavlov : Public domain */
#ifndef __7Z_THREADS_H
#define __7Z_THREADS_H
#ifdef _WIN32
#include <windows.h>
#else
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#endif
#include "7zTypes.h"
EXTERN_C_BEGIN
WRes HandlePtr_Close(HANDLE *h);
WRes Handle_WaitObject(HANDLE h);
#ifdef _WIN32
typedef HANDLE CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandlePtr_Close(p)
#define Thread_Wait(p) Handle_WaitObject(*(p))
#else
typedef void* LPVOID;
typedef pthread_t* CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandleThread_Close(*(p))
#define Thread_Wait(p) HandleThread_Join(*(p))
WRes HandleThread_Close(pthread_t* th);
WRes HandleThread_Join(pthread_t* th);
#endif
/* Return type of a thread entry point.
   Windows: DWORD under Windows CE (CreateThread), unsigned otherwise
   (_beginthreadex).
   Other platforms: void *, because Thread_Create() hands the function
   straight to pthread_create(), which requires void *(*)(void *). */
#ifdef _WIN32
typedef
#ifdef UNDER_CE
  DWORD
#else
  unsigned
#endif
  THREAD_FUNC_RET_TYPE;
#else
typedef void *THREAD_FUNC_RET_TYPE;
#endif

/* Calling convention and convenience declarator for thread entry points. */
#define THREAD_FUNC_CALL_TYPE MY_STD_CALL
#define THREAD_FUNC_DECL THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE

/* Pointer to a thread entry point taking one opaque argument. */
typedef THREAD_FUNC_RET_TYPE (THREAD_FUNC_CALL_TYPE * THREAD_FUNC_TYPE)(void *);

/* Starts a thread running func(param) and stores its handle in *p.
   Returns 0 on success, a non-zero WRes on failure. */
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param);
#ifdef _WIN32

/* Windows: an event is its HANDLE; NULL means "not created". */
typedef HANDLE CEvent;
typedef CEvent CManualResetEvent;
typedef CEvent CAutoResetEvent;

#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)
#define Event_Wait(p) Handle_WaitObject(*(p))
#define Event_Close(p) HandlePtr_Close(p)

WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p);
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled); // not used
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p); // not used
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);

#else

/* pthread build: an event is a boolean flag protected by a mutex, with a
   condition variable for waiters.  Only auto-reset events are declared. */
typedef struct {
  bool state;              /* true while the event is signaled */
  pthread_mutex_t mutex;   /* guards state */
  pthread_cond_t cond;     /* waited on by Event_Wait() */
} event_t;

/* Handle to an event_t; NULL means "not created". */
typedef event_t* CEvent;
typedef CEvent CAutoResetEvent;

#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)

WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent* p);
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes Event_Wait(CEvent* p);
WRes Event_Close(CEvent* p);

#endif
/* Semaphores exist only in the Windows build: decoding does not use
   CSemaphore, so there is no pthread counterpart here. */
#ifdef _WIN32

/* A semaphore is its HANDLE; NULL means "not created". */
typedef HANDLE CSemaphore;

#define Semaphore_Construct(p) *(p) = NULL
#define Semaphore_IsCreated(p) (*(p) != NULL)
#define Semaphore_Wait(p) Handle_WaitObject(*(p))
#define Semaphore_Close(p) HandlePtr_Close(p)

WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount);
WRes Semaphore_Release1(CSemaphore *p);
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num);

#endif
#ifdef _WIN32

/* Windows: thin wrappers around the native CRITICAL_SECTION API. */
typedef CRITICAL_SECTION CCriticalSection;
WRes CriticalSection_Init(CCriticalSection *p);
#define CriticalSection_Delete(p) DeleteCriticalSection(p)
#define CriticalSection_Enter(p) EnterCriticalSection(p)
#define CriticalSection_Leave(p) LeaveCriticalSection(p)

#else

/* pthread build: a critical section is a pointer to a pthread mutex.
   (The typedef previously lacked its terminating semicolon.) */
typedef pthread_mutex_t* CCriticalSection;

/* Each function returns 0 on success and non-zero on failure. */
WRes CriticalSection_Init(CCriticalSection *p);
WRes CriticalSection_Delete(CCriticalSection *p);
WRes CriticalSection_Enter(CCriticalSection *p);
WRes CriticalSection_Leave(CCriticalSection *p);

#endif
EXTERN_C_END
#endif
Threads.c
/* Threads.c -- multithreading library
2017-06-26 : Igor Pavlov : Public domain */
#include "Precomp.h"
#ifdef _WIN32
  #ifndef UNDER_CE
  #include <process.h>
  #endif
#endif
#include "Threads.h"
#ifdef _WIN32
static WRes GetError()
{
  DWORD res = GetLastError();
  return res ? (WRes)res : 1;
}
/* Maps a handle-returning API result to a WRes: 0 for a non-NULL handle,
   otherwise the error reported by GetError(). */
static WRes HandleToWRes(HANDLE h)
{
  if (h == NULL)
    return GetError();
  return 0;
}
/* Maps a BOOL API result to a WRes: 0 when v is non-zero (success),
   otherwise the error reported by GetError(). */
static WRes BOOLToWRes(BOOL v)
{
  if (!v)
    return GetError();
  return 0;
}
/* Closes the handle stored in *p, if any, and resets *p to NULL.
   Returns 0 on success or when *p is already NULL.  If CloseHandle() fails
   the error from GetError() is returned and *p is left unchanged. */
WRes HandlePtr_Close(HANDLE *p)
{
  if (*p == NULL)
    return 0;
  if (!CloseHandle(*p))
    return GetError();
  *p = NULL;
  return 0;
}
/* Blocks without timeout until h is signaled and returns the raw
   WaitForSingleObject() result converted to WRes. */
WRes Handle_WaitObject(HANDLE h)
{
  const DWORD waitResult = WaitForSingleObject(h, INFINITE);
  return (WRes)waitResult;
}
#else
/// unix specific functions
/* Releases the pthread_t storage that th points to.  Always returns 0.
   th is passed by value, so the caller's own copy of the pointer is not
   modified by this function and must not be used afterwards. */
WRes HandleThread_Close(pthread_t* th) {
  free(th);  /* free(NULL) is a no-op */
  return 0;
}
/* Waits for the thread *th to finish; its exit value is discarded.
   Returns the pthread_join() result (0 on success). */
WRes HandleThread_Join(pthread_t* th) {
  const int joinStatus = pthread_join(*th, NULL);
  return joinStatus;
}
#endif
#ifdef _WIN32
/* Starts a Windows thread running func(param) and stores its handle in *p.
   Uses CreateThread() under Windows CE and _beginthreadex() otherwise.
   Returns 0 on success; on failure *p is NULL and the result comes from
   HandleToWRes(). */
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  /* Windows Me/98/95: threadId parameter may not be NULL in
     _beginthreadex/CreateThread functions, so a dummy is always supplied. */
  #ifdef UNDER_CE
  DWORD threadId;
  HANDLE created = CreateThread(0, 0, func, param, 0, &threadId);
  #else
  unsigned threadId;
  HANDLE created = (HANDLE)_beginthreadex(NULL, 0, func, param, 0, &threadId);
  #endif

  *p = created;
  /* maybe we must use errno here, but probably GetLastError() is also OK. */
  return HandleToWRes(created);
}
#else
/* Starts a pthread running func(param).
 *
 * On success *p points to a malloc()ed pthread_t (released later through
 * HandleThread_Close()) and 0 is returned.  On failure *p is NULL, so
 * Thread_WasCreated() reports false, and a non-zero value is returned:
 * 1 if the allocation failed, otherwise the pthread_create() error number.
 *
 * The thread is created with default attributes (NULL).  The earlier
 * version kept a global array of 64 pthread_attr_t indexed by a counter that
 * was never bounded or reset, so the 65th thread created in a process wrote
 * past the array; the attributes were default-initialized anyway.
 */
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  int ret;
  pthread_t* th = malloc(sizeof *th);
  if (th == NULL) {
    *p = NULL;
    return 1;
  }
  /* Publish the handle before the thread starts, as the original did. */
  *p = th;
  ret = pthread_create(th, NULL, func, param);
  if (ret != 0) {
    /* Nothing is running: drop the storage and leave no dangling handle. */
    free(th);
    *p = NULL;
  }
  return ret;
}
#endif
#ifdef _WIN32
/* Creates an unnamed Win32 event with default security and stores its
   handle in *p.  manualReset is passed through to CreateEvent(); any
   non-zero signaled value requests an initially signaled event.
   Returns 0 on success, otherwise the HandleToWRes() error. */
static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
{
  const BOOL initialState = signaled ? TRUE : FALSE;
  *p = CreateEvent(NULL, manualReset, initialState, NULL);
  return HandleToWRes(*p);
}
/* Signals the event *p.  Returns 0 on success. */
WRes Event_Set(CEvent *p)
{
  return BOOLToWRes(SetEvent(*p));
}

/* Clears the signaled state of the event *p.  Returns 0 on success. */
WRes Event_Reset(CEvent *p)
{
  return BOOLToWRes(ResetEvent(*p));
}

/* Creates a manual-reset event, initially signaled when signaled != 0. */
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled)
{
  return Event_Create(p, TRUE, signaled);
}

/* Creates an auto-reset event, initially signaled when signaled != 0. */
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled)
{
  return Event_Create(p, FALSE, signaled);
}

/* Creates a manual-reset event that starts non-signaled. */
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p)
{
  return Event_Create(p, TRUE, 0);
}

/* Creates an auto-reset event that starts non-signaled. */
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p)
{
  return Event_Create(p, FALSE, 0);
}
#else 
///unix
/* Destroys the event referenced by *p and resets *p to NULL.
 *
 * Does nothing when p or *p is NULL, so closing an event that was never
 * created, or closing twice, is harmless.  Always returns 0.  The earlier
 * version fell off the end of this non-void function on the normal path,
 * leaving the returned value undefined.  Results of the pthread destroy
 * calls are ignored, as before.
 */
WRes Event_Close(CEvent* p) {
  event_t* evt;
  if (!p || !(*p))
    return 0;
  evt = *p;
  pthread_cond_destroy(&evt->cond);
  pthread_mutex_destroy(&evt->mutex);
  free(evt);
  *p = NULL;
  return 0;
}
/* Signals the event *p: sets the state flag under the mutex and wakes a
 * waiter blocked in Event_Wait().
 *
 * Returns 0 on success and 1 if any pthread call fails.
 *
 * Fixes relative to the earlier version:
 *  - it referred to an undeclared identifier (`evn`) and to a `manual_reset`
 *    member that event_t does not have.  Only auto-reset events can be
 *    created in the pthread build, and Event_Wait() always clears the flag,
 *    so waking a single waiter with pthread_cond_signal() is sufficient;
 *  - the mutex is now released on every path, including when signaling
 *    fails (it used to stay locked).
 */
WRes Event_Set(CEvent *p) {
  event_t* evt = *p;
  int failed;
  if (pthread_mutex_lock(&evt->mutex) != 0) {
    return 1;
  }
  evt->state = true;
  failed = (pthread_cond_signal(&evt->cond) != 0);
  if (pthread_mutex_unlock(&evt->mutex) != 0) {
    failed = 1;
  }
  return failed ? 1 : 0;
}
/* Clears the signaled flag of the event *p while holding its mutex.
   Returns 0 on success, 1 if locking or unlocking the mutex fails. */
WRes Event_Reset(CEvent* p) {
  event_t* const ev = *p;
  if (pthread_mutex_lock(&ev->mutex) != 0)
    return 1;
  ev->state = false;
  return (pthread_mutex_unlock(&ev->mutex) != 0) ? 1 : 0;
}
/* Blocks until the event *p is signaled, then clears the flag again so the
   event behaves as auto-reset.  The flag is re-checked after every wake-up,
   so a wake-up without the flag set just resumes waiting.
   Returns 0 on success, 1 if any pthread call fails; the mutex is released
   before returning from a failed wait. */
WRes Event_Wait(CEvent* p) {
  event_t* const ev = *p;
  if (pthread_mutex_lock(&ev->mutex) != 0)
    return 1;
  for (;;) {
    if (ev->state)
      break;
    if (pthread_cond_wait(&ev->cond, &ev->mutex) != 0) {
      pthread_mutex_unlock(&ev->mutex);
      return 1;
    }
  }
  ev->state = false;  /* consume the signal */
  if (pthread_mutex_unlock(&ev->mutex) != 0)
    return 1;
  return 0;
}
/* Allocates and initializes a non-signaled auto-reset event.
 *
 * On success the new event is stored in *p and 0 is returned.  On failure
 * 1 is returned, everything acquired so far is released and *p is NULL, so
 * Event_IsCreated() reports false and Event_Close() stays a no-op.
 *
 * Fixes relative to the earlier version: the local `evt` was never declared,
 * the malloc() result was used unchecked, a `manual_reset` member that
 * event_t does not have was written, and a failing pthread init call leaked
 * the allocation while leaving a half-built event in *p.
 */
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) {
  event_t* evt = malloc(sizeof *evt);
  *p = NULL;
  if (evt == NULL) {
    return 1;
  }
  memset(evt, 0, sizeof *evt);
  evt->state = false;
  if (pthread_mutex_init(&evt->mutex, NULL) != 0) {
    free(evt);
    return 1;
  }
  if (pthread_cond_init(&evt->cond, NULL) != 0) {
    pthread_mutex_destroy(&evt->mutex);
    free(evt);
    return 1;
  }
  *p = evt;
  return 0;
}
#endif
#ifdef _WIN32
/* Creates an unnamed Win32 semaphore with the given initial and maximum
   counts and stores its handle in *p.
   Returns 0 on success, otherwise the HandleToWRes() error. */
WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
{
  const LONG initial = (LONG)initCount;
  const LONG maximum = (LONG)maxCount;
  *p = CreateSemaphore(NULL, initial, maximum, NULL);
  return HandleToWRes(*p);
}
/* Releases the semaphore *p by releaseCount; the previous count is written
   to *previousCount when that pointer is non-NULL.  Returns 0 on success. */
static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
{
  const BOOL released = ReleaseSemaphore(*p, releaseCount, previousCount);
  return BOOLToWRes(released);
}

/* Releases the semaphore *p by num units.  Returns 0 on success. */
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
{
  return Semaphore_Release(p, (LONG)num, NULL);
}

/* Releases the semaphore *p by a single unit.  Returns 0 on success. */
WRes Semaphore_Release1(CSemaphore *p)
{
  return Semaphore_Release(p, (LONG)1, NULL);
}
#endif
#ifdef _WIN32
/* Initializes the Win32 critical section *p.
   Returns 0 on success.  When built with MSVC, a structured exception raised
   by InitializeCriticalSection() is caught and reported as 1; with other
   compilers the call is made unguarded. */
WRes CriticalSection_Init(CCriticalSection *p)
{
  /* InitializeCriticalSection can raise only STATUS_NO_MEMORY exception */
  #ifdef _MSC_VER
  __try
  #endif
  {
    InitializeCriticalSection(p);
    /* InitializeCriticalSectionAndSpinCount(p, 0); */
  }
  #ifdef _MSC_VER
  /* Any exception from the guarded block is turned into a failure code. */
  __except (EXCEPTION_EXECUTE_HANDLER) { return 1; }
  #endif
  return 0;
}
#else
/* Allocates and initializes the mutex behind the critical section *p.
 *
 * Returns 0 on success.  On failure returns 1 and leaves *p NULL.  The
 * earlier version passed an unchecked malloc() result to
 * pthread_mutex_init() and leaked the allocation when initialization
 * failed.
 */
WRes CriticalSection_Init(CCriticalSection *p) {
  pthread_mutex_t* mtx = malloc(sizeof *mtx);
  *p = NULL;
  if (mtx == NULL) {
    return 1;
  }
  if (pthread_mutex_init(mtx, NULL) != 0) {
    free(mtx);
    return 1;
  }
  *p = mtx;
  return 0;
}
/* Destroys the mutex behind the critical section *p and releases its memory.
 *
 * Does nothing and returns 0 when p or *p is NULL.  Otherwise returns the
 * pthread_mutex_destroy() result.  On success the storage allocated by
 * CriticalSection_Init() is freed and *p is reset to NULL; the earlier
 * version never freed it.  If destruction fails, the mutex is left intact
 * and *p is unchanged rather than freeing an object that may still be in
 * use.
 */
WRes CriticalSection_Delete(CCriticalSection *p) {
  pthread_mutex_t* mtx;
  int ret;
  if (!p || !(*p)) {
    return 0;
  }
  mtx = *p;
  ret = pthread_mutex_destroy(mtx);
  if (ret == 0) {
    free(mtx);
    *p = NULL;
  }
  return ret;
}
/* Locks the mutex behind the critical section *p.
   Returns 0 on success, 1 if pthread_mutex_lock() fails.
   The earlier version locked `evt->mutex`, an identifier that does not exist
   in this function, instead of the mutex that *p points to. */
WRes CriticalSection_Enter(CCriticalSection *p) {
  if (pthread_mutex_lock(*p) != 0) {
    return 1;
  }
  return 0;
}
/* Unlocks the mutex behind the critical section *p.
   Returns 0 on success, 1 if pthread_mutex_unlock() fails.
   The earlier version unlocked `evt->mutex`, an identifier that does not
   exist in this function, instead of the mutex that *p points to. */
WRes CriticalSection_Leave(CCriticalSection *p) {
  if (pthread_mutex_unlock(*p) != 0) {
    return 1;
  }
  return 0;
}
#endif
XzDec.c
Add the following macros:
/* Status codes defined here only when _WIN32 is not defined.
   NOTE(review): presumably the Windows headers supply S_OK and E_FAIL on
   Windows builds - confirm. */
#ifndef _WIN32
#define S_OK 0x00000000
#define E_FAIL 0x80004005
#endif
MtDec.c
Replace the function ThreadFunc1 with:
/* Thread entry point for one decoder thread; pp is the thread's CMtDecThread.
 *
 * Runs ThreadFunc2() and turns its result into the thread's exit value.
 * When ThreadFunc2() succeeds, the exit value reflects the decoder's
 * exitThreadWRes.  When it fails, the error is stored in exitThreadWRes if
 * none was stored yet, exitThread is set, the canRead/canWrite events of
 * thread 0 are signaled and the error is passed to MtProgress_SetError().
 *
 * On Windows the exit value is the WRes itself; on other platforms it is a
 * pointer to one of two function-local static status words.
 */
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void* pp)
{
  static int okStatus = 0x0;
  static int errStatus = 0x80004005;
  CMtDecThread *thread = (CMtDecThread*)pp;
  const WRes res = ThreadFunc2(thread);
  CMtDec *dec = thread->mtDec;

  if (res != 0)
  {
    // it's unexpected situation for some threading function error
    if (dec->exitThreadWRes == 0)
      dec->exitThreadWRes = res;
    PRF(printf("\nthread exit error = %d\n", res));
    dec->exitThread = True;
    Event_Set(&dec->threads[0].canRead);
    Event_Set(&dec->threads[0].canWrite);
    MtProgress_SetError(&dec->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
#ifdef _WIN32
    return res;
#else
    return &errStatus;
#endif
  }

#ifdef _WIN32
  return dec->exitThreadWRes;
#else
  if (dec->exitThreadWRes)
    return &errStatus;
  return &okStatus;
#endif
}
发表回复