Saturday, December 9, 2023

Monitoring Data Usage with Python: A Two-Part Program

Are you frequently puzzled by the rapid depletion of your data, wondering if there's an explanation for its swift consumption? Perhaps you've had moments where it feels like your data plan is slipping away faster than anticipated, leaving you with a sense of being shortchanged. If you find yourself yearning for the ability to keep a close eye on your data usage and establish a cutoff point based on a specific megabyte threshold, you're not alone. The desire to gain better control over your data consumption is a common sentiment, and fortunately, there are solutions available to help you achieve just that.

In this article, we'll explore how to monitor and display data usage using a Python program. The solution consists of two programs: a data usage monitoring program responsible for capturing real-time data usage and saving it to a MySQL database, and a PyQt6 program that displays the data usage per hour.

Data Usage Monitoring Program

The data usage monitoring program utilizes the psutil library to collect real-time network statistics, and it interacts with the WorldTimeAPI to obtain the current time with a specified timezone. The collected data is then stored in a MySQL database.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import psutil
import time
import requests
import mysql.connector as mysql
from datetime import datetime, timedelta

incoming = 0
outgoing = 0
data_usage = 0
t = 0

def get_current_time_with_offset(timezone="Asia/Hong_Kong"):
    try:
        response = requests.get(f"http://worldtimeapi.org/api/timezone/{timezone}")

        if response.status_code == 200:
            data = response.json()
            current_time_str = data['datetime']
            current_time = datetime.strptime(current_time_str, "%Y-%m-%dT%H:%M:%S.%f%z")
            new_time = current_time + timedelta(seconds=1)
            return new_time
        else:
            print(f"Error: Unable to fetch current time. Status code: {response.status_code}")

    except Exception as e:
        print(f"An error occurred: {e}")

# Example usage:
new_time = get_current_time_with_offset()

def get_realtime_data_usage(interval=1):
    global new_time, data_usage, outgoing, incoming, t

    prev_net_stats = psutil.net_io_counters()

    while True:
        if t / 10 == 1:
            t = 0
            new_time = get_current_time_with_offset()
        t += 1
        new_time = new_time + timedelta(seconds=1)
        current_net_stats = psutil.net_io_counters()

        data_transferred = (
            (current_net_stats.bytes_sent - prev_net_stats.bytes_sent) +
            (current_net_stats.bytes_recv - prev_net_stats.bytes_recv)
        ) / 1000
        data_usage = data_usage + round(data_transferred / 1000, 3)
        outgoing = outgoing + round((current_net_stats.bytes_sent - prev_net_stats.bytes_sent) / 1000, 3)
        incoming = incoming + round((current_net_stats.bytes_recv - prev_net_stats.bytes_recv) / 1000, 3)

        data_usage1 = round(data_transferred, 3)
        outgoing1 = round((current_net_stats.bytes_sent - prev_net_stats.bytes_sent) / 1000, 3)
        incoming1 = round((current_net_stats.bytes_recv - prev_net_stats.bytes_recv) / 1000, 3)

        print(
            str(new_time.strftime("%Y-%m-%d %H:%M:%S")) +
            '-' + str(round(data_usage, 3)) +
            '-' + str(round(outgoing, 3)) +
            '-' + str(round(incoming, 3))
        )

        db = mysql.connect(
            host="localhost",
            user="root",
            passwd="",
            database="data_usage"
        )
        cursor = db.cursor()
        sql = "INSERT INTO data_usage (date_time, data_usage, incoming, outgoing) VALUES (%s, %s, %s, %s)"
        val = (new_time.strftime("%Y-%m-%d %H:%M:%S"), round(data_usage1, 3), round(incoming1, 3), round(outgoing1, 3))
        cursor.execute(sql, val)
        db.commit()

        prev_net_stats = current_net_stats
        time.sleep(interval)

# Example usage:
get_realtime_data_usage()

This program constantly monitors data usage and inserts the collected information into a MySQL database. The data includes the timestamp, total data usage, incoming data, and outgoing data.


PyQt6 Program for Display

The PyQt6 program provides a graphical user interface to display data usage per hour. It offers features like auto-refresh, specifying the refresh interval, and selecting a date for display.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import sys
from PyQt6.QtWidgets import QApplication, QMainWindow, QMessageBox, QVBoxLayout, QWidget, QTableWidget, QTableWidgetItem, QPushButton, QDateEdit, QCheckBox, QLabel, QLineEdit
from PyQt6.QtGui import QFont, QColor
from PyQt6.QtCore import Qt, QDate, QTimer, QDateTime
import mysql.connector

class DataUsageApp(QMainWindow):
    def __init__(self):
        super().__init__()
        self.refresh_timer = QTimer()
        self.refresh_timer.timeout.connect(self.refresh_data)

        self.init_ui()

    def init_ui(self):
        self.setWindowTitle('Data Usage App')
        central_widget = QWidget(self)
        self.setCentralWidget(central_widget)

        layout = QVBoxLayout(central_widget)

        self.label_date_time = QLabel(self)
        self.label_date_time.setAlignment(Qt.AlignmentFlag.AlignCenter)
        font = self.label_date_time.font()
        font.setPointSize(30)
        self.label_date_time.setFont(font)
        layout.addWidget(self.label_date_time)

        current_date_time = QDateTime.currentDateTime()
        date_time_str = current_date_time.toString("yyyy-MM-dd hh:mm:ss")
        self.label_date_time.setText(date_time_str)

        qdate = QDate.currentDate()
        self.date_edit = QDateEdit(qdate, self)
        self.date_edit.setCalendarPopup(True)
        layout.addWidget(self.date_edit)

        self.label_refresh_interval = QLabel('Refresh Interval (seconds):', self)
        layout.addWidget(self.label_refresh_interval)

        self.line_edit_refresh_interval = QLineEdit(self)
        layout.addWidget(self.line_edit_refresh_interval)

        self.checkbox_auto_refresh = QCheckBox('Auto Refresh', self)
        self.checkbox_auto_refresh.stateChanged.connect(self.auto_refresh_changed)
        layout.addWidget(self.checkbox_auto_refresh)

        btn_refresh = QPushButton('Refresh Data', self)
        btn_refresh.clicked.connect(self.refresh_data)
        layout.addWidget(btn_refresh)

        self.table_widget = QTableWidget()
        layout.addWidget(self.table_widget)
        self.setGeometry(50, 55, 445, 800)

    def auto_refresh_changed(self, checked):
        if checked:
            self.start_auto_refresh()
        else:
            self.stop_auto_refresh()

    def start_auto_refresh(self):
        try:
            interval = int(self.line_edit_refresh_interval.text())
            if interval > 0:
                self.refresh_timer.start(interval * 1000)  # Convert seconds to milliseconds
            else:
                self.show_error_message("Please enter a valid positive refresh interval.")
        except ValueError:
            self.show_error_message("Please enter a valid number for the refresh interval.")

    def stop_auto_refresh(self):
        self.refresh_timer.stop()

    def show_error_message(self, message):
        error_box = QMessageBox(self)
        error_box.setIcon(QMessageBox.Icon.Critical)
        error_box.setWindowTitle("Error")
        error_box.setText(message)
        error_box.exec()

    def refresh_data(self):
        self.table_widget.clear()

        current_date_time = QDateTime.currentDateTime()
        date_time_str = current_date_time.toString("yyyy-MM-dd hh:mm:ss")
        self.label_date_time.setText(date_time_str)

        selected_date = self.date_edit.date().toString('yyyy-MM-dd')
        result = self.execute_query(selected_date)
        self.populate_table(result)

        if self.checkbox_auto_refresh.isChecked():
            try:
                interval = int(self.line_edit_refresh_interval.text())
                if interval > 0:
                    self.refresh_timer.start(interval * 1000)  # Convert seconds to milliseconds
                else:
                    self.show_error_message("Please enter a valid positive refresh interval.")
                    self.refresh_timer.stop()
            except ValueError:
                self.show_error_message("Please enter a valid positive refresh interval.")
                self.refresh_timer.stop()
        else:
            self.refresh_timer.stop()

    def execute_query(self, selected_date):
        try:
            connection = mysql.connector.connect(
                user='root',
                password='',
                host='localhost',
                database='data_usage'
            )

            cursor = connection.cursor()

            select_query = f"""
            SELECT
                DATE_FORMAT(date_time, '%H:00:00') AS hour,
                ROUND(SUM(data_usage), 3) AS total_data_usage,
                ROUND(SUM(incoming), 3) AS total_incoming,
                ROUND(SUM(outgoing), 3) AS total_outgoing
            FROM
                data_usage
            WHERE
                DATE(date_time) = STR_TO_DATE('{selected_date}', '%Y-%m-%d')
            GROUP BY
                hour
            ORDER BY
                hour;
            """

            cursor.execute(select_query)
            result = cursor.fetchall()
            return result

        except mysql.connector.Error as err:
            print(f"Error: {err}")

        finally:
            if 'connection' in locals() and connection.is_connected():
                cursor.close()
                connection.close()

    def populate_table(self, result):
        self.table_widget.setRowCount(len(result))
        self.table_widget.setColumnCount(len(result[0]) if result else 0)

        header_labels = ['Hour', 'Total Data Usage', 'Total Incoming', 'Total Outgoing']
        self.table_widget.setHorizontalHeaderLabels(header_labels)

        for row_index, row_data in enumerate(result):
            for col_index, col_data in enumerate(row_data):
                item = QTableWidgetItem(str(col_data))
                self.table_widget.setItem(row_index, col_index, item)

                if col_index in [1, 2, 3]:
                    item.setTextAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)

        self.add_totals_row(result)

    def add_totals_row(self, result):
        self.table_widget.insertRow(self.table_widget.rowCount())

        totals = [sum(float(column) if column and str(column).replace('.', '', 1).isdigit() else 0.0 for column in col) for col in zip(*result)]

        for col_index, total in enumerate(totals):
            item = QTableWidgetItem(str(round(total, 3)))
            item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEditable)
            self.table_widget.setItem(self.table_widget.rowCount() - 1, col_index, item)

            if col_index in [1, 2, 3]:
                item.setTextAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)

            if col_index == 0:
                item.setText('TOTAL')
                font = item.font()
                font.setBold(True)
                item.setFont(font)

            item.setBackground(QColor(211, 211, 211))

def main():
    app = QApplication(sys.argv)
    ex = DataUsageApp()
    ex.show()
    sys.exit(app.exec())

if __name__ == '__main__':
    main()

The screenshot:


This PyQt6 program provides a user-friendly interface for visualizing data usage. It includes features such as auto-refresh, specifying refresh intervals, and selecting dates for data display. The data is fetched from the MySQL database and presented in a tabular format.

Feel free to customize and enhance these programs based on your specific requirements.


No comments:

Post a Comment