import calendar
import datetime
from datetime import timedelta
from decimal import Decimal
from itertools import groupby

from django.shortcuts import render
from django.contrib.auth.models import User, Group
from django.core import exceptions
from django.db.models import Q
from caremyway.api.models import (
    UserInfo, Client, Provider, WorkType, Manage, Price, Shift)
from rest_framework import viewsets, permissions, status, serializers
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.renderers import StaticHTMLRenderer
from caremyway.api.serializers import (
    UserSerializer, UserInfoSerializer, ClientSerializer, ProviderSerializer,
    WorkTypeSerializer, EmployeeSerializer, EmployerSerializer,
    PriceSerializer, CShiftSerializer, PShiftSerializer, TimeSheetSerializer,
    RecordSheetSerializer, PayStubSerializer)


class _SoftDeleteMixin(object):
    """Replace DELETE with a soft delete: flag the row instead of removing it."""

    def destroy(self, request, *args, **kwargs):
        instance = self.get_object()
        instance.deleted = True
        instance.save()
        return Response(status=status.HTTP_204_NO_CONTENT)


class UserViewSet(viewsets.ModelViewSet):
    """Read/update access to the requesting user's own ``User`` row."""

    lookup_field = 'username'
    serializer_class = UserSerializer
    # Disallow POSTing from /users/ route. Only rest-auth can add users.
    http_method_names = ['get', 'head', 'put', 'options']

    def get_queryset(self):
        return User.objects.filter(username=self.request.user)


class UserInfoViewSet(viewsets.ModelViewSet):
    """``UserInfo`` rows belonging to the requesting user."""

    lookup_field = 'user__username'
    serializer_class = UserInfoSerializer
    # Disallow DELETE
    http_method_names = ['get', 'post', 'head', 'put', 'options']

    def get_queryset(self):
        return UserInfo.objects.filter(user__username=self.request.user)


class ClientViewSet(viewsets.ModelViewSet):
    """``Client`` rows belonging to the requesting user."""

    lookup_field = 'user__username'
    serializer_class = ClientSerializer
    # Disallow DELETE
    http_method_names = ['get', 'post', 'head', 'put', 'options']

    def get_queryset(self):
        return Client.objects.filter(user__username=self.request.user)


class ProviderViewSet(viewsets.ModelViewSet):
    """``Provider`` rows belonging to the requesting user."""

    lookup_field = 'user__username'
    serializer_class = ProviderSerializer
    # Disallow DELETE
    http_method_names = ['get', 'post', 'head', 'put', 'options']

    def get_queryset(self):
        return Provider.objects.filter(user__username=self.request.user)


class WorkTypeViewSet(_SoftDeleteMixin, viewsets.ModelViewSet):
    """Work types owned by the requesting client; DELETE is a soft delete."""

    lookup_field = 'uuid'
    serializer_class = WorkTypeSerializer

    def get_queryset(self):
        return WorkType.objects.filter(
            client__user__username=self.request.user)


class EmployeeViewSet(_SoftDeleteMixin, viewsets.ModelViewSet):
    """``Manage`` rows where the requesting user is the client.

    DELETE is a soft delete.
    """

    lookup_field = 'uuid'
    serializer_class = EmployeeSerializer

    def get_queryset(self):
        return Manage.objects.filter(client__user__username=self.request.user)


class EmployerViewSet(viewsets.ModelViewSet):
    """``Manage`` rows where the requesting user is the provider."""

    lookup_field = 'uuid'
    serializer_class = EmployerSerializer
    # Disallow creation and deletions of relationships
    http_method_names = ['get', 'head', 'put', 'options']

    def get_queryset(self):
        return Manage.objects.filter(
            provider__user__username=self.request.user)


class PriceViewSet(_SoftDeleteMixin, viewsets.ModelViewSet):
    """Prices under the requesting client's managements; soft delete."""

    lookup_field = 'uuid'
    serializer_class = PriceSerializer

    def get_queryset(self):
        return Price.objects.filter(
            management__client__user__username=self.request.user)


class CShiftViewSet(viewsets.ModelViewSet):
    """Non-deleted shifts seen from the client side, newest ``set_start`` first."""

    lookup_field = 'uuid'
    serializer_class = CShiftSerializer

    def get_queryset(self):
        return Shift.objects.filter(deleted=False) \
            .filter(
                price__management__client__user__username=self.request.user) \
            .order_by('-set_start')

    def destroy(self, request, *args, **kwargs):
        """Soft-delete a shift unless ``actual_start`` is already set."""
        instance = self.get_object()
        if instance.actual_start:
            return Response("Can't delete after check in.",
                            status=status.HTTP_400_BAD_REQUEST)
        instance.deleted = True
        instance.save()
        return Response(status=status.HTTP_204_NO_CONTENT)


class PShiftViewSet(viewsets.ModelViewSet):
    """Non-deleted shifts seen from the provider side, newest first."""

    lookup_field = 'uuid'
    serializer_class = PShiftSerializer
    # No POST or DELETE here: this route only reads and updates (PUT) shifts.
    http_method_names = ['get', 'head', 'put', 'options']

    def get_queryset(self):
        return Shift.objects.filter(deleted=False) \
            .filter(
                price__management__provider__user__username=self.request.user) \
            .order_by('-set_start')


def validate_param(value, field):
    """Validate ``value`` against a DRF ``field`` and return the parsed value.

    A throwaway one-field serializer is built around ``field`` so the normal
    DRF validation and error formatting apply to a bare query parameter.

    Raises:
        serializers.ValidationError: with the field's error list when the
            value does not validate.
    """
    class ValidateParam(serializers.Serializer):
        value = field

    obj = ValidateParam(data={'value': value})
    if obj.is_valid():
        return obj.validated_data.get('value')
    raise serializers.ValidationError(obj.errors['value'])


# Both ends of a pay period sit this far before the nominal calendar date.
_PAY_CUTOFF = timedelta(days=4)


def get_paystart(payday):
    """Return the first date of the pay period that is paid on ``payday``.

    Assumes ``payday`` is an already validated date object. Only the 15th and
    the last day of a month are accepted as paydays.

    Raises:
        serializers.ValidationError: if ``payday`` is neither of those days.
    """
    last_day = calendar.monthrange(payday.year, payday.month)[1]
    if payday.day == 15:
        paystart_day = 1
    elif payday.day == last_day:
        paystart_day = 16
    else:
        raise serializers.ValidationError("Date is not a valid payday.")
    nominal_start = datetime.date(payday.year, payday.month, paystart_day)
    return nominal_start - _PAY_CUTOFF


def get_payend(payday):
    """Return the last date of the pay period that is paid on ``payday``."""
    return payday - _PAY_CUTOFF


def _shift_entry(shift):
    """Build the timesheet line (times, hours, rate, amount) for one shift."""
    worked = shift.actual_end - shift.actual_start
    hours = worked.total_seconds() / 3600
    return {
        'date': shift.set_date,
        'start': shift.actual_start,
        'end': shift.actual_end,
        'hours': hours,
        'rate': shift.set_price,
        'amount': Decimal(hours) * shift.set_price,
        'work': shift.price.work_type.label,
    }


def gen_timesheets(user, payday, manage=None):
    """Build one timesheet per management for the pay period of ``payday``.

    Only non-deleted shifts that have an ``actual_end`` and whose ``set_date``
    falls inside the pay period are included. ``user`` may be on either side
    (client or provider) of the management.

    Args:
        user: user whose managements are reported.
        payday: validated payday date (see ``get_paystart``).
        manage: optional management uuid restricting the report.

    Returns:
        A list of dicts with keys ``management``, ``client``, ``provider``,
        ``payday``, ``paystart``, ``payend`` and ``shifts``.

    Raises:
        serializers.ValidationError: if ``payday`` is not a valid payday.
    """
    paystart = get_paystart(payday)
    payend = get_payend(payday)

    # Ordering by management first is what makes groupby() below produce
    # exactly one group per management.
    shifts = Shift.objects.filter(deleted=False) \
        .exclude(actual_end__isnull=True) \
        .filter(
            Q(price__management__client__user__username=user) |
            Q(price__management__provider__user__username=user)) \
        .filter(set_date__gte=paystart) \
        .filter(set_date__lte=payend) \
        .order_by('price__management', 'actual_start')
    if manage:
        shifts = shifts.filter(price__management__uuid=manage)

    timesheets = []
    by_management = groupby(shifts, lambda s: s.price.management)
    for management, management_shifts in by_management:
        timesheets.append({
            'management': management,
            'client': management.client,
            'provider': management.provider,
            'payday': payday,
            'paystart': paystart,
            'payend': payend,
            'shifts': [_shift_entry(s) for s in management_shifts],
        })
    return timesheets


def get_paydays_todate(payday):
    """Return every payday of ``payday``'s year up to and including it.

    Assumes ``payday`` is an already validated date object. The result is in
    chronological order: the 15th and the last day of each month.

    Raises:
        serializers.ValidationError: if ``payday`` is not the 15th or the last
            day of its month.
    """
    year = payday.year
    last_day = calendar.monthrange(year, payday.month)[1]
    if payday.day not in (15, last_day):
        raise serializers.ValidationError("Date is not a valid payday.")

    paydays = []
    for month in range(1, payday.month):
        paydays.append(datetime.date(year, month, 15))
        paydays.append(
            datetime.date(year, month, calendar.monthrange(year, month)[1]))
    if payday.day != 15:
        # A month-end payday is preceded by the mid-month one.
        paydays.append(datetime.date(year, payday.month, 15))
    paydays.append(payday)
    return paydays


# --- Payroll deduction constants -------------------------------------------
# NOTE(review): the variable letters used below (A, K, K1, K2P, T3, ...) and
# these figures appear to follow the CRA T4127 payroll deductions formulas
# for the 2017 tax year (federal + Alberta) -- confirm before changing them.
# All money constants are built from strings so no binary float error leaks
# into the Decimal arithmetic.
_PAY_PERIODS = 24  # two paydays per month
_CPP_MAX = Decimal('2564.10')
_CPP_RATE = Decimal('0.0495')
_CPP_EXEMPTION = Decimal('3500')
_EI_MAX = Decimal('836.19')
_EI_RATE = Decimal('0.0163')
_VACATION_RATE = Decimal('0.04')
_FED_CREDIT_RATE = Decimal('0.15')
_FED_BASIC_AMOUNT = 11635  # TC
_FED_EMPLOYMENT_AMOUNT = 1178  # used for K4
_PROV_CREDIT_RATE = Decimal('0.10')
_PROV_BASIC_AMOUNT = 18690  # TCP
_ZERO = Decimal(0)

# Rows are (lower threshold, rate, constant), sorted by ascending threshold.
_FED_BRACKETS = (
    (0, Decimal('0.150'), 0),
    (45916, Decimal('0.205'), 2525),
    (91831, Decimal('0.260'), 7576),
    (142352, Decimal('0.290'), 11847),
    (202800, Decimal('0.330'), 19959),
)
_PROV_BRACKETS = (
    (0, Decimal('0.10'), 0),
    (126625, Decimal('0.12'), 2533),
    (151950, Decimal('0.13'), 4052),
    (202600, Decimal('0.14'), 6078),
    (303900, Decimal('0.15'), 9117),
)


def _select_bracket(table, income):
    """Return the last row of ``table`` whose threshold is below ``income``.

    Falls back to the first row when ``income`` is at or below every
    threshold (for example zero income), instead of raising ``IndexError``.
    """
    selected = table[0]
    for row in table:
        if row[0] < income:
            selected = row
    return selected


def _annualized_credit_base(ytd, current, annual_max):
    """Return the annualized CPP/EI amount used for tax credits.

    Once the year-to-date total reaches ``annual_max`` the maximum is used;
    otherwise the current amount times the number of pay periods, capped at
    ``annual_max``.
    """
    if ytd + current >= annual_max:
        return annual_max
    return min(_PAY_PERIODS * current, annual_max)


def get_cpp(cpp_ytd, earnings):
    """Return the CPP contribution for one pay period.

    The result is the smaller of the room left under the annual maximum and
    the rate applied to earnings above the per-period exemption, never below
    zero.
    """
    room_left = _CPP_MAX - cpp_ytd
    period_amount = _CPP_RATE * (earnings - _CPP_EXEMPTION / _PAY_PERIODS)
    return max(min(room_left, period_amount), _ZERO)


def get_ei(ei_ytd, earnings):
    """Return the EI premium for one pay period.

    The result is the smaller of the room left under the annual maximum and
    the rate applied to earnings, never below zero.
    """
    room_left = _EI_MAX - ei_ytd
    period_amount = _EI_RATE * earnings
    return max(min(room_left, period_amount), _ZERO)


def get_taxable_income(earnings):
    """Return the annualized taxable income (A) for one period's earnings.

    Every deduction term (F, F2, U1, HD, F1) is currently fixed at zero, so
    this is earnings multiplied by the number of pay periods.
    """
    P = _PAY_PERIODS
    I = earnings
    F = 0
    F2 = 0
    U1 = 0
    HD = 0
    F1 = 0
    return (P * (I - F - F2 - U1)) - HD - F1


def get_fed_tax(taxable_income, cpp, ei, cpp_ytd, ei_ytd):
    """Return the annual basic federal tax (T3), floored at zero.

    Args:
        taxable_income: annualized taxable income (A).
        cpp, ei: this period's CPP contribution and EI premium.
        cpp_ytd, ei_ytd: totals already deducted earlier in the year.
    """
    A = taxable_income
    _, R, K = _select_bracket(_FED_BRACKETS, A)
    K1 = _FED_CREDIT_RATE * _FED_BASIC_AMOUNT
    PxC = _annualized_credit_base(cpp_ytd, cpp, _CPP_MAX)
    PxEI = _annualized_credit_base(ei_ytd, ei, _EI_MAX)
    K2 = (_FED_CREDIT_RATE * PxC) + (_FED_CREDIT_RATE * PxEI)
    K3 = 0
    K4 = min(_FED_CREDIT_RATE * A, _FED_CREDIT_RATE * _FED_EMPLOYMENT_AMOUNT)
    T3 = (R * A) - K - K1 - K2 - K3 - K4
    return max(T3, _ZERO)


def get_fed_tax_payable(fed_tax):
    """Return the annual federal tax payable (T1), floored at zero.

    LCF is currently fixed at zero.
    """
    LCF = 0
    T1 = fed_tax - LCF
    if T1 < 0:
        T1 = 0
    return T1


def get_prov_tax(taxable_income, cpp, ei, cpp_ytd, ei_ytd):
    """Return the annual basic provincial tax (T4), floored at zero.

    Takes the same arguments as ``get_fed_tax``.
    """
    A = taxable_income
    _, V, KP = _select_bracket(_PROV_BRACKETS, A)
    K1P = _PROV_CREDIT_RATE * _PROV_BASIC_AMOUNT
    PxC = _annualized_credit_base(cpp_ytd, cpp, _CPP_MAX)
    PxEI = _annualized_credit_base(ei_ytd, ei, _EI_MAX)
    K2P = (_PROV_CREDIT_RATE * PxC) + (_PROV_CREDIT_RATE * PxEI)
    K3P = 0
    T4 = (V * A) - KP - K1P - K2P - K3P
    return max(T4, _ZERO)


def get_prov_tax_deduction(prov_tax):
    """Return the annual provincial tax deduction (T2), floored at zero.

    V1, S and LCP are currently fixed at zero.
    """
    V1 = 0
    S = 0
    LCP = 0
    T2 = prov_tax - V1 - S - LCP
    if T2 < 0:
        T2 = 0
    return T2


def get_income_tax(fed_tax_payable, prov_tax_deduction):
    """Return the income tax to withhold for one pay period (T).

    The annual federal and provincial amounts are added and spread evenly
    over the pay periods; the extra term L is currently fixed at zero.
    """
    L = 0
    return Decimal(fed_tax_payable + prov_tax_deduction) / _PAY_PERIODS + L


def _build_record(payday, timesheet, earlier_records):
    """Compute pay and deductions for one timesheet.

    ``earlier_records`` are the records already computed this year for the
    same management; they supply the CPP/EI year-to-date totals.
    """
    record = {}
    record['payday'] = payday
    record['management'] = timesheet['management']
    record['reg_pay'] = sum(shift['amount'] for shift in timesheet['shifts'])
    record['vac_pay'] = record['reg_pay'] * _VACATION_RATE
    record['earnings'] = record['reg_pay'] + record['vac_pay']
    record['cpp_ytd'] = sum(r['cpp'] for r in earlier_records)
    record['cpp'] = get_cpp(record['cpp_ytd'], record['earnings'])
    record['ei_ytd'] = sum(r['ei'] for r in earlier_records)
    record['ei'] = get_ei(record['ei_ytd'], record['earnings'])
    record['taxable_income'] = get_taxable_income(record['earnings'])
    record['fed_tax'] = get_fed_tax(
        record['taxable_income'], record['cpp'], record['ei'],
        record['cpp_ytd'], record['ei_ytd'])
    record['fed_tax_payable'] = get_fed_tax_payable(record['fed_tax'])
    record['prov_tax'] = get_prov_tax(
        record['taxable_income'], record['cpp'], record['ei'],
        record['cpp_ytd'], record['ei_ytd'])
    record['prov_tax_deduction'] = get_prov_tax_deduction(record['prov_tax'])
    record['income_tax'] = get_income_tax(
        record['fed_tax_payable'], record['prov_tax_deduction'])
    record['total_deductions'] = (
        record['income_tax'] + record['cpp'] + record['ei'])
    record['net_pay'] = record['earnings'] - record['total_deductions']
    return record


def gen_recordsheets(user, payday, manage):
    """Build year-to-date payroll record sheets, one per management.

    Every payday of the year up to ``payday`` is replayed in chronological
    order so the CPP/EI year-to-date totals accumulate correctly.

    Returns:
        A list of dicts with keys ``management``, ``client``, ``provider``,
        ``phone_number``, ``sin`` and ``record`` (that management's records
        in payday order).

    Raises:
        serializers.ValidationError: if ``payday`` is not a valid payday.
    """
    flat_records = []
    for period_payday in get_paydays_todate(payday):
        for timesheet in gen_timesheets(user, period_payday, manage):
            management = timesheet['management']
            earlier = [r for r in flat_records
                       if r['management'] == management]
            flat_records.append(
                _build_record(period_payday, timesheet, earlier))

    # sorted() is stable, so each management's records stay in payday order
    # while becoming contiguous for groupby().
    flat_records = sorted(flat_records, key=lambda r: r['management'].uuid)

    recordsheets = []
    grouped = groupby(flat_records, lambda r: r['management'])
    for management, records in grouped:
        recordsheets.append({
            'management': management,
            'client': management.client,
            'provider': management.provider,
            'phone_number': management.provider.user.userinfo.phone_number,
            'sin': management.provider.sin,
            'record': list(records),
        })
    return recordsheets


def gen_paystubs(user, payday, manage):
    """Build one pay stub per management from its latest payroll record.

    NOTE(review): the latest record is the last period that had shifts, which
    may be earlier than ``payday`` when nothing was worked in the requested
    period -- confirm this is intended.
    """
    # The record sheets carry SINs and phone numbers; never print or log them.
    paystubs = []
    for recordsheet in gen_recordsheets(user, payday, manage):
        latest = recordsheet['record'][-1]
        paystubs.append({
            'management': recordsheet['management'],
            'client': recordsheet['client'],
            'provider': recordsheet['provider'],
            'payday': latest['payday'],
            'reg_pay': latest['reg_pay'],
            'vac_pay': latest['vac_pay'],
            'earnings': latest['earnings'],
            'cpp': latest['cpp'],
            'ei': latest['ei'],
            'income_tax': latest['income_tax'],
            'total_deductions': latest['total_deductions'],
            'net_pay': latest['net_pay'],
        })
    return paystubs


# Report type (from the URL) -> (generator function, serializer class).
_REPORTS = {
    'timesheet': (gen_timesheets, TimeSheetSerializer),
    'record': (gen_recordsheets, RecordSheetSerializer),
    'paystub': (gen_paystubs, PayStubSerializer),
}


class ReportView(APIView):
    """Serve timesheet, record and paystub reports for the requesting user."""

    def get(self, request, *args, **kwargs):
        """Return the report named by the ``type`` URL kwarg.

        Query parameters: ``payday`` (required date) and ``manage`` (optional
        management uuid). An unknown report type yields an empty list with
        status 200, without validating the query parameters.
        """
        response = []
        report = _REPORTS.get(kwargs['type'])
        if report is not None:
            generate, serializer_class = report
            qp = request.query_params
            payday = validate_param(qp.get('payday'),
                                    serializers.DateField())
            manage = validate_param(qp.get('manage'),
                                    serializers.UUIDField(allow_null=True))
            rows = generate(request.user, payday, manage)
            response = serializer_class(rows, many=True).data
        return Response(response, status=status.HTTP_200_OK)


@api_view()
def null_view(request):
    """Answer every request with an empty 400 Bad Request response."""
    return Response(status=status.HTTP_400_BAD_REQUEST)