Support parallel XZ decompression for unix (7zip LZMA SDK based, C/C++)

为unix平台增加XZ多线程解压缩支持(基于7zip LZMA SDK, C/C++)

Note

This post has nothing to do with the pixz project. I am talking about decompressing the original xz archive using 7-zip’s LZMA SDK under unix environment.

Background

Out of the box, 7-zip's LZMA SDK (version 19.00) supports parallel xz decompression only on Windows. This post shows the C code that adds a pthread-based implementation, i.e. support for Unix systems.
Compiling with C++ using the C library should also work, I have tested it on my own box.

Little Details

The original author had already built all the abstractions the multi-threading pipeline needs. All I did was add a few macros and pthread equivalents of the Windows threading primitives.

Usage

Replace the corresponding files under lzma/C/ with the files below. The new code should work on both Windows and Unix systems.

Git Repo

See https://github.com/idailylife/lzma-xz-parallel

Source Code

Threads.h

/* Threads.h -- multithreading library
2017-06-18 : Igor Pavlov : Public domain */

#ifndef __7Z_THREADS_H
#define __7Z_THREADS_H

#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#include <stdbool.h>  /* bool / true / false used by the pthread event type */
#include <stdlib.h>   /* malloc / free used by the pthread implementation */
#endif

#include "7zTypes.h"

EXTERN_C_BEGIN

#ifdef _WIN32
/* Win32 handle helpers. HANDLE only exists on Windows, so these prototypes
   must stay inside the _WIN32 branch or the header fails to compile with
   pthreads. */
WRes HandlePtr_Close(HANDLE *h);
WRes Handle_WaitObject(HANDLE h);

typedef HANDLE CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandlePtr_Close(p)
#define Thread_Wait(p) Handle_WaitObject(*(p))
#else
typedef void* LPVOID;
/* A CThread is a pointer to a heap-allocated pthread_t; NULL means that no
   thread has been created (or that it has already been closed). */
typedef pthread_t* CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
/* Releases the thread handle and resets it to NULL, so that
   Thread_WasCreated() reports false afterwards and a repeated close or join
   never touches freed memory (this mirrors what HandlePtr_Close does on
   Windows). Evaluates p more than once; always yields 0. */
#define Thread_Close(p) (HandleThread_Close(*(p)), *(p) = NULL, (WRes)0)
/* Blocks until the thread has finished; only valid on a created thread. */
#define Thread_Wait(p) HandleThread_Join(*(p))
WRes HandleThread_Close(pthread_t* th);
WRes HandleThread_Join(pthread_t* th);

#endif


/* Return type of a thread entry point.
   Windows: DWORD (CreateThread, UNDER_CE) or unsigned (_beginthreadex).
   pthreads: void *, because pthread_create() requires an entry point of type
   void *(*)(void *); passing a function that returns unsigned is an
   incompatible-pointer-type error and calling through it is undefined. */
#ifdef _WIN32
typedef
#ifdef UNDER_CE
  DWORD
#else
  unsigned
#endif
  THREAD_FUNC_RET_TYPE;
#else
typedef void *THREAD_FUNC_RET_TYPE;
#endif

#define THREAD_FUNC_CALL_TYPE MY_STD_CALL
#define THREAD_FUNC_DECL THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
typedef THREAD_FUNC_RET_TYPE (THREAD_FUNC_CALL_TYPE * THREAD_FUNC_TYPE)(void *);

/* Starts a thread running func(param) and stores its handle in *p.
   Returns 0 on success, a non-zero error code otherwise. */
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param);

#ifdef _WIN32

/* Windows: an event is a kernel HANDLE; NULL means "not created". */
typedef HANDLE CEvent;
typedef CEvent CAutoResetEvent;
typedef CEvent CManualResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)
#define Event_Close(p) HandlePtr_Close(p)
#define Event_Wait(p) Handle_WaitObject(*(p))
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled); // not used
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p); // not used
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p);

#else
/* pthread: an event is a heap-allocated flag protected by a mutex, with a
   condition variable for waiters. Only auto-reset events are declared for
   this platform. */
typedef struct {
  bool state;             /* true while the event is signaled */
  pthread_mutex_t mutex;  /* guards every access to state */
  pthread_cond_t cond;    /* waited on by Event_Wait */
} event_t;

/* NULL means "not created". */
typedef event_t* CEvent;
typedef CEvent CAutoResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)

/* All of these return 0 on success and non-zero on failure. */
WRes Event_Close(CEvent* p);
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes Event_Wait(CEvent* p);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent* p);

#endif

// CSemaphore is not used for decoding
#ifdef _WIN32
typedef HANDLE CSemaphore;
#define Semaphore_Construct(p) *(p) = NULL
#define Semaphore_IsCreated(p) (*(p) != NULL)
#define Semaphore_Close(p) HandlePtr_Close(p)
#define Semaphore_Wait(p) Handle_WaitObject(*(p))
WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount);
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num);
WRes Semaphore_Release1(CSemaphore *p);
#endif

#ifdef _WIN32

typedef CRITICAL_SECTION CCriticalSection;
WRes CriticalSection_Init(CCriticalSection *p);
#define CriticalSection_Delete(p) DeleteCriticalSection(p)
#define CriticalSection_Enter(p) EnterCriticalSection(p)
#define CriticalSection_Leave(p) LeaveCriticalSection(p)

#else
/// use mutex instead
typedef pthread_mutex_t* CCriticalSection
WRes CriticalSection_Init(CCriticalSection *p);
WRes CriticalSection_Delete(CCriticalSection *p);
WRes CriticalSection_Enter(CCriticalSection *p);
WRes CriticalSection_Leave(CCriticalSection *p);

#endif
EXTERN_C_END

#endif

Threads.c

/* Threads.c -- multithreading library
2017-06-26 : Igor Pavlov : Public domain */

#include "Precomp.h"

#ifdef _WIN32
  #ifndef UNDER_CE
  #include <process.h>
  #endif
#endif

#include "Threads.h"

#ifdef _WIN32
static WRes GetError()
{
  DWORD res = GetLastError();
  return res ? (WRes)res : 1;
}

/* Success (0) for a non-NULL handle, otherwise the last-error code. */
static WRes HandleToWRes(HANDLE h)
{
  if (h == NULL)
    return GetError();
  return 0;
}

/* Success (0) for a non-zero BOOL, otherwise the last-error code. */
static WRes BOOLToWRes(BOOL v)
{
  if (!v)
    return GetError();
  return 0;
}

/* Closes *p if it holds a handle and resets it to NULL.
   A NULL handle is a no-op. On CloseHandle failure the handle is left
   untouched and the last-error code is returned. */
WRes HandlePtr_Close(HANDLE *p)
{
  if (*p == NULL)
    return 0;
  if (!CloseHandle(*p))
    return GetError();
  *p = NULL;
  return 0;
}

/* Waits on h with no timeout and returns the WaitForSingleObject result. */
WRes Handle_WaitObject(HANDLE h)
{
  const DWORD waitResult = WaitForSingleObject(h, INFINITE);
  return (WRes)waitResult;
}

#else
/// unix specific functions

/* Releases the storage of a thread handle; always returns 0.
   th is received by value, so the caller's own pointer is not modified here
   and must not be used again after this call. free(NULL) is a no-op. */
WRes HandleThread_Close(pthread_t* th) {
  free(th);
  return 0;
}

/* Blocks until the thread *th terminates; its exit value is discarded.
   Returns the pthread_join() result (0 on success). */
WRes HandleThread_Join(pthread_t* th) {
  const int joinResult = pthread_join(*th, NULL);
  return joinResult;
}

#endif


#ifdef _WIN32
/* Starts a thread running func(param) and stores its handle in *p.
   Returns 0 on success, otherwise the last-error code (*p is then NULL). */
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  /* Windows Me/98/95: the threadId parameter of _beginthreadex/CreateThread
     may not be NULL, so a throw-away id variable is always supplied. */
  HANDLE created;

  #ifdef UNDER_CE
  DWORD unusedId;
  created = CreateThread(0, 0, func, param, 0, &unusedId);
  #else
  unsigned unusedId;
  created = (HANDLE)_beginthreadex(NULL, 0, func, param, 0, &unusedId);
  #endif

  *p = created;

  /* maybe we must use errno here, but probably GetLastError() is also OK. */
  return HandleToWRes(created);
}
#else
/**
 * Starts a joinable thread running func(param).
 *
 * The pthread_t lives on the heap and its address is stored in *p before the
 * thread starts; release it with Thread_Close() after Thread_Wait().
 * Default thread attributes are used (NULL attr), so no attribute objects
 * have to be kept around and there is no limit on how many threads can be
 * created over the lifetime of the process.
 *
 * NOTE(review): pthread_create() expects an entry point of type
 * void *(*)(void *); THREAD_FUNC_TYPE must be compatible with that.
 *
 * @param p      receives the new handle; set to NULL on failure
 * @param func   thread entry point
 * @param param  argument handed to func
 * @return 0 on success, 1 if the handle could not be allocated, otherwise
 *         the error number reported by pthread_create()
 */
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  /* cast keeps the file compilable as C++ as well */
  pthread_t* th = (pthread_t*)malloc(sizeof(*th));
  int ret;

  if (th == NULL) {
    *p = NULL;
    return 1;
  }

  /* publish the handle before the thread can start running */
  *p = th;
  ret = pthread_create(th, NULL, func, param);
  if (ret != 0) {
    /* no thread exists: do not leave a handle that looks "created" */
    free(th);
    *p = NULL;
  }
  return ret;
}
#endif


#ifdef _WIN32
/* Creates an unnamed Win32 event with default security and stores it in *p.
   manualReset selects manual- vs auto-reset; any non-zero signaled creates
   the event in the signaled state. Returns 0 or the last-error code. */
static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
{
  const BOOL initialState = signaled ? TRUE : FALSE;
  *p = CreateEvent(NULL, manualReset, initialState, NULL);
  return HandleToWRes(*p);
}

/* Puts the event into the signaled state. */
WRes Event_Set(CEvent *p)
{
  return BOOLToWRes(SetEvent(*p));
}

/* Puts the event into the non-signaled state. */
WRes Event_Reset(CEvent *p)
{
  return BOOLToWRes(ResetEvent(*p));
}

/* Manual-reset event with the given initial state. */
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled)
{
  return Event_Create(p, TRUE, signaled);
}

/* Auto-reset event with the given initial state. */
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled)
{
  return Event_Create(p, FALSE, signaled);
}

/* Manual-reset event that starts non-signaled. */
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p)
{
  return Event_Create(p, TRUE, 0);
}

/* Auto-reset event that starts non-signaled. */
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p)
{
  return Event_Create(p, FALSE, 0);
}

#else 
///unix

/**
 * Destroys an event and resets the handle to NULL.
 *
 * A NULL pointer or a handle that was never created is a no-op, so the
 * function may safely be called more than once on the same handle.
 * No thread may be waiting on the event when it is closed.
 *
 * @param p  event handle to close
 * @return always 0
 */
WRes Event_Close(CEvent* p) {
  event_t* evt;
  if (!p || !(*p))
    return 0;
  evt = *p;
  pthread_cond_destroy(&evt->cond);
  pthread_mutex_destroy(&evt->mutex);
  free(evt);
  *p = NULL;
  /* the original fell off the end here, which is undefined behaviour as
     soon as a caller looks at the result */
  return 0;
}


/**
 * Signals an auto-reset event.
 *
 * Sets the state flag under the mutex and wakes one waiter; Event_Wait
 * consumes the flag again. If nobody is waiting, the flag stays set and the
 * next Event_Wait returns immediately.
 *
 * @param p  created event handle
 * @return 0 on success, 1 if any pthread call failed
 */
WRes Event_Set(CEvent *p) {
  event_t* evt = *p;
  int signalFailed;

  if (pthread_mutex_lock(&evt->mutex) != 0) {
    return 1;
  }

  evt->state = true;
  /* only auto-reset events exist on this platform, so waking a single
     waiter is sufficient */
  signalFailed = (pthread_cond_signal(&evt->cond) != 0);

  /* always unlock, even when signalling failed, so that waiters and later
     callers are not blocked forever */
  if (pthread_mutex_unlock(&evt->mutex) != 0) {
    return 1;
  }

  return signalFailed ? 1 : 0;
}

/* Clears the signaled flag of the event under its mutex.
   Returns 0 on success, 1 if locking or unlocking failed. */
WRes Event_Reset(CEvent* p) {
  event_t* const ev = *p;

  if (pthread_mutex_lock(&ev->mutex) != 0)
    return 1;

  ev->state = false;

  return (pthread_mutex_unlock(&ev->mutex) != 0) ? 1 : 0;
}

/* Blocks until the event is signaled, then consumes the signal by clearing
   the flag before returning (auto-reset behaviour).
   Returns 0 on success, 1 if any pthread call failed. */
WRes Event_Wait(CEvent* p) {
  event_t* const ev = *p;

  if (pthread_mutex_lock(&ev->mutex) != 0)
    return 1;

  /* re-check the flag after every wake-up */
  for (;;) {
    if (ev->state)
      break;
    if (pthread_cond_wait(&ev->cond, &ev->mutex) != 0) {
      pthread_mutex_unlock(&ev->mutex);
      return 1;
    }
  }

  ev->state = false;

  return (pthread_mutex_unlock(&ev->mutex) != 0) ? 1 : 0;
}

/**
 * Creates an auto-reset event in the non-signaled state.
 *
 * The event object is heap-allocated; release it with Event_Close().
 * On any failure everything acquired so far is released again and *p is
 * NULL, so Event_IsCreated() reports false.
 *
 * @param p  receives the new event handle
 * @return 0 on success, 1 on allocation or pthread initialisation failure
 */
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) {
  /* cast keeps the file compilable as C++ as well */
  event_t* evt = (event_t*)malloc(sizeof(*evt));

  *p = NULL;
  if (evt == NULL) {
    return 1;
  }

  evt->state = false;
  if (pthread_mutex_init(&evt->mutex, NULL) != 0) {
    free(evt);
    return 1;
  }
  if (pthread_cond_init(&evt->cond, NULL) != 0) {
    pthread_mutex_destroy(&evt->mutex);
    free(evt);
    return 1;
  }

  *p = evt;
  return 0;
}

#endif


#ifdef _WIN32

/* Creates an unnamed Win32 semaphore with default security and stores it in
   *p. Returns 0 on success, otherwise the last-error code. */
WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
{
  const LONG initial = (LONG)initCount;
  const LONG maximum = (LONG)maxCount;
  *p = CreateSemaphore(NULL, initial, maximum, NULL);
  return HandleToWRes(*p);
}

/* Raises the semaphore count by releaseCount; the previous count is written
   to *previousCount when that pointer is not NULL. */
static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
{
  return BOOLToWRes(ReleaseSemaphore(*p, releaseCount, previousCount));
}

/* Raises the semaphore count by num. */
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
{
  return Semaphore_Release(p, (LONG)num, NULL);
}

/* Raises the semaphore count by one. */
WRes Semaphore_Release1(CSemaphore *p)
{
  return Semaphore_Release(p, (LONG)1, NULL);
}

#endif


#ifdef _WIN32
/* Initialises the Win32 critical section *p.
   Returns 0 on success. With MSVC the call is wrapped in a structured
   exception handler and 1 is returned if an exception is raised; with other
   compilers the call is made unguarded. */
WRes CriticalSection_Init(CCriticalSection *p)
{
  /* InitializeCriticalSection can raise only STATUS_NO_MEMORY exception */
  #ifdef _MSC_VER
  __try
  #endif
  {
    InitializeCriticalSection(p);
    /* InitializeCriticalSectionAndSpinCount(p, 0); */
  }
  #ifdef _MSC_VER
  __except (EXCEPTION_EXECUTE_HANDLER) { return 1; }
  #endif
  return 0;
}

#else
/**
 * Allocates and initialises the mutex behind a critical section.
 *
 * Release it with CriticalSection_Delete(). On failure nothing is leaked
 * and *p is NULL.
 *
 * @param p  receives the new mutex handle
 * @return 0 on success, 1 on allocation or pthread_mutex_init failure
 */
WRes CriticalSection_Init(CCriticalSection *p) {
  /* cast keeps the file compilable as C++ as well */
  pthread_mutex_t* mtx = (pthread_mutex_t*)malloc(sizeof(*mtx));

  *p = NULL;
  if (mtx == NULL) {
    return 1;
  }
  if (pthread_mutex_init(mtx, NULL) != 0) {
    free(mtx);
    return 1;
  }

  *p = mtx;
  return 0;
}

/**
 * Destroys a critical section and releases the heap storage that
 * CriticalSection_Init() allocated for it.
 *
 * A NULL pointer or a handle that was never initialised is a no-op. If the
 * mutex cannot be destroyed (for example because it is still locked) the
 * handle is left intact, so no other thread ends up using freed memory.
 *
 * @param p  critical section to delete; *p is NULL after success
 * @return 0 on success, otherwise the pthread_mutex_destroy() error number
 */
WRes CriticalSection_Delete(CCriticalSection *p) {
  pthread_mutex_t* mtx;
  int ret;

  if (!p || !(*p)) {
    return 0;
  }

  mtx = *p;
  ret = pthread_mutex_destroy(mtx);
  if (ret != 0) {
    return ret;
  }

  free(mtx);
  *p = NULL;
  return 0;
}

/**
 * Enters the critical section, blocking until its mutex is available.
 *
 * @param p  critical section set up by CriticalSection_Init()
 * @return 0 on success, 1 if pthread_mutex_lock() failed
 */
WRes CriticalSection_Enter(CCriticalSection *p) {
  /* *p is the heap-allocated mutex itself */
  if (pthread_mutex_lock(*p) != 0) {
    return 1;
  }
  return 0;
}

/**
 * Leaves the critical section by unlocking its mutex.
 *
 * @param p  critical section previously entered by the calling thread
 * @return 0 on success, 1 if pthread_mutex_unlock() failed
 */
WRes CriticalSection_Leave(CCriticalSection *p) {
  /* *p is the heap-allocated mutex itself */
  if (pthread_mutex_unlock(*p) != 0) {
    return 1;
  }
  return 0;
}

#endif

XzDec.c

Add the following macros:

/* Result codes for builds without the Windows headers.
   Presumably these mirror the Windows HRESULT values of the same names;
   confirm against the call sites in XzDec.c.
   NOTE(review): with a 32-bit int, 0x80004005 does not fit in int, so the
   literal has type unsigned int; comparisons against signed result values
   rely on the usual arithmetic conversions. */
#ifndef _WIN32
#define S_OK 0x00000000
#define E_FAIL 0x80004005
#endif

MtDec.c

Replace the function ThreadFunc1 with:

/* Thread entry point of a decoder worker.
   Runs ThreadFunc2 and turns its result into the thread's exit value:
   on Windows the WRes itself, elsewhere the address of one of two static
   status words (ok / error). If ThreadFunc2 fails, the error is recorded in
   the shared decoder state, the exit flag is raised and thread 0 is woken
   so the pipeline can shut down. */
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void* pp)
{
  /* exit statuses that are handed back by address on non-Windows builds */
  static int g_ok_stat = 0x0;
  static int g_err_stat = 0x80004005;
  CMtDecThread *thread = (CMtDecThread*)pp;
  WRes res = ThreadFunc2(thread);
  CMtDec *dec = thread->mtDec;

  if (res != 0)
  {
    // it's unexpected situation for some threading function error
    if (dec->exitThreadWRes == 0)
      dec->exitThreadWRes = res;
    PRF(printf("\nthread exit error = %d\n", res));
    dec->exitThread = True;
    Event_Set(&dec->threads[0].canRead);
    Event_Set(&dec->threads[0].canWrite);
    MtProgress_SetError(&dec->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
#ifdef _WIN32
    return res;
#else
    return &g_err_stat;
#endif
  }

  /* ThreadFunc2 succeeded: report the exit code shared by all threads */
#ifdef _WIN32
  return dec->exitThreadWRes;
#else
  if (dec->exitThreadWRes)
    return &g_err_stat;
  return &g_ok_stat;
#endif
}

评论

《“Support parallel XZ decompression for unix (7zip LZMA SDK based, C/C++)”》 有 1 条评论

  1. master 的头像

    LZMA SDK is placed in the public domain.

    Anyone is free to copy, modify, publish, use, compile, sell, or distribute the original LZMA SDK code, either in source code form or as a compiled binary, for any purpose, commercial or non-commercial, and by any means.

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据