Lambda 표현식 이란?

Java에서도 똑같이 Lambda 표현식이 있으며 Java 1.8 부터 지원하게 되었다.
즉 "익명함수"라고 하며 보통 함수명을 붙여서 함수를 만들지만 재사용성이 없으며 1급함수를 다룰때는 Lambda가 유용할 수 있다.

Lambda 표현식 쓰는 법

def add_ten(x):
    return x + 10

lambda x: x + 10

즉 일반적으로는 def 함수명(parameter): 과 return 으로 이루어진다.

하지만 lambda는 익명이므로, lambda parameter: statement 또는 표현식의 경우 return 값이 된다.

일반적인 함수와 매핑을 해보면 def -> lambda    (parameter): -> parameter:    return -> 생략하고 바로 return값.

또한 중요한 것은 lambda 에서 expression 부분은 무조건 1개만 가능하다. 여러개를 사용할 수 없다.( colon(;) 으로 구분해도 안된다. )

 

Lambda 사용 예시

Lambda의 기본적인 사용예시를 아래에 설명한다.
  • 만약 일반 함수처럼 이름(reference)을 할당 하고 싶다면.
    • add_ten = lambda x: x+10
    • 하지만 이는 이렇게 된다는 것만 알것. PEP8에서 assignment(=)와 함께 lambda를 쓸려면 def를 쓰라고 강력 권고하고 있음.
  • lambda *args, **kwargs: None
    • 어떤 parameter가 오더라도 None을 return하라는 의미임.(즉 아무것도 하지 마라.)
    • 아래와 동일한 코드임.
def do_nothing(*args, **kwargs):
    passs

 

Lambda 자주 사용하는 용법

실제 개발할때 가장 많이 사용하는 lambda case이다. map, filter, sorted 시에 list와 함께 쓰인다.

map, filter, sorted, list 와 함께 사용

map은 말그대로 매핑하는 기능을 수행하는 built-in 함수이다.

map(function, iterable, ...) 의 signature 이며, function은 매핑Rule이다. 그리고 iterable은 보통 list가 들어오며

iterable로 들어온 대상의 항목을 하나씩 매핑하라는 뜻이다.

따라서 map(대문자바꾸는 함수, ['a', 'b', 'c']) 이렇게 하면 ['A','B','C'] 가 나온다.

 

map(lambda x: x.upper(),  ['a', 'b', 'c']) => 이거 자체도 map함수이며 list로 할려면,

list(map(lambda x: x.upper(),  ['a', 'b', 'c'])) 이렇게 하면 ['A', 'B', 'C']가 나온다. 


filter(function, iterable...) 도 필터링하는 built-in 함수이다.

즉, 필터링을 하되 어떻게 필터링해라라는 것이 function으로 오고, iterable은 필터링할 대상이다.

예를 들면 아래와 같다.

list(filter(lambda x: 'o' in x, ['cat', 'dog', 'cow']))

즉, x 가 in x 로 사용되므로 iterable이고 그게 바로 다음 parameter이다. 그리고 x 안에 알파벳 'o'  있으면 True임.

따라서 눈치챘겠지만 function은 boolean을 return하는 함수이어야 한다.


sorted 도 정렬된 객체를 새로 만드는 built-in 함수이다. 

sorted(iterable, function..) 이 된다.

즉 sort할 대상 iterable이며 해당 iterable에서 정렬할 로직이 function이 된다.

ids = ['id1', 'id2', 'id30', 'id3', 'id22', 'id100']

sorted_ids = sorted(ids, key=lambda x: int(x[2:]))

상기 예는 ids를 정렬해서 새로운 sorted_ids를 만들으라는 것인데 정렬로직은 x: int(x[2:]) 이다.

즉 iterable x 가 들어오면 세번째(포함)이후가 key value가 된다. 해당 값으로 정렬하라는 것이며

위 예의 경우는 id를 뺀 세번째 1,2,30,3,22,100 으로 정렬하라는 것이다.

즉 return은 ['id1', 'id2', 'id3', 'id22', 'id30', 'id100'] 이 된다.

 

'Python' 카테고리의 다른 글

Python Celery Task Monitoring  (0) 2019.10.08
Celery from scratch  (0) 2019.05.20
Python - Monkey Patch  (1) 2019.05.02
Python Excel to MySQL  (0) 2019.04.12
Database 정보 CSV 작성방법  (0) 2019.04.10

Celery Startup

Celery Coding

Coding은 매우 간단하다.(아래 샘플)

 

[file 명 : tasks.py]

from celery import Celery

app = Celery('tasks', broker="broker url", backend="backend url")

@app.task

def add(x, y):

    return x+y

 

이렇게 하면 기본적인 코딩 끝이다.

@app.task는 celery에 task를 등록한다는 것이다.

 

Celery 실행

실행방법

celery config paramters

 

http://docs.celeryproject.org/en/latest/userguide/configuration.html#example-configuration-file

ex) CELERYD_CONCURRENCY 는 celery 의 동시 실행 worker갯수를 의미한다.

따라서, 해당 변수를 setting하면 제어가능하다.

위의 변수를 setting안하면 default는 12이다.

따라서 아래 명령어 celery 를 실행하고 ps -eaf | grep celery | grep -v grep 하면 총 13개가 나온다.

1개는 master 이며 나머지 12개가 worker이다.

app = Celery(...), app.conf.update(CELERYD_CONCURRENCY=3) 이렇게 하고 다시 ps 해보면 4가 나온다.

 

celery -A tasks worker --loglevel=info

-A: application ( = -app )  => 확장자를 제외하고 입력함. 만약 tasks.py 이면 tasks를 입력

worker 는 기본 명령어이며 worker 서버를 구동하라는 뜻이다.

--loglevel은 다들 아는 것이니 넘어가자.

정리 : -A 파이썬화일명(확장자제외) 이부분만 기억하면 된다. 

 

celery worker 에 작업시키기

python prompt로 들어감.

중요한 것은, @app.task를 통해 등록한 add task를 호출하는 것은 delay 이다.

delay : task call

위에 key가 redis에 저장되는 키이다. ( celery-task-meta-  + key )

위와 같이 redis-cli로 들어가서 get key정보를 하면 값이 나온다. result를 보면 6이다.

task_id는 key가 된다.

 

비동기 작업 수행

Celery 활용시 필수 사항

대부분 비동기처리를 위해 Celery를 사용한다.

그리고 서버는 일반적으로 여러대이며 어떤 서버(그리고 어떤 Celery데몬이) 가 해당 Task를 수행할지 모른다.

예를 들면 File Upload 및 데이터 처리시 1번서버에서 Upload를 하면 1번서버에서 수행되어야 하지만,

2번서버의 Celery데몬이 작업을 수행하면 File not found error가 난다.

또는 2개의 데몬이 있는데 대량 작업이 돈다면 그 작업은 하나의 데몬, 나머지 작업은 다른 데몬이 하도록 하게 하고 싶은 경우,

Queue라는 개념이 매우 중요하다.

 

예를 들면,

add 라는 task가 있다고 하자.

@app.task
def add(x, y):
    sleep(30)
    return x + y

위 Task는 30초 걸리는 장시간 작업이다.

celery -A proj worker -l info
celery -A proj worker -l info -Q hipri

위 데몬에  -Q option이 큐지정이다.

-Q option : 기본값은 celery이다. 즉 없는 것은 -Q celery와 같다.

-Q hipri : queue가 hipri로 들어오면 수행하는 데몬이고 나머지는 수행안한다.

apply_async를 써야 real-time으로 queue를 제어할 수 있고, delay를 사용할려면 queue는 속성으로 한번 지정되면,

다시 수정해서 celery데몬을 띄워야 하니 나는 항상 apply_async를 사용한다.

1) add.apply_async((2,5))
2) add.apply_async((2,3), queue='celery')
3) add.apply_async((2,5), queue='hipri')

1)번과 2)번은 default로 동일하다.

3)번은 데몬의 처리queue가 hipri인 대상이 수행하라는 뜻이다.

 

이걸로 아래의 실무에 적용할수 있다.(예시임.)

sequenced : long running job은 하나의 데몬이 순차적으로 수행

paralleled : 가벼운 작업은 여러 데몬이 병렬로 수행

near_real : 준실시간 처리는 여기로... 등등

 

여러 celery데몬을 띄워서 제어하는 것 까지 실무에서 많이 사용한다.

이상 기본적인 Celery를 마쳤으며 이정도만 알아도 활용할 수 있다.

더 고급정보는 실무에 활용하면서 배우기 바란다.

 

 

'Python' 카테고리의 다른 글

Python Celery Task Monitoring  (0) 2019.10.08
Python Lambda  (0) 2019.09.03
Python - Monkey Patch  (1) 2019.05.02
Python Excel to MySQL  (0) 2019.04.12
Database 정보 CSV 작성방법  (0) 2019.04.10

Monkey Patch란?

업무를 하면서 가끔 회사에서 Monkey Patch를 하고...라는 말을 듣는다.

무슨뜻이지? 하면서 한번도 찾아본적이 없어서 이번에 찾아보고 정리한다.

 

위키피디아에 이렇게 정리되어 있다.

 

A monkey patch is a way for a program to extend or modify supporting system software locally (affecting only the running instance of the program).

해석을 한다면 monkey patch는 프로그램이 내부 프로그램을 로컬에서 잠시 다른 기능을 수행하도록 수정하거나 확장하도록 하는 방법이며 이는 로컬이므로 런타임시에만 영향을 주는 방식이다.

 

예시로는 Python으로 아래와 같이 있다.

>>> import math
>>> math.pi
3.141592653589793
>>> math.pi = 3
>>> math.pi
3
>>> ================================ RESTART ================================
>>> import math
>>> math.pi
3.141592653589793
>>>

위의 내용만 봐도 이해는 했을 것이다.

그런데 어디다가 쓰지?

 

예상은 했겠지만, Test할때 주로 쓴다.

그리고 다른 기능을 수행하도록 하는 부분 또한 Mock Object를 원하는 기능이 수행하도록 만든 뒤 그것으로 replace하는 것이다.

 

my_calendar.py

import requests
from datetime import datetime

def is_weekday():
    today = datetime.today()
    return (0 <= today.weekday() < 5)

def get_holidays():
    r = requests.get('http://xxx.holiday.org/api/holidays')
    if r.status_code == 200:
        return r.json()
    return None

위와 같은 코드를 만들었고, 이제 테스트를 해야 한다.

get_holidays() 함수는 requests가 필요한데 아직 위의 url에서는 해당 api를 서비스하지 않고 있다.

즉, 테스트를 하기가 어렵다.

 

아래와 같이 tests.py를 한번 보자.

 

import unittest
from my_calendar import get_holidays
from requests.exceptions import Timeout
from unittest.mock import patch

class TestCalendar(unittest.TestCase):
    @patch('my_calendar.requests')
    def test_get_holidays_timeout(self, mock_requests):
            mock_requests.get.side_effect = Timeout
            with self.assertRaises(Timeout):
                get_holidays()
                mock_requests.get.assert_called_once()

if __name__ == '__main__':
    unittest.main()

위의 코드는 my_calenadar class를 Test하기 위한 코드이다.

 

@patch('my_calendar.requests') 이부분이 위에 설명되었던 Monkey Patch부분이다.

@patch decorator에 들어가는 파라메터(my_calendar.requests)는 replace 대상(target)이 되는 속성이다.

즉, my_calendar 소스내에 import requests 가 있는데 이걸 내가 원하는 requests로 바꿀려고 한다.

그럼 바꿀려고 하는 mock object 참조변수는 def test_get_holidays_timeout(self, mock_requests) 의 mock_requests이다.

 

따라서 해당 함수 안에서 mock_requests.get.side_effect = Timeout 즉 해당 request의 get() 함수는 실제 url에 접속하고 관계없이,

Timeout을 Raise해줘.. 라는 것으로 가짜 object를 만들었다. 이제 @patch decorator를 통해 실제 my_calendar.py의 requests 객체를 바꾸었으니 Exception Test가 가능해졌다.

 

with 문 이하는 본 monkey patch와 관계없는 Mock관련 사항이므로 설명을 생략한다.

 

따로 Python의 Mock 관련해서는 설명 글을 게시할 예정이다.

 

정리하자면 위와 같이 requests 는 아시겠지만 이를 테스트할려면 해당 api서버가 개발을 해주거나 서비스를 제공해야 테스트 가능하다.

이부분을 내가 로컬에서 단위테스트를 하기 위해서는 원래의 requests를 내가 원하는 방식으로 바꿔줘야 하는데 이게 monkey patching이며, 원하는 방식으로 바꾸기 위한 임시객체(가짜객체)를 Mock Object라 하며 Python에서는 unittest.mock에서 강력한 기능들을 제공한다.

 

끝.

'Python' 카테고리의 다른 글

Python Celery Task Monitoring  (0) 2019.10.08
Python Lambda  (0) 2019.09.03
Celery from scratch  (0) 2019.05.20
Python Excel to MySQL  (0) 2019.04.12
Database 정보 CSV 작성방법  (0) 2019.04.10

대량 엑셀을 효율적으로 ...

배경

현업이 IT부서에 Raw Data를 뽑아달라고 했다.

뽑아보니 레코드 수도 그렇지만 700Mb가 넘는 사이즈였다...

이걸로 뭘 하겠냐고 물어보니 엑셀함수를 이용해서 데이터 검증을 하겠다고 한다...

일단 뽑아줄테니 이걸로 업무하기는 어려울것 같다, 화일 여는데만 10분걸리고 수정/저장할때마다 10분씩 걸릴거다..

라고 하였다. ( 매주 하던 작업이고 할때마다 하루종일 걸린다고 했다..)

 

업무처리 방식 제안

도저히 생산성이 안나올테니 걸어볼 엑셀 함수와 최종 어떤 작업할껀지 알아내고, IT부서에서 기본적인 엑셀 작업을 해주기로 했다.

 

Python 활용..

Python script를 통해 제공한 엑셀을 내 Local DB( MySQL ) 로 로드하여 각종 함수를 적용하고 다시 그것을 엑셀로 뽑아서 주면 된다.

다른 라이브러리도 있지만 나는 pandas 를 익숙하게 잘 쓰고 싶어서 pandas로 하고 python orm은 sqlalchemy를 활용하기로 했다.

 

따로 엑셀함수등 구현은 업무마다 많이 상이하니..

"Excel File을 DB에 로드" 하는 부분만 예시로 기재한다.

import pandas as pd
from sqlalchemy import create_engine
table = pd.read_excel('엑셀화일명.xlsx', sheet_name='쉬트명', header=0,)
engine = create_engine("mysql+pymysql://DB계정:비밀번호@127.0.0.1:3306/스키마명", encoding='utf-8-sig')
table.to_sql(name='테이블명', con=engine, if_exists='append', index=False)

위 Script는 연습해서 외워서 필요할때 테이블만 만들고 간단히 메일쓰듯이 할 수 있으면 좋겠다.

이거 하나 해주니 현업이 엄청 좋아했다. 덩달아 나의 평가도 올라갔다.


잠깐! 꼭 알아야 할 것

1. Encoding

내 Table 의 첫번째 컬럼은 Bigint(20) 이었다.

그런데 들어가다가 오류가 났다. 분명히 눈엔 47748649 만 보이는데...Why?

위 engine 의 encoding option을 보면 처음에는 utf-8 로 했다가 utf-8-sig 로 하니까 잘된다.

utf-8의 BOM문제라고 한다.  그래서 그냥 utf-8 이 아닌 utf-8-sig ( 약어는...signature로 예상됨. )

로 변경하니 잘된다. 따라서 위 오류나면 ( \ufeff --> \u : 유니코드 의미, feff : UTF-16 Big Endian 의미)

즉 UTF-16 Big Endian으로 Encoding된 문자를 utf-8로 해서 앞에 BOM 문자가 포함되어 오류남.

위처럼 하면 utf-8 로 인코딩하되, signature를 보고 input 문자의 encoding을 확인하라는 의미임.

참고 URL : http://blog.wystan.net/2007/08/18/bom-byte-order-mark-problem

 

 2. sqlalchemy , pymysql 등 환경

pip install pandas
pip install pymysql
pip install sqlalchemy

따라하기 예제코드

아래는 Full Code 이며, 계정과 비밀번호, 스키마명만 변경해서 하면 된다.

테이블구조는 예제 실습을 위해 임의로 정하였다.

 

내 Local에 테이블이 없는 상태이다.

mysql table 확인

처리해야 할 엑셀 화일 샘플이다.( raw_data_test.xlsx)

아래 코드 실행

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://계정:비밀번호@127.0.0.1:3306/스키마명", encoding='utf-8-sig')

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.mysql import BIGINT
from sqlalchemy import Column, String, Integer

Base = declarative_base()

class BillRawTest(Base):
    __tablename__ = 'bill_raw_test'
    __table_args__ = {'extend_existing': True}
    bill_id = Column(BIGINT(20), primary_key  = True)
    item = Column(String(20))
    amount = Column(Integer)

metadata = Base.metadata
metadata.create_all(engine)

df = pd.read_excel('~/Downloads/bill_raw_test.xlsx', sheet_name='Sheet1', header=0)
df.to_sql(name='bill_raw_test', con = engine, if_exists='append', index=False)

 

실행 후 DB조회

 

끝.

'Python' 카테고리의 다른 글

Python Celery Task Monitoring  (0) 2019.10.08
Python Lambda  (0) 2019.09.03
Celery from scratch  (0) 2019.05.20
Python - Monkey Patch  (1) 2019.05.02
Database 정보 CSV 작성방법  (0) 2019.04.10

MySQL 의 테이블 데이터를 CSV로 만들기

현장에서 운영업무를 하다 보면 현업이 Raw 데이터를 긴급으로 CSV로 달라는 요청이 종종 있다.

서버는 상용이고 터미널이외엔 적당한 tool이 없다.

 

이런 경우, Python을 활용하면 빨리 된다.

상용의 개인계정(리눅스)에 python 과 pip를 설치 할 수만 있다면 OK!

 

리눅스 서버에서,

pip install pandas
pip install sqlachemy

 

그리고 python 을 쳐서 python prompt 로 진입.

import pandas as pd
import sqlalchemy as sql

connect_string = 'mysql+pymysql://유저명:비밀번호@DB서버호스트명:3306/스키마명'

sql_engine = sql.create_engine(connect_string)
query =query = "select * from 주문테이블 where 조건들"
df = pd.read_sql_query(query, sql_engine)
df.to_csv(r'test.csv')

 

위와 같이 하면 총 7줄의 코딩으로  간단히 처리된다.

 

끝.

 

'Python' 카테고리의 다른 글

Python Celery Task Monitoring  (0) 2019.10.08
Python Lambda  (0) 2019.09.03
Celery from scratch  (0) 2019.05.20
Python - Monkey Patch  (1) 2019.05.02
Python Excel to MySQL  (0) 2019.04.12

+ Recent posts